62 changes: 53 additions & 9 deletions samples/mastra-extended/app/src/app/api/chat/route.ts
@@ -14,10 +14,21 @@ import { setTimeout as sleep } from "node:timers/promises";
import { NextRequest, NextResponse } from "next/server";

import type { ChatStreamEvent } from "@/app/types";
import { getRetryAfterMs, isRateLimitError } from "@/lib/ai";
import { ensureSchema } from "@/lib/db";
import { getMockReply } from "@/lib/mock-agent";
import { mastra } from "@/mastra";

const STREAM_RETRY_MAX_ATTEMPTS = Number(process.env.CHAT_STREAM_RETRY_MAX_ATTEMPTS ?? 4);
const STREAM_RETRY_BASE_DELAY_MS = Number(process.env.CHAT_STREAM_RETRY_BASE_DELAY_MS ?? 2000);
const STREAM_RETRY_MAX_DELAY_MS = Number(process.env.CHAT_STREAM_RETRY_MAX_DELAY_MS ?? 30_000);

class RateLimitedBeforeOutputError extends Error {
constructor(public readonly cause: unknown) {
super("Rate limited before any output was emitted");
}
}

export const runtime = "nodejs";

// ---------------------------------------------------------------------------
@@ -360,16 +371,53 @@ async function processStructuredStream(source: unknown, splitter: ReturnType<typ
}

if (type === "error") {
emit({
type: "error",
message: extractErrorMessage("error" in payload ? payload.error : payload),
});
const errorPayload = "error" in payload ? payload.error : payload;
// If a 429 arrives before any output has been emitted, throw so the
// outer retry loop can re-try the whole stream call. Otherwise just
// log: the agent has already produced useful output and the error
// chunk would only confuse the client view.
if (!emittedOutput && isRateLimitError(errorPayload)) {
throw new RateLimitedBeforeOutputError(errorPayload);
}
console.warn("Agent stream error chunk:", extractErrorMessage(errorPayload));
}
}

return emittedOutput;
}

async function runAgentStreamWithRetry(
request: ChatRequest,
splitter: ReturnType<typeof createThinkingTextSplitter>,
emit: EmitEvent,
): Promise<boolean> {
let lastError: unknown;
for (let attempt = 1; attempt <= STREAM_RETRY_MAX_ATTEMPTS; attempt += 1) {
try {
const reply = await callAgentStream(request.message, request.threadId);
return reply.streamKind === "text"
? await processTextStream(reply.stream, splitter, emit)
: await processStructuredStream(reply.stream, splitter, emit);
} catch (error) {
lastError = error;

const isPreOutputRateLimit = error instanceof RateLimitedBeforeOutputError;
const isThrownRateLimit = !isPreOutputRateLimit && isRateLimitError(error);
if (!isPreOutputRateLimit && !isThrownRateLimit) throw error;
if (attempt === STREAM_RETRY_MAX_ATTEMPTS) throw error;

const causeForHeaders = isPreOutputRateLimit ? error.cause : error;
const retryAfterMs = getRetryAfterMs(causeForHeaders);
const backoffMs = Math.min(STREAM_RETRY_MAX_DELAY_MS, STREAM_RETRY_BASE_DELAY_MS * 2 ** (attempt - 1));
const jitter = Math.floor(Math.random() * Math.min(500, backoffMs / 2));
const delay = (retryAfterMs ?? backoffMs) + jitter;
console.warn(`[chat-stream] rate limited (attempt ${attempt}/${STREAM_RETRY_MAX_ATTEMPTS}), retrying in ${delay}ms`);
await sleep(delay);
}
}
throw lastError;
}

function handleStreamingRequest(request: ChatRequest) {
const encoder = new TextEncoder();

Expand All @@ -386,11 +434,7 @@ function handleStreamingRequest(request: ChatRequest) {
if (process.env.MOCK_AGENT === "true") {
emittedOutput = await processTextStream(createMockStream(request.message), splitter, emit);
} else {
const reply = await callAgentStream(request.message, request.threadId);
emittedOutput =
reply.streamKind === "text"
? await processTextStream(reply.stream, splitter, emit)
: await processStructuredStream(reply.stream, splitter, emit);
emittedOutput = await runAgentStreamWithRetry(request, splitter, emit);
}

for (const part of splitter.flush()) {
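For a rough sense of the pacing, here is a minimal sketch of the delay math in runAgentStreamWithRetry, assuming the default STREAM_RETRY_* values above and a provider response with no Retry-After header:

// Assumes defaults: 4 attempts, 2s base delay, 30s cap, jitter capped at 500ms.
const base = 2_000;
const cap = 30_000;
for (let attempt = 1; attempt < 4; attempt += 1) {
  const backoffMs = Math.min(cap, base * 2 ** (attempt - 1)); // 2000, 4000, 8000
  const jitter = Math.floor(Math.random() * Math.min(500, backoffMs / 2));
  console.log(`retry ${attempt}: wait ~${backoffMs + jitter} ms`);
}
// If the provider does send Retry-After (or Azure's retry-after-ms), that value
// replaces backoffMs and only the jitter is added on top; the fourth failed
// attempt rethrows to the caller.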
@@ -19,7 +19,7 @@ export async function POST() {
await resetDemoState();

const runId = randomUUID();
await createSeedRun(runId, 20);
await createSeedRun(runId, 10);

const queue = getSyncQueue();
await queue.add(
2 changes: 1 addition & 1 deletion samples/mastra-extended/app/src/app/page.tsx
@@ -96,7 +96,7 @@ export default function Page() {
<span className="eyebrow">Tasks and events</span>
<h2>Background jobs classify incoming work and system activity.</h2>
<p className="muted-text">
Click one button to generate 10 tasks and 10 events. The worker fans that out into per-item jobs, stores
Click one button to generate 5 tasks and 5 events. The worker fans that out into per-item jobs, stores
the results in Postgres, and builds embeddings for semantic lookup.
</p>
</div>
95 changes: 87 additions & 8 deletions samples/mastra-extended/app/src/lib/ai.ts
@@ -1,4 +1,5 @@
import { createHash } from "node:crypto";
import { setTimeout as sleep } from "node:timers/promises";

import { embed } from "ai";
import { z } from "zod";
@@ -8,6 +9,81 @@ import { getMastraEmbeddingModel, hasChatAccess, hasEmbeddingAccess } from "@/li
import { fallbackEvents, fallbackTasks } from "@/lib/seed-data";
import { mastra } from "@/mastra";

// ---------------------------------------------------------------------------
// 429-aware retry
//
// LLM/embedding endpoints (especially Azure OpenAI) frequently throttle.
// We retry on 429 / "rate limit" / "Too Many Requests" with exponential
// backoff + jitter. If the server provides a `Retry-After` header we honor
// it; otherwise we double the delay each attempt.
// ---------------------------------------------------------------------------

const RETRY_MAX_ATTEMPTS = Number(process.env.LLM_RETRY_MAX_ATTEMPTS ?? 6);
const RETRY_BASE_DELAY_MS = Number(process.env.LLM_RETRY_BASE_DELAY_MS ?? 1500);
const RETRY_MAX_DELAY_MS = Number(process.env.LLM_RETRY_MAX_DELAY_MS ?? 30_000);

function getErrorStatus(error: unknown): number | null {
if (typeof error !== "object" || error === null) return null;
const obj = error as Record<string, unknown>;
if (typeof obj.status === "number") return obj.status;
if (typeof obj.statusCode === "number") return obj.statusCode;
const response = obj.response as Record<string, unknown> | undefined;
if (response && typeof response.status === "number") return response.status;
return null;
}

export function getRetryAfterMs(error: unknown): number | null {
if (typeof error !== "object" || error === null) return null;
const obj = error as Record<string, unknown>;
const headers =
(obj.responseHeaders as Record<string, unknown> | undefined) ??
((obj.response as Record<string, unknown> | undefined)?.headers as Record<string, unknown> | undefined);
if (!headers) return null;

// Azure sends retry-after-ms (precise, often <1s) alongside retry-after
// (RFC seconds, typically the next quota window). Prefer the precise one.
const rawMs = headers["retry-after-ms"] ?? headers["Retry-After-Ms"];
if (typeof rawMs === "string" || typeof rawMs === "number") {
const ms = Number(String(rawMs).trim());
if (Number.isFinite(ms)) return ms;
}

const raw = headers["retry-after"] ?? headers["Retry-After"];
if (typeof raw !== "string" && typeof raw !== "number") return null;
const value = String(raw).trim();
const seconds = Number(value);
if (Number.isFinite(seconds)) return seconds * 1000;
const dateMs = Date.parse(value);
if (Number.isFinite(dateMs)) return Math.max(0, dateMs - Date.now());
return null;
}

export function isRateLimitError(error: unknown) {
if (getErrorStatus(error) === 429) return true;
const message = error instanceof Error ? error.message : String(error ?? "");
return /rate.?limit|too many requests|429/i.test(message);
}

async function withRateLimitRetry<T>(label: string, run: () => Promise<T>): Promise<T> {
let lastError: unknown;
for (let attempt = 1; attempt <= RETRY_MAX_ATTEMPTS; attempt += 1) {
try {
return await run();
} catch (error) {
lastError = error;
if (!isRateLimitError(error) || attempt === RETRY_MAX_ATTEMPTS) throw error;

const retryAfterMs = getRetryAfterMs(error);
const backoffMs = Math.min(RETRY_MAX_DELAY_MS, RETRY_BASE_DELAY_MS * 2 ** (attempt - 1));
const jitter = Math.floor(Math.random() * Math.min(500, backoffMs / 2));
const delay = (retryAfterMs ?? backoffMs) + jitter;
console.warn(`[${label}] rate limited (attempt ${attempt}/${RETRY_MAX_ATTEMPTS}), retrying in ${delay}ms`);
await sleep(delay);
}
}
throw lastError;
}
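As a quick illustration of the shapes getRetryAfterMs handles, a hedged sketch based only on the parsing above (the exact property names an SDK exposes may differ; these error objects are hypothetical):

// Hypothetical error objects exercising the branches above.
getRetryAfterMs({ responseHeaders: { "retry-after-ms": "250" } }); // 250
getRetryAfterMs({ response: { headers: { "retry-after": "10" } } }); // 10_000
getRetryAfterMs({ response: { headers: { "retry-after": "Wed, 01 Jan 2031 00:00:00 GMT" } } }); // ms until that date
getRetryAfterMs(new Error("429 Too Many Requests")); // null (no headers; callers fall back to exponential backoff)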

// ---------------------------------------------------------------------------
// Schemas
// ---------------------------------------------------------------------------
@@ -26,8 +102,8 @@ const eventSchema = z.object({
body: z.string().min(20).max(320),
});

const taskBatchSchema = z.object({ tasks: z.array(taskSchema).length(10) });
const eventBatchSchema = z.object({ events: z.array(eventSchema).length(10) });
const taskBatchSchema = z.object({ tasks: z.array(taskSchema).length(5) });
const eventBatchSchema = z.object({ events: z.array(eventSchema).length(5) });

const classificationSchema = z.object({
category: z.string().min(2).max(40),
@@ -83,7 +159,7 @@ async function runChat(

const agent = mastra.getAgent("generatorAgent");
const prompt = `${systemPrompt}\n\n${userPrompt}`;
const result = await agent.generate(prompt, { maxSteps: 1 });
const result = await withRateLimitRetry("chat", () => agent.generate(prompt, { maxSteps: 1 }));

return parseResponse(result.text ?? "");
}
@@ -119,7 +195,7 @@ export async function embedTextForSearch(text: string) {

try {
const model = getMastraEmbeddingModel();
const result = await embed({ model, value: text });
const result = await withRateLimitRetry("embed", () => embed({ model, value: text }));
return result.embedding;
} catch {
return deterministicEmbedding(text);
@@ -212,7 +288,7 @@ async function generateTasksWithLlm() {
const payload = await runChatJson(
"You generate realistic project-team task records. Return valid JSON only.",
[
"Generate exactly 10 task records for a software product team.",
"Generate exactly 5 task records for a software product team.",
"These tasks should look like assigned work pulled from tools like Jira, Linear, and GitHub.",
"Avoid fake enterprise jargon. Keep them concrete and easy to understand.",
"Return this exact shape:",
@@ -228,7 +304,7 @@ async function generateEventsWithLlm() {
const payload = await runChatJson(
"You generate realistic system event records. Return valid JSON only.",
[
"Generate exactly 10 event records for a software product team.",
"Generate exactly 5 event records for a software product team.",
"These events should look like recent activity from tools like Datadog, Vercel, Sentry, Slack, GitHub Actions, Stripe, and PagerDuty.",
"Avoid fake enterprise jargon. Keep them concrete and easy to understand.",
"Return this exact shape:",
Expand All @@ -242,10 +318,13 @@ async function generateEventsWithLlm() {

export async function generateSeedItems(): Promise<RawItemSeed[]> {
if (useFastLocalData() || !hasLlmAccess() || process.env.MOCK_AGENT === "true") {
return [...fallbackTasks, ...fallbackEvents];
return [...fallbackTasks.slice(0, 5), ...fallbackEvents.slice(0, 5)];
}

const [tasks, events] = await Promise.all([generateTasksWithLlm(), generateEventsWithLlm()]);
// Serialized — running these in parallel bursts two requests at once and
// immediately competes with the per-item classify jobs for the same quota.
const tasks = await generateTasksWithLlm();
const events = await generateEventsWithLlm();
return [...toRawItems("task", tasks), ...toRawItems("event", events)];
}

12 changes: 12 additions & 0 deletions samples/mastra-extended/app/src/lib/items.ts
@@ -106,6 +106,18 @@ export async function createSeedRun(id: string, totalItems = 20) {
);
}

export async function setSeedRunTotal(id: string, totalItems: number) {
const pool = getPool();
await pool.query(
`
UPDATE seed_runs
SET total_items = $2, updated_at = NOW()
WHERE id = $1
`,
[id, totalItems],
);
}

export async function startSeedRun(id: string, summary: string) {
const pool = getPool();
await pool.query(
2 changes: 1 addition & 1 deletion samples/mastra-extended/app/src/lib/mock-agent.ts
@@ -8,7 +8,7 @@ export async function getMockReply(message: string) {
return [
"No sample items are loaded yet.",
"",
"Click `Generate sample items` to create 10 tasks and 10 events, then ask again.",
"Click `Generate sample items` to create 5 tasks and 5 events, then ask again.",
].join("\n");
}

5 changes: 5 additions & 0 deletions samples/mastra-extended/app/src/lib/queue.ts
@@ -2,6 +2,10 @@ import IORedis from "ioredis";
import { Queue } from "bullmq";

export const QUEUE_NAME = process.env.QUEUE_NAME ?? "support-sync";
// Hash-tag wrapping forces every BullMQ key for this queue (wait/active/events/…)
// onto the same Redis cluster slot, so multi-key Lua scripts don't trip CROSSSLOT
// on sharded backends like Azure Managed Redis.
export const QUEUE_PREFIX = `{${QUEUE_NAME}}`;

declare global {
// eslint-disable-next-line no-var
@@ -29,6 +33,7 @@ export function getSyncQueue() {
if (!global.syncQueue) {
global.syncQueue = new Queue(QUEUE_NAME, {
connection: getRedisConnection(),
prefix: QUEUE_PREFIX,
});
}

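To make the CROSSSLOT point concrete, a sketch of the key names this produces, assuming BullMQ's usual prefix:queueName:suffix layout (the suffixes shown are illustrative, not exhaustive):

// With QUEUE_NAME = "support-sync" and QUEUE_PREFIX = "{support-sync}":
//   {support-sync}:support-sync:wait
//   {support-sync}:support-sync:active
//   {support-sync}:support-sync:events
// Redis Cluster hashes only the text inside the first {...}, so all of these
// map to the slot for "support-sync" and BullMQ's multi-key Lua scripts run
// against a single slot.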
24 changes: 19 additions & 5 deletions samples/mastra-extended/app/src/worker.ts
@@ -8,10 +8,11 @@ import {
getItemById,
insertSeedItems,
markItemProcessed,
setSeedRunTotal,
startSeedRun,
updateProcessedItem,
} from "@/lib/items";
import { QUEUE_NAME, getRedisConnection, getSyncQueue } from "@/lib/queue";
import { QUEUE_NAME, QUEUE_PREFIX, getRedisConnection, getSyncQueue } from "@/lib/queue";

type SeedBatchJob = {
runId: string;
@@ -24,10 +25,13 @@ type ClassifyItemJob = {

async function handleSeedBatch(job: Job<SeedBatchJob>) {
const { runId } = job.data;
await startSeedRun(runId, "Generating 10 tasks and 10 events with the LLM");
await startSeedRun(runId, "Generating 5 tasks and 5 events with the LLM");

const rawItems = await generateSeedItems();
const insertedItems = await insertSeedItems(runId, rawItems);
// The seed route hardcodes a placeholder total before generation runs.
// Reconcile here so progress / completion checks reflect actual count.
await setSeedRunTotal(runId, insertedItems.length);
const queue = getSyncQueue();

for (const item of insertedItems) {
@@ -39,10 +43,10 @@ async function handleSeedBatch(job: Job<SeedBatchJob>) {
} satisfies ClassifyItemJob,
{
jobId: `classify:${runId}:${item.id}`,
attempts: 2,
attempts: Number(process.env.CLASSIFY_JOB_ATTEMPTS ?? 6),
backoff: {
type: "exponential",
delay: 1000,
delay: Number(process.env.CLASSIFY_JOB_BACKOFF_MS ?? 5000),
},
removeOnComplete: 100,
removeOnFail: 100,
@@ -99,7 +103,17 @@ async function main() {
},
{
connection: getRedisConnection(),
concurrency: Number(process.env.WORKER_CONCURRENCY ?? 8),
concurrency: Number(process.env.WORKER_CONCURRENCY ?? 2),
prefix: QUEUE_PREFIX,
// Azure quota for `chat-default` is 6 req / 10s, shared with the chat
// agent. We cap the worker at 4 jobs / 10s, leaving 2 req / 10s for
// interactive chat. Each job makes 1 chat call (classify) + 1 embed
// call (embedding has its own quota). Concurrent chat use that bursts
// past the remaining 2 will 429 and recover via the in-call retry.
limiter: {
max: Number(process.env.WORKER_RATE_LIMIT_MAX ?? 4),
duration: Number(process.env.WORKER_RATE_LIMIT_DURATION_MS ?? 10_000),
},
},
);

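Back-of-envelope, under the assumptions in the comment above (10 seeded items, one chat call per classify job, default limiter values):

// Coarse upper bound on how long a seed run takes to drain the classify queue.
const items = 10; // 5 tasks + 5 events
const maxPerWindow = 4; // WORKER_RATE_LIMIT_MAX
const windowMs = 10_000; // WORKER_RATE_LIMIT_DURATION_MS
const drainUpperBoundMs = Math.ceil(items / maxPerWindow) * windowMs;
console.log(`seed run drains in roughly <= ${drainUpperBoundMs} ms of rate-limit waiting`); // ~30s
// That leaves 2 of the 6 requests per 10s window for interactive chat; bursts
// past that budget surface as 429s and recover through the retry paths above.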