diff --git a/samples/mastra-extended/app/src/app/api/chat/route.ts b/samples/mastra-extended/app/src/app/api/chat/route.ts
index bb381bf3..f570ff83 100644
--- a/samples/mastra-extended/app/src/app/api/chat/route.ts
+++ b/samples/mastra-extended/app/src/app/api/chat/route.ts
@@ -14,10 +14,21 @@
 import { setTimeout as sleep } from "node:timers/promises";
 import { NextRequest, NextResponse } from "next/server";
 import type { ChatStreamEvent } from "@/app/types";
+import { getRetryAfterMs, isRateLimitError } from "@/lib/ai";
 import { ensureSchema } from "@/lib/db";
 import { getMockReply } from "@/lib/mock-agent";
 import { mastra } from "@/mastra";
 
+const STREAM_RETRY_MAX_ATTEMPTS = Number(process.env.CHAT_STREAM_RETRY_MAX_ATTEMPTS ?? 4);
+const STREAM_RETRY_BASE_DELAY_MS = Number(process.env.CHAT_STREAM_RETRY_BASE_DELAY_MS ?? 2000);
+const STREAM_RETRY_MAX_DELAY_MS = Number(process.env.CHAT_STREAM_RETRY_MAX_DELAY_MS ?? 30_000);
+
+class RateLimitedBeforeOutputError extends Error {
+  constructor(public readonly cause: unknown) {
+    super("Rate limited before any output was emitted");
+  }
+}
+
 export const runtime = "nodejs";
 
 // ---------------------------------------------------------------------------
@@ -360,16 +371,53 @@ async function processStructuredStream(source: unknown, splitter: ReturnType
+async function runAgentStreamWithRetry(
+  request: ChatRequest,
+  splitter: ReturnType<typeof createSplitter>,
+  emit: EmitEvent,
+): Promise<boolean> {
+  let lastError: unknown;
+  for (let attempt = 1; attempt <= STREAM_RETRY_MAX_ATTEMPTS; attempt += 1) {
+    try {
+      const reply = await callAgentStream(request.message, request.threadId);
+      return reply.streamKind === "text"
+        ? await processTextStream(reply.stream, splitter, emit)
+        : await processStructuredStream(reply.stream, splitter, emit);
+    } catch (error) {
+      lastError = error;
+
+      const isPreOutputRateLimit = error instanceof RateLimitedBeforeOutputError;
+      const isThrownRateLimit = !isPreOutputRateLimit && isRateLimitError(error);
+      if (!isPreOutputRateLimit && !isThrownRateLimit) throw error;
+      if (attempt === STREAM_RETRY_MAX_ATTEMPTS) throw error;
+
+      const causeForHeaders = isPreOutputRateLimit ? error.cause : error;
+      const retryAfterMs = getRetryAfterMs(causeForHeaders);
+      const backoffMs = Math.min(STREAM_RETRY_MAX_DELAY_MS, STREAM_RETRY_BASE_DELAY_MS * 2 ** (attempt - 1));
+      const jitter = Math.floor(Math.random() * Math.min(500, backoffMs / 2));
+      const delay = (retryAfterMs ?? backoffMs) + jitter;
+      console.warn(`[chat-stream] rate limited (attempt ${attempt}/${STREAM_RETRY_MAX_ATTEMPTS}), retrying in ${delay}ms`);
+      await sleep(delay);
+    }
+  }
+  throw lastError;
+}
+
 function handleStreamingRequest(request: ChatRequest) {
   const encoder = new TextEncoder();
@@ -386,11 +434,7 @@ function handleStreamingRequest(request: ChatRequest) {
       if (process.env.MOCK_AGENT === "true") {
         emittedOutput = await processTextStream(createMockStream(request.message), splitter, emit);
       } else {
-        const reply = await callAgentStream(request.message, request.threadId);
-        emittedOutput =
-          reply.streamKind === "text"
-            ? await processTextStream(reply.stream, splitter, emit)
-            : await processStructuredStream(reply.stream, splitter, emit);
+        emittedOutput = await runAgentStreamWithRetry(request, splitter, emit);
       }
 
       for (const part of splitter.flush()) {
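This hunk defines `RateLimitedBeforeOutputError` and unwraps its `cause` in the retry loop, but the throw site lives in the stream-processing helpers further down `route.ts`, outside the hunk. The intended contract, sketched below with hypothetical stand-ins (`drainStream` and `looksRateLimited` are illustrative, not the file's real helpers): a whole-call retry is only safe while the client has received zero bytes.

```ts
// Sketch of the assumed throw site; processTextStream / processStructuredStream
// are expected to follow this pattern.
class RateLimitedBeforeOutputError extends Error {
  constructor(public readonly cause: unknown) {
    super("Rate limited before any output was emitted");
  }
}

// Stand-in for isRateLimitError from "@/lib/ai".
const looksRateLimited = (e: unknown) =>
  e instanceof Error && /429|rate.?limit|too many requests/i.test(e.message);

async function drainStream(
  stream: AsyncIterable<string>,
  emit: (chunk: string) => void,
): Promise<boolean> {
  let emittedOutput = false;
  try {
    for await (const chunk of stream) {
      emit(chunk);
      emittedOutput = true;
    }
  } catch (error) {
    // A 429 before the first chunk can be retried transparently. After the
    // first chunk the client already holds partial output, so the error has
    // to surface instead of silently restarting the stream.
    if (!emittedOutput && looksRateLimited(error)) {
      throw new RateLimitedBeforeOutputError(error);
    }
    throw error;
  }
  return emittedOutput;
}
```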
diff --git a/samples/mastra-extended/app/src/app/api/items/seed/route.ts b/samples/mastra-extended/app/src/app/api/items/seed/route.ts
index 206bf0a7..c5e0ec5d 100644
--- a/samples/mastra-extended/app/src/app/api/items/seed/route.ts
+++ b/samples/mastra-extended/app/src/app/api/items/seed/route.ts
@@ -19,7 +19,7 @@ export async function POST() {
   await resetDemoState();
 
   const runId = randomUUID();
-  await createSeedRun(runId, 20);
+  await createSeedRun(runId, 10);
 
   const queue = getSyncQueue();
   await queue.add(
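The hardcoded `10` here is a placeholder total (5 tasks + 5 events) written before the LLM has generated anything; the worker reconciles it with the real insert count via `setSeedRunTotal` in the `items.ts` and `worker.ts` hunks below. A hypothetical progress calculation shows why the reconciliation matters when a batch comes back short:

```ts
// Illustrative status math; the helper name is not from the app.
const progressPct = (processed: number, total: number) =>
  total === 0 ? 0 : Math.round((processed / total) * 100);

progressPct(9, 10); // 90: a 9-item run never "finishes" against the placeholder
progressPct(9, 9);  // 100: after setSeedRunTotal(runId, insertedItems.length)
```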

diff --git a/samples/mastra-extended/app/src/app/page.tsx b/samples/mastra-extended/app/src/app/page.tsx
index 9faff88f..108d8b41 100644
--- a/samples/mastra-extended/app/src/app/page.tsx
+++ b/samples/mastra-extended/app/src/app/page.tsx
@@ -96,7 +96,7 @@ export default function Page() {
               Tasks and events
 
               Background jobs classify incoming work and system activity.
 
-              Click one button to generate 10 tasks and 10 events. The worker fans that out into per-item jobs, stores
+              Click one button to generate 5 tasks and 5 events. The worker fans that out into per-item jobs, stores
               the results in Postgres, and builds embeddings for semantic lookup.
 
diff --git a/samples/mastra-extended/app/src/lib/ai.ts b/samples/mastra-extended/app/src/lib/ai.ts
index a57626d4..d9ceae1e 100644
--- a/samples/mastra-extended/app/src/lib/ai.ts
+++ b/samples/mastra-extended/app/src/lib/ai.ts
@@ -1,4 +1,5 @@
 import { createHash } from "node:crypto";
+import { setTimeout as sleep } from "node:timers/promises";
 
 import { embed } from "ai";
 import { z } from "zod";
@@ -8,6 +9,81 @@ import { getMastraEmbeddingModel, hasChatAccess, hasEmbeddingAccess } from "@/li
 import { fallbackEvents, fallbackTasks } from "@/lib/seed-data";
 import { mastra } from "@/mastra";
 
+// ---------------------------------------------------------------------------
+// 429-aware retry
+//
+// LLM/embedding endpoints (especially Azure OpenAI) frequently throttle.
+// We retry on 429 / "rate limit" / "Too Many Requests" with exponential
+// backoff + jitter. If the server provides a `Retry-After` header we honor
+// it; otherwise we double the delay each attempt.
+// ---------------------------------------------------------------------------
+
+const RETRY_MAX_ATTEMPTS = Number(process.env.LLM_RETRY_MAX_ATTEMPTS ?? 6);
+const RETRY_BASE_DELAY_MS = Number(process.env.LLM_RETRY_BASE_DELAY_MS ?? 1500);
+const RETRY_MAX_DELAY_MS = Number(process.env.LLM_RETRY_MAX_DELAY_MS ?? 30_000);
+
+function getErrorStatus(error: unknown): number | null {
+  if (typeof error !== "object" || error === null) return null;
+  const obj = error as Record<string, unknown>;
+  if (typeof obj.status === "number") return obj.status;
+  if (typeof obj.statusCode === "number") return obj.statusCode;
+  const response = obj.response as Record<string, unknown> | undefined;
+  if (response && typeof response.status === "number") return response.status;
+  return null;
+}
+
+export function getRetryAfterMs(error: unknown): number | null {
+  if (typeof error !== "object" || error === null) return null;
+  const obj = error as Record<string, unknown>;
+  const headers =
+    (obj.responseHeaders as Record<string, unknown> | undefined) ??
+    ((obj.response as Record<string, unknown> | undefined)?.headers as Record<string, unknown> | undefined);
+  if (!headers) return null;
+
+  // Azure sends retry-after-ms (precise, often <1s) alongside retry-after
+  // (RFC seconds, typically the next quota window). Prefer the precise one.
+  const rawMs = headers["retry-after-ms"] ?? headers["Retry-After-Ms"];
+  if (typeof rawMs === "string" || typeof rawMs === "number") {
+    const ms = Number(String(rawMs).trim());
+    if (Number.isFinite(ms)) return ms;
+  }
+
+  const raw = headers["retry-after"] ?? headers["Retry-After"];
+  if (typeof raw !== "string" && typeof raw !== "number") return null;
+  const value = String(raw).trim();
+  const seconds = Number(value);
+  if (Number.isFinite(seconds)) return seconds * 1000;
+  const dateMs = Date.parse(value);
+  if (Number.isFinite(dateMs)) return Math.max(0, dateMs - Date.now());
+  return null;
+}
+
+export function isRateLimitError(error: unknown) {
+  if (getErrorStatus(error) === 429) return true;
+  const message = error instanceof Error ? error.message : String(error ?? "");
+  return /rate.?limit|too many requests|429/i.test(message);
+}
+
+async function withRateLimitRetry<T>(label: string, run: () => Promise<T>): Promise<T> {
+  let lastError: unknown;
+  for (let attempt = 1; attempt <= RETRY_MAX_ATTEMPTS; attempt += 1) {
+    try {
+      return await run();
+    } catch (error) {
+      lastError = error;
+      if (!isRateLimitError(error) || attempt === RETRY_MAX_ATTEMPTS) throw error;
+
+      const retryAfterMs = getRetryAfterMs(error);
+      const backoffMs = Math.min(RETRY_MAX_DELAY_MS, RETRY_BASE_DELAY_MS * 2 ** (attempt - 1));
+      const jitter = Math.floor(Math.random() * Math.min(500, backoffMs / 2));
+      const delay = (retryAfterMs ?? backoffMs) + jitter;
+      console.warn(`[${label}] rate limited (attempt ${attempt}/${RETRY_MAX_ATTEMPTS}), retrying in ${delay}ms`);
+      await sleep(delay);
+    }
+  }
+  throw lastError;
+}
+
 // ---------------------------------------------------------------------------
 // Schemas
 // ---------------------------------------------------------------------------
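To make the helper semantics concrete, here is a usage sketch with a hand-rolled error object shaped like the ones AI SDK providers attach to HTTP failures (real provider error shapes vary, so treat the literal as illustrative):

```ts
import { getRetryAfterMs, isRateLimitError } from "@/lib/ai";

const throttled = {
  status: 429,
  message: "Too Many Requests",
  responseHeaders: { "retry-after-ms": "820", "retry-after": "10" },
};

isRateLimitError(throttled);                                   // true (status 429)
getRetryAfterMs(throttled);                                    // 820 (the precise ms header wins)
getRetryAfterMs({ responseHeaders: { "retry-after": "10" } }); // 10_000 (seconds to ms)

// Without a Retry-After hint, withRateLimitRetry doubles the delay instead:
// under the defaults the pre-jitter waits are 1.5s, 3s, 6s, 12s, 24s, so a
// persistently throttled call gives up after ~46.5s on the sixth attempt.
```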
""); + return /rate.?limit|too many requests|429/i.test(message); +} + +async function withRateLimitRetry(label: string, run: () => Promise): Promise { + let lastError: unknown; + for (let attempt = 1; attempt <= RETRY_MAX_ATTEMPTS; attempt += 1) { + try { + return await run(); + } catch (error) { + lastError = error; + if (!isRateLimitError(error) || attempt === RETRY_MAX_ATTEMPTS) throw error; + + const retryAfterMs = getRetryAfterMs(error); + const backoffMs = Math.min(RETRY_MAX_DELAY_MS, RETRY_BASE_DELAY_MS * 2 ** (attempt - 1)); + const jitter = Math.floor(Math.random() * Math.min(500, backoffMs / 2)); + const delay = (retryAfterMs ?? backoffMs) + jitter; + console.warn(`[${label}] rate limited (attempt ${attempt}/${RETRY_MAX_ATTEMPTS}), retrying in ${delay}ms`); + await sleep(delay); + } + } + throw lastError; +} + // --------------------------------------------------------------------------- // Schemas // --------------------------------------------------------------------------- @@ -26,8 +102,8 @@ const eventSchema = z.object({ body: z.string().min(20).max(320), }); -const taskBatchSchema = z.object({ tasks: z.array(taskSchema).length(10) }); -const eventBatchSchema = z.object({ events: z.array(eventSchema).length(10) }); +const taskBatchSchema = z.object({ tasks: z.array(taskSchema).length(5) }); +const eventBatchSchema = z.object({ events: z.array(eventSchema).length(5) }); const classificationSchema = z.object({ category: z.string().min(2).max(40), @@ -83,7 +159,7 @@ async function runChat( const agent = mastra.getAgent("generatorAgent"); const prompt = `${systemPrompt}\n\n${userPrompt}`; - const result = await agent.generate(prompt, { maxSteps: 1 }); + const result = await withRateLimitRetry("chat", () => agent.generate(prompt, { maxSteps: 1 })); return parseResponse(result.text ?? ""); } @@ -119,7 +195,7 @@ export async function embedTextForSearch(text: string) { try { const model = getMastraEmbeddingModel(); - const result = await embed({ model, value: text }); + const result = await withRateLimitRetry("embed", () => embed({ model, value: text })); return result.embedding; } catch { return deterministicEmbedding(text); @@ -212,7 +288,7 @@ async function generateTasksWithLlm() { const payload = await runChatJson( "You generate realistic project-team task records. Return valid JSON only.", [ - "Generate exactly 10 task records for a software product team.", + "Generate exactly 5 task records for a software product team.", "These tasks should look like assigned work pulled from tools like Jira, Linear, and GitHub.", "Avoid fake enterprise jargon. Keep them concrete and easy to understand.", "Return this exact shape:", @@ -228,7 +304,7 @@ async function generateEventsWithLlm() { const payload = await runChatJson( "You generate realistic system event records. Return valid JSON only.", [ - "Generate exactly 10 event records for a software product team.", + "Generate exactly 5 event records for a software product team.", "These events should look like recent activity from tools like Datadog, Vercel, Sentry, Slack, GitHub Actions, Stripe, and PagerDuty.", "Avoid fake enterprise jargon. 
diff --git a/samples/mastra-extended/app/src/lib/items.ts b/samples/mastra-extended/app/src/lib/items.ts
index 5b68eea6..3e84af7a 100644
--- a/samples/mastra-extended/app/src/lib/items.ts
+++ b/samples/mastra-extended/app/src/lib/items.ts
@@ -106,6 +106,18 @@ export async function createSeedRun(id: string, totalItems = 20) {
   );
 }
 
+export async function setSeedRunTotal(id: string, totalItems: number) {
+  const pool = getPool();
+  await pool.query(
+    `
+      UPDATE seed_runs
+      SET total_items = $2, updated_at = NOW()
+      WHERE id = $1
+    `,
+    [id, totalItems],
+  );
+}
+
 export async function startSeedRun(id: string, summary: string) {
   const pool = getPool();
   await pool.query(
diff --git a/samples/mastra-extended/app/src/lib/mock-agent.ts b/samples/mastra-extended/app/src/lib/mock-agent.ts
index 0e8dba27..6f36b89b 100644
--- a/samples/mastra-extended/app/src/lib/mock-agent.ts
+++ b/samples/mastra-extended/app/src/lib/mock-agent.ts
@@ -8,7 +8,7 @@ export async function getMockReply(message: string) {
     return [
       "No sample items are loaded yet.",
       "",
-      "Click `Generate sample items` to create 10 tasks and 10 events, then ask again.",
+      "Click `Generate sample items` to create 5 tasks and 5 events, then ask again.",
     ].join("\n");
   }
diff --git a/samples/mastra-extended/app/src/lib/queue.ts b/samples/mastra-extended/app/src/lib/queue.ts
index 81011e10..d9745219 100644
--- a/samples/mastra-extended/app/src/lib/queue.ts
+++ b/samples/mastra-extended/app/src/lib/queue.ts
@@ -2,6 +2,10 @@
 import IORedis from "ioredis";
 import { Queue } from "bullmq";
 
 export const QUEUE_NAME = process.env.QUEUE_NAME ?? "support-sync";
+// Hash-tag wrapping forces every BullMQ key for this queue (wait/active/events/…)
+// onto the same Redis cluster slot, so multi-key Lua scripts don't trip CROSSSLOT
+// on sharded backends like Azure Managed Redis.
+export const QUEUE_PREFIX = `{${QUEUE_NAME}}`;
 
 declare global {
   // eslint-disable-next-line no-var
@@ -29,6 +33,7 @@ export function getSyncQueue() {
   if (!global.syncQueue) {
     global.syncQueue = new Queue(QUEUE_NAME, {
       connection: getRedisConnection(),
+      prefix: QUEUE_PREFIX,
     });
   }
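For reference, BullMQ builds its Redis keys as `prefix:queueName:suffix`, and Redis Cluster hashes only the substring inside the first `{…}` pair when one exists, so the prefix above pins every key for the queue to a single slot. The operational rule that follows is that every handle on the queue must pass the same prefix; a minimal sketch:

```ts
import { Queue, Worker } from "bullmq";
import IORedis from "ioredis";

const QUEUE_NAME = "support-sync";
const QUEUE_PREFIX = `{${QUEUE_NAME}}`;

// BullMQ workers require maxRetriesPerRequest: null on their connection.
const connection = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379", {
  maxRetriesPerRequest: null,
});

// Every key now lands in one cluster slot:
//   {support-sync}:support-sync:wait
//   {support-sync}:support-sync:active
//   {support-sync}:support-sync:events
// Producers and workers must agree on the prefix, or they operate on
// different key namespaces and never see each other's jobs.
const queue = new Queue(QUEUE_NAME, { connection, prefix: QUEUE_PREFIX });
const worker = new Worker(QUEUE_NAME, async () => {}, { connection, prefix: QUEUE_PREFIX });
```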
diff --git a/samples/mastra-extended/app/src/worker.ts b/samples/mastra-extended/app/src/worker.ts
index abdf34cc..1161336d 100644
--- a/samples/mastra-extended/app/src/worker.ts
+++ b/samples/mastra-extended/app/src/worker.ts
@@ -8,10 +8,11 @@ import {
   getItemById,
   insertSeedItems,
   markItemProcessed,
+  setSeedRunTotal,
   startSeedRun,
   updateProcessedItem,
 } from "@/lib/items";
-import { QUEUE_NAME, getRedisConnection, getSyncQueue } from "@/lib/queue";
+import { QUEUE_NAME, QUEUE_PREFIX, getRedisConnection, getSyncQueue } from "@/lib/queue";
 
 type SeedBatchJob = {
   runId: string;
 };
 
@@ -24,10 +25,13 @@ type ClassifyItemJob = {
 async function handleSeedBatch(job: Job<SeedBatchJob>) {
   const { runId } = job.data;
 
-  await startSeedRun(runId, "Generating 10 tasks and 10 events with the LLM");
+  await startSeedRun(runId, "Generating 5 tasks and 5 events with the LLM");
 
   const rawItems = await generateSeedItems();
   const insertedItems = await insertSeedItems(runId, rawItems);
+  // The seed route hardcodes a placeholder total before generation runs.
+  // Reconcile here so progress / completion checks reflect the actual count.
+  await setSeedRunTotal(runId, insertedItems.length);
 
   const queue = getSyncQueue();
   for (const item of insertedItems) {
@@ -39,10 +43,10 @@
        } satisfies ClassifyItemJob,
        {
          jobId: `classify:${runId}:${item.id}`,
-          attempts: 2,
+          attempts: Number(process.env.CLASSIFY_JOB_ATTEMPTS ?? 6),
          backoff: {
            type: "exponential",
-            delay: 1000,
+            delay: Number(process.env.CLASSIFY_JOB_BACKOFF_MS ?? 5000),
          },
          removeOnComplete: 100,
          removeOnFail: 100,
@@ -99,7 +103,17 @@ async function main() {
     },
     {
       connection: getRedisConnection(),
-      concurrency: Number(process.env.WORKER_CONCURRENCY ?? 8),
+      concurrency: Number(process.env.WORKER_CONCURRENCY ?? 2),
+      prefix: QUEUE_PREFIX,
+      // Azure quota for `chat-default` is 6 req / 10s, shared with the chat
+      // agent. We cap the worker at 4 jobs / 10s, leaving 2 req / 10s for
+      // interactive chat. Each job makes 1 chat call (classify) + 1 embed
+      // call (embedding has its own quota). Concurrent chat use that bursts
+      // past the remaining 2 will 429 and recover via the in-call retry.
+      limiter: {
+        max: Number(process.env.WORKER_RATE_LIMIT_MAX ?? 4),
+        duration: Number(process.env.WORKER_RATE_LIMIT_DURATION_MS ?? 10_000),
+      },
     },
   );
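Reading the limiter sizing as plain arithmetic, with the quota figures from the comment above (deployment-specific assumptions; override via the env knobs):

```ts
// Sizing sketch: keep the worker's share strictly below the deployment
// quota so interactive chat is never starved.
const QUOTA_PER_WINDOW = 6; // assumed Azure quota: 6 requests / 10 s
const CHAT_HEADROOM = 2;    // requests reserved for the interactive agent
const WINDOW_MS = 10_000;

// Each classify job spends exactly one chat-quota request (the embed call
// draws on a separate embedding quota), so `max` maps 1:1 to chat requests.
const limiter = {
  max: QUOTA_PER_WINDOW - CHAT_HEADROOM, // 4 job starts per window
  duration: WINDOW_MS,
};

// BullMQ applies the limiter across every worker on the queue, so scaling
// out worker replicas does not multiply the request rate.
```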