diff --git a/.changeset/happy-yaks-bet.md b/.changeset/happy-yaks-bet.md new file mode 100644 index 000000000..887dd511c --- /dev/null +++ b/.changeset/happy-yaks-bet.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents-plugin-phonic": patch +--- + +Update phonic plugin to reuse session for handoffs diff --git a/.changeset/plenty-baths-hug.md b/.changeset/plenty-baths-hug.md new file mode 100644 index 000000000..611510e3a --- /dev/null +++ b/.changeset/plenty-baths-hug.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +remove rt session say logic and add phonic logic for resetting ws conn diff --git a/.changeset/sharp-apples-appear.md b/.changeset/sharp-apples-appear.md new file mode 100644 index 000000000..0ff50ec96 --- /dev/null +++ b/.changeset/sharp-apples-appear.md @@ -0,0 +1,9 @@ +--- +"@livekit/agents": patch +"@livekit/agents-plugin-google": patch +"@livekit/agents-plugin-openai": patch +"@livekit/agents-plugin-phonic": patch +--- + +- Make reusable Realtime Session across Handoffs & Agent Tasks +- Add say() capability to phonic realtime model diff --git a/agents/src/llm/index.ts b/agents/src/llm/index.ts index a8b157293..6d9bad4f4 100644 --- a/agents/src/llm/index.ts +++ b/agents/src/llm/index.ts @@ -68,6 +68,10 @@ export { oaiParams, serializeImage, toJsonSchema, + validateChatContextStructure, + type ChatContextValidationIssue, + type ChatContextValidationResult, + type ChatContextValidationSeverity, type FormatChatHistoryOptions, type OpenAIFunctionParameters, type SerializedImage, diff --git a/agents/src/llm/realtime.ts b/agents/src/llm/realtime.ts index 864e25d2d..abfef0d46 100644 --- a/agents/src/llm/realtime.ts +++ b/agents/src/llm/realtime.ts @@ -4,7 +4,7 @@ import type { AudioFrame } from '@livekit/rtc-node'; import { EventEmitter } from 'events'; import type { ReadableStream } from 'node:stream/web'; -import { DeferredReadableStream } from '../stream/deferred_stream.js'; +import { MultiInputStream } from '../stream/multi_input_stream.js'; import { 
Task } from '../utils.js'; import type { TimedString } from '../voice/io.js'; import type { ChatContext, FunctionCall } from './chat_context.js'; @@ -49,6 +49,10 @@ export interface RealtimeCapabilities { autoToolReplyGeneration: boolean; audioOutput: boolean; manualFunctionCalls: boolean; + midSessionChatCtxUpdate?: boolean; + midSessionInstructionsUpdate?: boolean; + midSessionToolsUpdate?: boolean; + perResponseToolChoice?: boolean; } export interface InputTranscriptionCompleted { @@ -84,7 +88,8 @@ export abstract class RealtimeModel { export abstract class RealtimeSession extends EventEmitter { protected _realtimeModel: RealtimeModel; - private deferredInputStream = new DeferredReadableStream(); + private inputAudioStream = new MultiInputStream(); + private inputAudioStreamId?: string; private _mainTask: Task; constructor(realtimeModel: RealtimeModel) { @@ -146,6 +151,7 @@ export abstract class RealtimeSession extends EventEmitter { async close(): Promise { this._mainTask.cancel(); + await this.inputAudioStream.close(); } /** @@ -156,7 +162,7 @@ export abstract class RealtimeSession extends EventEmitter { } private async _mainTaskImpl(signal: AbortSignal): Promise { - const reader = this.deferredInputStream.stream.getReader(); + const reader = this.inputAudioStream.stream.getReader(); while (true) { const { done, value } = await reader.read(); if (done || signal.aborted) { @@ -167,6 +173,9 @@ export abstract class RealtimeSession extends EventEmitter { } setInputAudioStream(audioStream: ReadableStream): void { - this.deferredInputStream.setSource(audioStream); + if (this.inputAudioStreamId !== undefined) { + void this.inputAudioStream.removeInputStream(this.inputAudioStreamId); + } + this.inputAudioStreamId = this.inputAudioStream.addInputStream(audioStream); } } diff --git a/agents/src/llm/utils.test.ts b/agents/src/llm/utils.test.ts index 68a6c2b61..964b520a7 100644 --- a/agents/src/llm/utils.test.ts +++ b/agents/src/llm/utils.test.ts @@ -12,7 +12,12 @@ 
import { FunctionCallOutput, type ImageContent, } from './chat_context.js'; -import { computeChatCtxDiff, formatChatHistory, serializeImage } from './utils.js'; +import { + computeChatCtxDiff, + formatChatHistory, + serializeImage, + validateChatContextStructure, +} from './utils.js'; function createChatMessage( id: string, @@ -457,6 +462,87 @@ describe('formatChatHistory', () => { }); }); +describe('validateChatContextStructure', () => { + it('returns valid=true for well-formed chat context', () => { + const ctx = new ChatContext([ + ChatMessage.create({ + id: 'msg_user', + role: 'user', + content: ['hello'], + createdAt: 1, + }), + FunctionCall.create({ + id: 'fn_call', + callId: 'call_1', + name: 'lookup_order', + args: '{"orderId":"123"}', + createdAt: 2, + }), + FunctionCallOutput.create({ + id: 'fn_output', + callId: 'call_1', + name: 'lookup_order', + output: '{"ok":true}', + isError: false, + createdAt: 3, + }), + ]); + + const result = validateChatContextStructure(ctx); + expect(result.valid).toBe(true); + expect(result.errors).toBe(0); + expect(result.warnings).toBe(0); + expect(result.issues).toEqual([]); + }); + + it('detects duplicate ids and timestamp ordering issues', () => { + const m1 = ChatMessage.create({ + id: 'dup_id', + role: 'user', + content: ['hello'], + createdAt: 10, + }); + const m2 = ChatMessage.create({ + id: 'dup_id', + role: 'assistant', + content: ['world'], + createdAt: 5, + }); + const ctx = new ChatContext([m1, m2]); + + const result = validateChatContextStructure(ctx); + expect(result.valid).toBe(false); + expect(result.errors).toBeGreaterThanOrEqual(2); + expect(result.issues.some((i) => i.code === 'duplicate_id')).toBe(true); + expect(result.issues.some((i) => i.code === 'timestamp_order')).toBe(true); + }); + + it('detects malformed terms and orphan function outputs', () => { + const msg = ChatMessage.create({ + id: 'msg_1', + role: 'user', + content: [' '], + createdAt: 1, + }); + const output = FunctionCallOutput.create({ + 
id: 'fn_out_1', + callId: 'call_missing', + name: 'lookup_order', + output: 'ok', + isError: false, + createdAt: 2, + }); + const ctx = new ChatContext([msg, output]); + + const result = validateChatContextStructure(ctx); + expect(result.valid).toBe(true); + expect(result.errors).toBe(0); + expect(result.warnings).toBeGreaterThanOrEqual(2); + expect(result.issues.some((i) => i.code === 'empty_text_term')).toBe(true); + expect(result.issues.some((i) => i.code === 'orphan_function_call_output')).toBe(true); + }); +}); + describe('serializeImage', () => { let consoleWarnSpy: ReturnType; diff --git a/agents/src/llm/utils.ts b/agents/src/llm/utils.ts index 76975d150..8865d1056 100644 --- a/agents/src/llm/utils.ts +++ b/agents/src/llm/utils.ts @@ -247,6 +247,33 @@ export interface FormatChatHistoryOptions { includeTimestamps?: boolean; } +export type ChatContextValidationSeverity = 'error' | 'warning'; + +export interface ChatContextValidationIssue { + severity: ChatContextValidationSeverity; + code: + | 'duplicate_id' + | 'timestamp_order' + | 'empty_message_content' + | 'empty_text_term' + | 'missing_image_term' + | 'invalid_audio_term' + | 'invalid_function_call' + | 'invalid_function_call_args' + | 'invalid_function_call_output' + | 'orphan_function_call_output'; + index: number; + itemId: string; + message: string; +} + +export interface ChatContextValidationResult { + valid: boolean; + errors: number; + warnings: number; + issues: ChatContextValidationIssue[]; +} + /** * Render a chat context into a readable multiline string for debugging and logging. */ @@ -273,6 +300,148 @@ export function formatChatHistory( ].join('\n'); } +/** + * Validate structural integrity of chat context items/terms for realtime usage. 
+ */ +export function validateChatContextStructure(chatCtx: ChatContext): ChatContextValidationResult { + const issues: ChatContextValidationIssue[] = []; + const ids = new Set(); + const seenFunctionCallIds = new Set(); + let previousCreatedAt = -Infinity; + + const pushIssue = (issue: ChatContextValidationIssue) => { + issues.push(issue); + }; + + for (let index = 0; index < chatCtx.items.length; index += 1) { + const item = chatCtx.items[index]!; + + if (ids.has(item.id)) { + pushIssue({ + severity: 'error', + code: 'duplicate_id', + index, + itemId: item.id, + message: `Duplicate item id '${item.id}'`, + }); + } else { + ids.add(item.id); + } + + if (item.createdAt < previousCreatedAt) { + pushIssue({ + severity: 'error', + code: 'timestamp_order', + index, + itemId: item.id, + message: `Item createdAt (${item.createdAt}) is older than previous item (${previousCreatedAt})`, + }); + } + previousCreatedAt = item.createdAt; + + if (item.type === 'message') { + if (item.content.length === 0) { + pushIssue({ + severity: 'warning', + code: 'empty_message_content', + index, + itemId: item.id, + message: 'Message has empty content array', + }); + } + + item.content.forEach((term, termIndex) => { + if (typeof term === 'string') { + if (term.trim().length === 0) { + pushIssue({ + severity: 'warning', + code: 'empty_text_term', + index, + itemId: item.id, + message: `Message term[${termIndex}] is empty text`, + }); + } + return; + } + + if (term.type === 'image_content') { + if (!term.id || term.image === undefined || term.image === null) { + pushIssue({ + severity: 'error', + code: 'missing_image_term', + index, + itemId: item.id, + message: `Message term[${termIndex}] has invalid image content`, + }); + } + return; + } + + if (!Array.isArray(term.frame)) { + pushIssue({ + severity: 'error', + code: 'invalid_audio_term', + index, + itemId: item.id, + message: `Message term[${termIndex}] has invalid audio content`, + }); + } + }); + } else if (item.type === 
'function_call') { + if (!item.name || !item.callId) { + pushIssue({ + severity: 'error', + code: 'invalid_function_call', + index, + itemId: item.id, + message: 'Function call is missing name or callId', + }); + } else { + seenFunctionCallIds.add(item.callId); + } + + try { + JSON.parse(item.args); + } catch { + pushIssue({ + severity: 'warning', + code: 'invalid_function_call_args', + index, + itemId: item.id, + message: 'Function call args are not valid JSON', + }); + } + } else if (item.type === 'function_call_output') { + if (!item.callId) { + pushIssue({ + severity: 'error', + code: 'invalid_function_call_output', + index, + itemId: item.id, + message: 'Function call output is missing callId', + }); + } else if (!seenFunctionCallIds.has(item.callId)) { + pushIssue({ + severity: 'warning', + code: 'orphan_function_call_output', + index, + itemId: item.id, + message: `Function call output references unknown callId '${item.callId}'`, + }); + } + } + } + + const errors = issues.filter((issue) => issue.severity === 'error').length; + const warnings = issues.length - errors; + return { + valid: errors === 0, + errors, + warnings, + issues, + }; +} + function formatChatHistoryItem( item: ChatItem, index: number, diff --git a/agents/src/stream/deferred_stream.ts b/agents/src/stream/deferred_stream.ts index 47ef0c2b1..6ca9baa48 100644 --- a/agents/src/stream/deferred_stream.ts +++ b/agents/src/stream/deferred_stream.ts @@ -41,6 +41,7 @@ export class DeferredReadableStream { private transform: IdentityTransform; private writer: WritableStreamDefaultWriter; private sourceReader?: ReadableStreamDefaultReader; + private sourceAttached = false; constructor() { this.transform = new IdentityTransform(); @@ -59,10 +60,11 @@ export class DeferredReadableStream { * Call once the actual source is ready. 
*/ setSource(source: ReadableStream) { - if (this.isSourceSet) { + if (this.sourceAttached) { throw new Error('Stream source already set'); } + this.sourceAttached = true; const sourceReader = source.getReader(); this.sourceReader = sourceReader; void this.pump(sourceReader); @@ -124,7 +126,7 @@ export class DeferredReadableStream { } const sourceReader = this.sourceReader!; - // Clear source first so future setSource() calls can reattach cleanly. + // Clear active source reader reference before releasing lock. this.sourceReader = undefined; // release lock will make any pending read() throw TypeError diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 414f3ac96..9ea71de61 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -970,6 +970,7 @@ export async function* readStream( stream: ReadableStream, signal?: AbortSignal, ): AsyncGenerator { + if (signal?.aborted) return; const reader = stream.getReader(); try { if (signal) { @@ -998,6 +999,10 @@ export async function* readStream( } export async function waitForAbort(signal: AbortSignal) { + if (signal.aborted) { + return; + } + const abortFuture = new Future(); const handler = () => { abortFuture.resolve(); diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index e672b9e3f..167287cd5 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -8,6 +8,7 @@ import { ROOT_CONTEXT, context as otelContext, trace } from '@opentelemetry/api' import { Heap } from 'heap-js'; import { AsyncLocalStorage } from 'node:async_hooks'; import { ReadableStream, TransformStream } from 'node:stream/web'; +import type { Logger } from 'pino'; import type { InterruptionDetectionError } from '../inference/interruption/errors.js'; import { AdaptiveInterruptionDetector } from '../inference/interruption/interruption_detector.js'; import type { OverlappingSpeechEvent } from '../inference/interruption/types.js'; @@ -96,6 +97,50 @@ interface OnEnterData { agent: 
Agent; } +export interface ReusableResources { + sttPipeline?: STTPipeline; + rtSession?: RealtimeSession; +} + +export class SchedulingPausedError extends Error { + constructor() { + super('cannot schedule new speech, the speech scheduling is draining/pausing'); + this.name = 'SchedulingPausedError'; + } +} + +export function isSchedulingPausedError(error: unknown): error is SchedulingPausedError { + return error instanceof SchedulingPausedError; +} + +export async function cleanupReusableResources( + resources: ReusableResources, + logger: Logger, +): Promise { + const tasks: Promise[] = []; + if (resources.sttPipeline) { + tasks.push(resources.sttPipeline.close()); + resources.sttPipeline = undefined; + } + if (resources.rtSession) { + tasks.push(resources.rtSession.close()); + resources.rtSession = undefined; + } + + if (tasks.length > 0) { + const outputs = await Promise.allSettled(tasks); + for (const output of outputs) { + if (output.status === 'rejected') { + if (logger) { + logger.error({ error: output.reason }, 'error cleaning up reusable resources'); + } else { + console.error('error cleaning up reusable resources', output.reason); + } + } + } + } +} + interface PreemptiveGeneration { speechHandle: SpeechHandle; userMessage: ChatMessage; @@ -293,26 +338,26 @@ export class AgentActivity implements RecognitionHooks { this.isDefaultInterruptionByAudioActivityEnabled = this.isInterruptionByAudioActivityEnabled; } - async start(options?: { reuseSttPipeline?: STTPipeline }): Promise { + async start(options?: { reuseResources?: ReusableResources }): Promise { const unlock = await this.lock.lock(); try { await this._startSession({ spanName: 'start_agent_activity', runOnEnter: true, - reuseSttPipeline: options?.reuseSttPipeline, + reuseResources: options?.reuseResources, }); } finally { unlock(); } } - async resume(options?: { reuseSttPipeline?: STTPipeline }): Promise { + async resume(options?: { reuseResources?: ReusableResources }): Promise { const unlock = 
await this.lock.lock(); try { await this._startSession({ spanName: 'resume_agent_activity', runOnEnter: false, - reuseSttPipeline: options?.reuseSttPipeline, + reuseResources: options?.reuseResources, }); } finally { unlock(); @@ -322,9 +367,9 @@ export class AgentActivity implements RecognitionHooks { private async _startSession(options: { spanName: 'start_agent_activity' | 'resume_agent_activity'; runOnEnter: boolean; - reuseSttPipeline?: STTPipeline; + reuseResources?: ReusableResources; }): Promise { - const { spanName, runOnEnter, reuseSttPipeline } = options; + const { spanName, runOnEnter, reuseResources } = options; const startSpan = tracer.startSpan({ name: spanName, attributes: { [traceTypes.ATTR_AGENT_LABEL]: this.agent.id }, @@ -334,38 +379,75 @@ export class AgentActivity implements RecognitionHooks { this.agent._agentActivity = this; if (this.llm instanceof RealtimeModel) { - this.realtimeSession = this.llm.session(); + const rtReused = reuseResources?.rtSession !== undefined; + + if (rtReused) { + this.logger.debug('reusing realtime session from previous activity'); + this.realtimeSession = reuseResources!.rtSession; + reuseResources!.rtSession = undefined; // ownership transferred + + // clear any stale audio/generation state + await this.realtimeSession!.interrupt(); + await this.realtimeSession!.clearAudio(); + } else { + this.realtimeSession = this.llm.session(); + } + this.realtimeSpans = new Map(); - this.realtimeSession.on('generation_created', this.onRealtimeGenerationCreated); - this.realtimeSession.on('input_speech_started', this.onRealtimeInputSpeechStarted); - this.realtimeSession.on('input_speech_stopped', this.onRealtimeInputSpeechStopped); - this.realtimeSession.on( + this.realtimeSession!.on('generation_created', this.onRealtimeGenerationCreated); + this.realtimeSession!.on('input_speech_started', this.onRealtimeInputSpeechStarted); + this.realtimeSession!.on('input_speech_stopped', this.onRealtimeInputSpeechStopped); + 
this.realtimeSession!.on( 'input_audio_transcription_completed', this.onRealtimeInputAudioTranscriptionCompleted, ); - this.realtimeSession.on('metrics_collected', this.onMetricsCollected); - this.realtimeSession.on('error', this.onModelError); + this.realtimeSession!.on('metrics_collected', this.onMetricsCollected); + this.realtimeSession!.on('error', this.onModelError); removeInstructions(this.agent._chatCtx); - try { - await this.realtimeSession.updateInstructions(this.agent.instructions); - } catch (error) { - this.logger.error(error, 'failed to update the instructions'); - } - try { - await this.realtimeSession.updateChatCtx(this.agent.chatCtx); - } catch (error) { - this.logger.error(error, 'failed to update the chat context'); - } + // skip the update if the session is reused and no mid-session update is supported + // this means the content is the same as the previous session + const capabilities = this.llm.capabilities; + if (rtReused && this.realtimeSession?.realtimeModel.provider == 'phonic') { + // if the session is being reused, then call phonic's _updateSession to send a full mid-session config update. + // otherwise, call the separate update_* functions to build the initial config. 
+ try { + await (this.realtimeSession as any)._updateSession( + this.agent.instructions, + this.agent.chatCtx, + this.tools, + ); + } catch (error) { + this.logger.error(error, 'failed to update phonic session'); + } + } else { + if (!rtReused || capabilities.midSessionInstructionsUpdate) { + try { + await this.realtimeSession!.updateInstructions(this.agent.instructions); + } catch (error) { + this.logger.error(error, 'failed to update the instructions'); + } + } - try { - await this.realtimeSession.updateTools(this.tools); - } catch (error) { - this.logger.error(error, 'failed to update the tools'); + if (!rtReused || capabilities.midSessionChatCtxUpdate) { + try { + await this.realtimeSession!.updateChatCtx(this.agent.chatCtx); + } catch (error) { + this.logger.error(error, 'failed to update the chat context'); + } + } + + if (!rtReused || capabilities.midSessionToolsUpdate) { + try { + await this.realtimeSession!.updateTools(this.tools); + } catch (error) { + this.logger.error(error, 'failed to update the tools'); + } + } } - if (!this.llm.capabilities.audioOutput && !this.tts && this.agentSession.output.audio) { + if (!capabilities.audioOutput && !this.tts && this.agentSession.output.audio) { this.logger.error( 'audio output is enabled but RealtimeModel has no audio modality ' + 'and no TTS is set. 
Either enable audio modality in the RealtimeModel ' + @@ -426,9 +508,10 @@ export class AgentActivity implements RecognitionHooks { getLinkedParticipant: () => this.agentSession._roomIO?.linkedParticipant, }); - if (reuseSttPipeline) { - this.logger.debug('Reusing STT pipeline from previous activity'); - await this.audioRecognition.start({ sttPipeline: reuseSttPipeline }); + if (reuseResources?.sttPipeline) { + this.logger.debug('reusing STT pipeline from previous activity'); + await this.audioRecognition.start({ sttPipeline: reuseResources.sttPipeline }); + reuseResources.sttPipeline = undefined; // ownership transferred } else { await this.audioRecognition.start(); } @@ -454,28 +537,70 @@ export class AgentActivity implements RecognitionHooks { startSpan.end(); } - async _detachSttPipelineIfReusable(newActivity: AgentActivity): Promise { - const hasAudioRecognition = !!this.audioRecognition; - const hasSttOld = !!this.stt; - const hasSttNew = !!newActivity.stt; - const sameSttInstance = this.stt === newActivity.stt; - const sameSttNode = - Object.getPrototypeOf(this.agent).sttNode === - Object.getPrototypeOf(newActivity.agent).sttNode; - - if (!hasAudioRecognition || !hasSttOld || !hasSttNew) { - return undefined; - } + async _detachReusableResources(newActivity: AgentActivity): Promise { + const resources: ReusableResources = {}; + try { + // stt pipeline + if ( + this.audioRecognition && + this.stt && + newActivity.stt && + this.stt === newActivity.stt && + Object.getPrototypeOf(this.agent).sttNode === + Object.getPrototypeOf(newActivity.agent).sttNode + ) { + resources.sttPipeline = await this.audioRecognition.detachSttPipeline(); + } - if (!sameSttInstance) { - return undefined; - } + // rt session + if ( + this.realtimeSession && + this.llm instanceof RealtimeModel && + this.llm === newActivity.llm + ) { + const capabilities = this.llm.capabilities; + + // context update is supported or chat context is equivalent + let reusable = + 
capabilities.midSessionChatCtxUpdate || + this.realtimeSession.chatCtx + .copy({ excludeInstructions: true, excludeHandoff: true }) + .isEquivalent( + newActivity.agent.chatCtx.copy({ excludeInstructions: true, excludeHandoff: true }), + ); - if (!sameSttNode) { - return undefined; + // instructions update is supported or instructions are the same + reusable = + reusable && + (capabilities.midSessionInstructionsUpdate || + this.agent.instructions === newActivity.agent.instructions); + + // tools update is supported or tools are the same + reusable = + reusable && + (capabilities.midSessionToolsUpdate || isSameToolContext(this.tools, newActivity.tools)); + + if (reusable) { + // detach: remove event listeners but don't close the session + this.realtimeSession.off('generation_created', this.onRealtimeGenerationCreated); + this.realtimeSession.off('input_speech_started', this.onRealtimeInputSpeechStarted); + this.realtimeSession.off('input_speech_stopped', this.onRealtimeInputSpeechStopped); + this.realtimeSession.off( + 'input_audio_transcription_completed', + this.onRealtimeInputAudioTranscriptionCompleted, + ); + this.realtimeSession.off('metrics_collected', this.onMetricsCollected); + this.realtimeSession.off('error', this.onModelError); + resources.rtSession = this.realtimeSession; + this.realtimeSession = undefined; // prevent _closeSessionResources from closing it + } + } + } catch (error) { + await cleanupReusableResources(resources, this.logger); + throw error; } - return await this.audioRecognition!.detachSttPipeline(); + return resources; } get currentSpeech(): SpeechHandle | undefined { @@ -709,7 +834,7 @@ export class AgentActivity implements RecognitionHooks { allowInterruptions: defaultAllowInterruptions, addToChatCtx = true, } = options ?? 
{}; - let allowInterruptions = defaultAllowInterruptions; + const allowInterruptions = defaultAllowInterruptions; if ( !audio && @@ -720,18 +845,6 @@ export class AgentActivity implements RecognitionHooks { throw new Error('trying to generate speech from text without a TTS model'); } - if ( - this.llm instanceof RealtimeModel && - this.llm.capabilities.turnDetection && - allowInterruptions === false - ) { - this.logger.warn( - 'the RealtimeModel uses a server-side turn detection, allowInterruptions cannot be false when using VoiceAgent.say(), ' + - 'disable turnDetection in the RealtimeModel and use VAD on the AgentTask/VoiceAgent instead', - ); - allowInterruptions = true; - } - const handle = SpeechHandle.create({ allowInterruptions: allowInterruptions ?? this.allowInterruptions, }); @@ -744,11 +857,12 @@ export class AgentActivity implements RecognitionHooks { speechHandle: handle, }), ); + const task = this.createSpeechTask({ taskFn: (abortController: AbortController) => this.ttsTask(handle, text, addToChatCtx, {}, abortController, audio), ownedSpeechHandle: handle, - name: 'AgentActivity.say_tts', + name: 'AgentActivity.tts_say', }); task.result.finally(() => this.onPipelineReplyDone()); @@ -2272,6 +2386,7 @@ export class AgentActivity implements RecognitionHooks { ev: GenerationCreatedEvent, modelSettings: ModelSettings, replyAbortController: AbortController, + addToChatCtx: boolean = true, ): Promise { return tracer.startActiveSpan( async (span) => @@ -2280,6 +2395,7 @@ export class AgentActivity implements RecognitionHooks { ev, modelSettings, replyAbortController, + addToChatCtx, span, }), { @@ -2294,12 +2410,14 @@ export class AgentActivity implements RecognitionHooks { ev, modelSettings, replyAbortController, + addToChatCtx, span, }: { speechHandle: SpeechHandle; ev: GenerationCreatedEvent; modelSettings: ModelSettings; replyAbortController: AbortController; + addToChatCtx: boolean; span: Span; }): Promise { speechHandle._agentTurnContext = 
otelContext.active(); @@ -2571,7 +2689,7 @@ export class AgentActivity implements RecognitionHooks { }); } - if (forwardedText) { + if (forwardedText && addToChatCtx) { const message = ChatMessage.create({ role: 'assistant', content: forwardedText, @@ -2596,7 +2714,7 @@ export class AgentActivity implements RecognitionHooks { return; } - if (messageOutputs.length > 0) { + if (messageOutputs.length > 0 && addToChatCtx) { // there should be only one message const [msgId, textOut, _, __] = messageOutputs[0]!; const message = ChatMessage.create({ @@ -2827,7 +2945,7 @@ export class AgentActivity implements RecognitionHooks { // when force=true, we allow tool responses to bypass scheduling pause // This allows for tool responses to be generated before the AgentActivity is finalized if (this.schedulingPaused && !force) { - throw new Error('cannot schedule new speech, the speech scheduling is draining/pausing'); + throw new SchedulingPausedError(); } // Monotonic time to avoid near 0 collisions @@ -2858,8 +2976,13 @@ export class AgentActivity implements RecognitionHooks { this._mainTask = Task.from(({ signal }) => this.mainTask(signal)); } - async pause(options: { blockedTasks?: Task[] } = {}): Promise { - const { blockedTasks = [] } = options; + async pause( + options: { + blockedTasks?: Task[]; + newActivity?: AgentActivity; + } = {}, + ): Promise { + const { blockedTasks = [], newActivity } = options; const unlock = await this.lock.lock(); try { @@ -2867,31 +2990,49 @@ export class AgentActivity implements RecognitionHooks { name: 'pause_agent_activity', attributes: { [traceTypes.ATTR_AGENT_LABEL]: this.agent.id }, }); + + let resources: ReusableResources | undefined; try { await this._pauseSchedulingTask(blockedTasks); + + // detach after speech tasks are done but before _closeSessionResources + if (newActivity) { + resources = await this._detachReusableResources(newActivity); + } + await this._closeSessionResources(); + } catch (error) { + if (resources) { + await 
cleanupReusableResources(resources, this.logger); + } + throw error; } finally { span.end(); } + + return resources; } finally { unlock(); } } - async drain(): Promise { + async drain(options?: { newActivity?: AgentActivity }): Promise { // Create drain_agent_activity as a ROOT span (new trace) to match Python behavior - return tracer.startActiveSpan(async (span) => this._drainImpl(span), { + return tracer.startActiveSpan(async (span) => this._drainImpl(span, options?.newActivity), { name: 'drain_agent_activity', context: ROOT_CONTEXT, }); } - private async _drainImpl(span: Span): Promise { + private async _drainImpl( + span: Span, + newActivity?: AgentActivity, + ): Promise { span.setAttribute(traceTypes.ATTR_AGENT_LABEL, this.agent.id); const unlock = await this.lock.lock(); try { - if (this._schedulingPaused) return; + if (this._schedulingPaused) return undefined; this._onExitTask = this.createSpeechTask({ taskFn: () => @@ -2907,6 +3048,16 @@ export class AgentActivity implements RecognitionHooks { await this._onExitTask.result; await this._pauseSchedulingTask([]); + + // detach after speech tasks are done but before _closeSessionResources + if (newActivity) { + try { + return await this._detachReusableResources(newActivity); + } catch (error) { + this.logger.error(error, 'failed to detach reusable resources'); + } + } + return undefined; } finally { unlock(); } diff --git a/agents/src/voice/agent_activity_handoff.test.ts b/agents/src/voice/agent_activity_handoff.test.ts index e27fc507a..f9422b34d 100644 --- a/agents/src/voice/agent_activity_handoff.test.ts +++ b/agents/src/voice/agent_activity_handoff.test.ts @@ -3,14 +3,23 @@ // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; import { describe, expect, it, vi } from 'vitest'; -import { type SpeechEvent } from '../stt/stt.js'; +import { ChatContext } from '../llm/chat_context.js'; +import { type RealtimeCapabilities, RealtimeModel, type RealtimeSession } from 
'../llm/realtime.js'; +import type { SpeechEvent } from '../stt/stt.js'; import { Agent } from './agent.js'; -import { AgentActivity } from './agent_activity.js'; +import { + AgentActivity, + type ReusableResources, + cleanupReusableResources, +} from './agent_activity.js'; type FakeActivity = { agent: Agent; audioRecognition: { detachSttPipeline: ReturnType } | undefined; stt: unknown; + llm: unknown; + tools: unknown; + realtimeSession: unknown; }; function createFakeActivity(agent: Agent, stt: unknown) { @@ -21,13 +30,19 @@ function createFakeActivity(agent: Agent, stt: unknown) { detachSttPipeline: vi.fn(async () => detachedPipeline), }, stt, + llm: undefined, + tools: [], + realtimeSession: undefined, } as FakeActivity; return { activity, detachedPipeline }; } -async function detachIfReusable(oldActivity: FakeActivity, newActivity: FakeActivity) { - return await (AgentActivity.prototype as any)._detachSttPipelineIfReusable.call( +async function detachResources( + oldActivity: FakeActivity, + newActivity: FakeActivity, +): Promise { + return await (AgentActivity.prototype as any)._detachReusableResources.call( oldActivity, newActivity, ); @@ -39,9 +54,9 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), sharedStt); const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt); - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBe(oldActivity.detachedPipeline); + expect(resources.sttPipeline).toBe(oldActivity.detachedPipeline); expect(oldActivity.activity.audioRecognition?.detachSttPipeline).toHaveBeenCalledTimes(1); }); @@ -49,9 +64,9 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), { id: 'stt-a' }); const newActivity = 
createFakeActivity(new Agent({ instructions: 'b' }), { id: 'stt-b' }); - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBeUndefined(); + expect(resources.sttPipeline).toBeUndefined(); expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled(); }); @@ -60,9 +75,9 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), undefined); const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt); - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBeUndefined(); + expect(resources.sttPipeline).toBeUndefined(); expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled(); }); @@ -84,9 +99,9 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const oldActivity = createFakeActivity(new AgentA({ instructions: 'a' }), sharedStt); const newActivity = createFakeActivity(new AgentB({ instructions: 'b' }), sharedStt); - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBeUndefined(); + expect(resources.sttPipeline).toBeUndefined(); expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled(); }); @@ -104,9 +119,9 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const oldActivity = createFakeActivity(new AgentA({ instructions: 'a' }), sharedStt); const newActivity = createFakeActivity(new AgentB({ instructions: 'b' }), sharedStt); - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await 
detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBe(oldActivity.detachedPipeline); + expect(resources.sttPipeline).toBe(oldActivity.detachedPipeline); expect(oldActivity.activity.audioRecognition?.detachSttPipeline).toHaveBeenCalledTimes(1); }); @@ -116,8 +131,208 @@ describe('AgentActivity STT handoff reuse eligibility', () => { const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt); oldActivity.activity.audioRecognition = undefined; - const result = await detachIfReusable(oldActivity.activity, newActivity.activity); + const resources = await detachResources(oldActivity.activity, newActivity.activity); - expect(result).toBeUndefined(); + expect(resources.sttPipeline).toBeUndefined(); + }); +}); + +describe('AgentActivity RT session reuse eligibility', () => { + function createFakeRtSession(): RealtimeSession { + return { + chatCtx: ChatContext.empty(), + off: vi.fn(), + on: vi.fn(), + interrupt: vi.fn(), + clearAudio: vi.fn(), + close: vi.fn(async () => {}), + } as unknown as RealtimeSession; + } + + class FakeRealtimeModel extends RealtimeModel { + get model() { + return 'fake'; + } + session(): RealtimeSession { + throw new Error('not implemented'); + } + async close() {} + } + + function createFakeRealtimeModel(capabilitiesOverrides: Partial = {}) { + const capabilities: RealtimeCapabilities = { + messageTruncation: false, + turnDetection: false, + userTranscription: false, + autoToolReplyGeneration: false, + audioOutput: true, + manualFunctionCalls: false, + midSessionChatCtxUpdate: false, + midSessionInstructionsUpdate: false, + midSessionToolsUpdate: false, + ...capabilitiesOverrides, + }; + return new FakeRealtimeModel(capabilities); + } + + function createRtActivity(agent: Agent, llm: unknown, rtSession?: RealtimeSession): FakeActivity { + return { + agent, + audioRecognition: undefined, + stt: undefined, + llm, + tools: agent.toolCtx, + realtimeSession: rtSession, + }; + } + + it('reuses RT 
session when same LLM, same instructions, equivalent context, and same tools', async () => { + const sharedLlm = createFakeRealtimeModel(); + const rtSession = createFakeRtSession(); + + const oldAgent = new Agent({ instructions: 'hello' }); + const newAgent = new Agent({ instructions: 'hello' }); + const oldActivity = createRtActivity(oldAgent, sharedLlm, rtSession); + const newActivity = createRtActivity(newAgent, sharedLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBe(rtSession); + expect(oldActivity.realtimeSession).toBeUndefined(); + expect(rtSession.off).toHaveBeenCalled(); + }); + + it('does not reuse RT session when LLM instances differ', async () => { + const rtSession = createFakeRtSession(); + + const oldLlm = createFakeRealtimeModel(); + const newLlm = createFakeRealtimeModel(); + const oldActivity = createRtActivity(new Agent({ instructions: 'a' }), oldLlm, rtSession); + const newActivity = createRtActivity(new Agent({ instructions: 'a' }), newLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBeUndefined(); + }); + + it('does not reuse RT session when instructions differ and midSessionInstructionsUpdate is false', async () => { + const sharedLlm = createFakeRealtimeModel({ midSessionInstructionsUpdate: false }); + const rtSession = createFakeRtSession(); + + const oldActivity = createRtActivity(new Agent({ instructions: 'old' }), sharedLlm, rtSession); + const newActivity = createRtActivity(new Agent({ instructions: 'new' }), sharedLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBeUndefined(); + }); + + it('reuses RT session when instructions differ but midSessionInstructionsUpdate is true', async () => 
{ + const sharedLlm = createFakeRealtimeModel({ midSessionInstructionsUpdate: true }); + const rtSession = createFakeRtSession(); + + const oldActivity = createRtActivity(new Agent({ instructions: 'old' }), sharedLlm, rtSession); + const newActivity = createRtActivity(new Agent({ instructions: 'new' }), sharedLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBe(rtSession); + }); + + it('reuses RT session when context differs but midSessionChatCtxUpdate is true', async () => { + const sharedLlm = createFakeRealtimeModel({ midSessionChatCtxUpdate: true }); + const rtSession = createFakeRtSession(); + // Give the session a fresh (empty) chat context; reuse is allowed regardless when midSessionChatCtxUpdate is true + (rtSession as any).chatCtx = ChatContext.empty(); + + const oldActivity = createRtActivity(new Agent({ instructions: 'same' }), sharedLlm, rtSession); + const newActivity = createRtActivity(new Agent({ instructions: 'same' }), sharedLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBe(rtSession); + }); + + it('does not reuse when no RT session exists', async () => { + const sharedLlm = createFakeRealtimeModel(); + const oldActivity = createRtActivity(new Agent({ instructions: 'a' }), sharedLlm, undefined); + const newActivity = createRtActivity(new Agent({ instructions: 'a' }), sharedLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBeUndefined(); + }); + + it('does not reuse when LLM is not a RealtimeModel', async () => { + const rtSession = createFakeRtSession(); + const nonRealtimeLlm = { id: 'plain-llm' }; + + const oldActivity = createRtActivity( + new Agent({ instructions: 'a' }), + nonRealtimeLlm, + rtSession, + ); + const newActivity = createRtActivity(new Agent({
instructions: 'a' }), nonRealtimeLlm); + + const resources = await (AgentActivity.prototype as any)._detachReusableResources.call( + oldActivity, + newActivity, + ); + + expect(resources.rtSession).toBeUndefined(); + }); +}); + +describe('cleanupReusableResources', () => { + it('closes both STT pipeline and RT session', async () => { + const sttClose = vi.fn(async () => {}); + const rtClose = vi.fn(async () => {}); + const resources: ReusableResources = { + sttPipeline: { close: sttClose } as any, + rtSession: { close: rtClose } as any, + }; + + await cleanupReusableResources(resources); + + expect(sttClose).toHaveBeenCalledTimes(1); + expect(rtClose).toHaveBeenCalledTimes(1); + expect(resources.sttPipeline).toBeUndefined(); + expect(resources.rtSession).toBeUndefined(); + }); + + it('handles partial resources (only STT)', async () => { + const sttClose = vi.fn(async () => {}); + const resources: ReusableResources = { + sttPipeline: { close: sttClose } as any, + }; + + await cleanupReusableResources(resources); + + expect(sttClose).toHaveBeenCalledTimes(1); + expect(resources.sttPipeline).toBeUndefined(); + }); + + it('handles empty resources', async () => { + const resources: ReusableResources = {}; + await cleanupReusableResources(resources); + // should not throw }); }); diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 3cc11c79a..3f4e813d0 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -39,8 +39,13 @@ import { import { Task } from '../utils.js'; import type { VAD } from '../vad.js'; import type { Agent } from './agent.js'; -import { AgentActivity } from './agent_activity.js'; -import type { STTPipeline, _TurnDetector } from './audio_recognition.js'; +import { + AgentActivity, + type ReusableResources, + cleanupReusableResources, + isSchedulingPausedError, +} from './agent_activity.js'; +import type { _TurnDetector } from './audio_recognition.js'; import { type AgentEvent, 
AgentSessionEventTypes, @@ -680,7 +685,22 @@ export class AgentSession< } return nextActivity.generateReply({ userMessage, ...options }); } - return activity.generateReply({ userMessage, ...options }); + + // Handoff can race with scheduling pause between the check above and generateReply(). + // If that happens, retry on the next activity instead of surfacing an avoidable error. + try { + return activity.generateReply({ userMessage, ...options }); + } catch (error) { + const canFallback = nextActivity !== undefined && isSchedulingPausedError(error); + if (!canFallback) { + throw error; + } + this.logger.debug( + { error }, + 'generateReply scheduling raced with handoff drain; retrying on next activity', + ); + return nextActivity.generateReply({ userMessage, ...options }); + } }; // attach to the session span if called outside of the AgentSession @@ -762,7 +782,7 @@ export class AgentSession< const runWithContext = async () => { const unlock = await this.activityLock.lock(); let onEnterTask: Task | undefined; - let reusedSttPipeline: STTPipeline | undefined; + let reusableResources: ReusableResources | undefined; try { this.agent = agent; @@ -785,15 +805,16 @@ export class AgentSession< this.nextActivity = agent._agentActivity; } - if (prevActivityObj && this.nextActivity && prevActivityObj !== this.nextActivity) { - reusedSttPipeline = await prevActivityObj._detachSttPipelineIfReusable(this.nextActivity); - } - if (prevActivityObj && prevActivityObj !== this.nextActivity) { if (previousActivity === 'pause') { - await prevActivityObj.pause({ blockedTasks }); + reusableResources = await prevActivityObj.pause({ + blockedTasks, + newActivity: this.nextActivity, + }); } else { - await prevActivityObj.drain(); + reusableResources = await prevActivityObj.drain({ + newActivity: this.nextActivity, + }); await prevActivityObj.close(); } } @@ -803,8 +824,10 @@ export class AgentSession< { agentId: this.nextActivity?.agent.id }, 'Session is closing, skipping start of next 
activity', ); - await reusedSttPipeline?.close(); - reusedSttPipeline = undefined; + if (reusableResources) { + await cleanupReusableResources(reusableResources, this.logger); + reusableResources = undefined; + } this.nextActivity = undefined; this.activity = undefined; return; @@ -834,11 +857,11 @@ export class AgentSession< ); if (newActivity === 'start') { - await this.activity!.start({ reuseSttPipeline: reusedSttPipeline }); + await this.activity!.start({ reuseResources: reusableResources }); } else { - await this.activity!.resume({ reuseSttPipeline: reusedSttPipeline }); + await this.activity!.resume({ reuseResources: reusableResources }); } - reusedSttPipeline = undefined; + reusableResources = undefined; onEnterTask = this.activity!._onEnterTask; @@ -846,9 +869,11 @@ export class AgentSession< this.activity!.attachAudioInput(this._input.audio.stream); } } catch (error) { - // JS safeguard: session cleanup owns the detached pipeline until the next activity + // JS safeguard: session cleanup owns the detached resources until the next activity // starts successfully, preventing leaks when handoff fails mid-transition. 
- await reusedSttPipeline?.close(); + if (reusableResources) { + await cleanupReusableResources(reusableResources, this.logger); + } throw error; } finally { unlock(); diff --git a/agents/src/voice/agent_session_handoff.test.ts b/agents/src/voice/agent_session_handoff.test.ts index d3e054f3c..ffacb1ab2 100644 --- a/agents/src/voice/agent_session_handoff.test.ts +++ b/agents/src/voice/agent_session_handoff.test.ts @@ -4,7 +4,7 @@ import { describe, expect, it, vi } from 'vitest'; import { ChatContext } from '../llm/chat_context.js'; import { Agent } from './agent.js'; -import { AgentActivity } from './agent_activity.js'; +import { AgentActivity, type ReusableResources } from './agent_activity.js'; import { AgentSession } from './agent_session.js'; function createFakeLock() { @@ -50,19 +50,18 @@ function createFakeSession() { } as unknown as AgentSession; } -describe('AgentSession STT pipeline handoff', () => { - it('passes a detached STT pipeline into the next resumed activity', async () => { - const pipeline = { - close: vi.fn(async () => {}), +describe('AgentSession reusable resources handoff', () => { + it('passes reusable resources from drain into the next resumed activity', async () => { + const resources: ReusableResources = { + sttPipeline: { close: vi.fn(async () => {}) } as any, }; const previousAgent = new Agent({ instructions: 'old' }); const nextAgent = new Agent({ instructions: 'new' }); const previousActivity = { agent: previousAgent, - _detachSttPipelineIfReusable: vi.fn(async () => pipeline), - drain: vi.fn(async () => {}), + drain: vi.fn(async () => resources), close: vi.fn(async () => {}), - pause: vi.fn(async () => {}), + pause: vi.fn(async () => resources), }; const nextActivity = { agent: nextAgent, @@ -81,23 +80,22 @@ describe('AgentSession STT pipeline handoff', () => { waitOnEnter: false, }); - expect(previousActivity._detachSttPipelineIfReusable).toHaveBeenCalledWith(nextActivity); - expect(nextActivity.resume).toHaveBeenCalledWith({ 
reuseSttPipeline: pipeline }); - expect(pipeline.close).not.toHaveBeenCalled(); + expect(previousActivity.drain).toHaveBeenCalledWith({ newActivity: nextActivity }); + expect(nextActivity.resume).toHaveBeenCalledWith({ reuseResources: resources }); }); - it('closes the detached pipeline if the next activity fails to start', async () => { - const pipeline = { - close: vi.fn(async () => {}), + it('cleans up reusable resources if the next activity fails to start', async () => { + const closeFn = vi.fn(async () => {}); + const resources: ReusableResources = { + sttPipeline: { close: closeFn } as any, }; const previousAgent = new Agent({ instructions: 'old' }); const nextAgent = new Agent({ instructions: 'new' }); const previousActivity = { agent: previousAgent, - _detachSttPipelineIfReusable: vi.fn(async () => pipeline), - drain: vi.fn(async () => {}), + drain: vi.fn(async () => resources), close: vi.fn(async () => {}), - pause: vi.fn(async () => {}), + pause: vi.fn(async () => resources), }; const nextActivity = { agent: nextAgent, @@ -120,21 +118,21 @@ describe('AgentSession STT pipeline handoff', () => { }), ).rejects.toThrow('resume failed'); - expect(pipeline.close).toHaveBeenCalledTimes(1); + expect(closeFn).toHaveBeenCalledTimes(1); }); - it('does not close the adopted pipeline after the next activity starts successfully', async () => { - const pipeline = { - close: vi.fn(async () => {}), + it('does not cleanup reusable resources after the next activity starts successfully', async () => { + const closeFn = vi.fn(async () => {}); + const resources: ReusableResources = { + sttPipeline: { close: closeFn } as any, }; const previousAgent = new Agent({ instructions: 'old' }); const nextAgent = new Agent({ instructions: 'new' }); const previousActivity = { agent: previousAgent, - _detachSttPipelineIfReusable: vi.fn(async () => pipeline), - drain: vi.fn(async () => {}), + drain: vi.fn(async () => resources), close: vi.fn(async () => {}), - pause: vi.fn(async () => {}), 
+ pause: vi.fn(async () => resources), }; const nextActivity = { agent: nextAgent, @@ -158,18 +156,18 @@ describe('AgentSession STT pipeline handoff', () => { }), ).rejects.toThrow('attach failed'); - expect(nextActivity.resume).toHaveBeenCalledWith({ reuseSttPipeline: pipeline }); - expect(pipeline.close).not.toHaveBeenCalled(); + expect(nextActivity.resume).toHaveBeenCalledWith({ reuseResources: resources }); + // pipeline was already transferred, so cleanup should NOT have been called + expect(closeFn).not.toHaveBeenCalled(); }); - it('skips STT detach when the same activity object is reused', async () => { + it('skips detach when the same activity object is reused', async () => { const agent = new Agent({ instructions: 'same' }); const activity = { agent, - _detachSttPipelineIfReusable: vi.fn(async () => undefined), - drain: vi.fn(async () => {}), + drain: vi.fn(async () => undefined), close: vi.fn(async () => {}), - pause: vi.fn(async () => {}), + pause: vi.fn(async () => undefined), resume: vi.fn(async () => {}), start: vi.fn(async () => {}), attachAudioInput: vi.fn(), @@ -185,22 +183,23 @@ describe('AgentSession STT pipeline handoff', () => { waitOnEnter: false, }); - expect(activity._detachSttPipelineIfReusable).not.toHaveBeenCalled(); - expect(activity.resume).toHaveBeenCalledWith({ reuseSttPipeline: undefined }); + expect(activity.drain).not.toHaveBeenCalled(); + expect(activity.pause).not.toHaveBeenCalled(); + expect(activity.resume).toHaveBeenCalledWith({ reuseResources: undefined }); }); - it('skips starting a new activity while the session is closing and closes the detached pipeline', async () => { - const pipeline = { - close: vi.fn(async () => {}), + it('skips starting a new activity while the session is closing and cleans up resources', async () => { + const closeFn = vi.fn(async () => {}); + const resources: ReusableResources = { + sttPipeline: { close: closeFn } as any, }; const previousAgent = new Agent({ instructions: 'old' }); const nextAgent 
= new Agent({ instructions: 'new' }); const previousActivity = { agent: previousAgent, - _detachSttPipelineIfReusable: vi.fn(async () => pipeline), - drain: vi.fn(async () => {}), + drain: vi.fn(async () => resources), close: vi.fn(async () => {}), - pause: vi.fn(async () => {}), + pause: vi.fn(async () => resources), }; const startSpy = vi.spyOn(AgentActivity.prototype, 'start').mockResolvedValue(undefined); @@ -215,9 +214,9 @@ describe('AgentSession STT pipeline handoff', () => { waitOnEnter: false, }); - expect(previousActivity._detachSttPipelineIfReusable).toHaveBeenCalledTimes(1); + expect(previousActivity.drain).toHaveBeenCalledTimes(1); expect(previousActivity.close).toHaveBeenCalledTimes(1); - expect(pipeline.close).toHaveBeenCalledTimes(1); + expect(closeFn).toHaveBeenCalledTimes(1); expect(startSpy).not.toHaveBeenCalled(); expect((session as any).activity).toBeUndefined(); expect((session as any).nextActivity).toBeUndefined(); diff --git a/examples/src/survey_agent.ts b/examples/src/survey_agent.ts new file mode 100644 index 000000000..d455bdf64 --- /dev/null +++ b/examples/src/survey_agent.ts @@ -0,0 +1,375 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. 
+// +// SPDX-License-Identifier: Apache-2.0 +import { + type JobContext, + ServerOptions, + beta, + cli, + defineAgent, + llm, + voice, +} from '@livekit/agents'; +// import * as phonic from '@livekit/agents-plugin-phonic'; +import { access, appendFile } from 'node:fs/promises'; +import { fileURLToPath } from 'node:url'; +import { z } from 'zod'; + +type SurveyUserData = { + filename: string; + candidateName: string; + taskResults: Record; +}; + +type IntroResults = { + name: string; + intro: string; +}; + +type EmailResults = { + email: string; +}; + +type CommuteResults = { + canCommute: boolean; + commuteMethod: 'driving' | 'bus' | 'subway' | 'none'; +}; + +type ExperienceResults = { + yearsOfExperience: number; + experienceDescription: string; +}; + +type BehavioralResults = { + strengths: string; + weaknesses: string; + workStyle: 'independent' | 'team_player'; +}; + +function toCsvValue(value: unknown): string { + const raw = typeof value === 'string' ? value : JSON.stringify(value); + return `"${raw.replace(/"/g, '""')}"`; +} + +async function writeCsvRow(path: string, data: Record): Promise { + let hasFile = true; + try { + await access(path); + } catch { + hasFile = false; + } + + const keys = Object.keys(data); + const row = keys.map((key) => toCsvValue(data[key])).join(',') + '\n'; + + if (!hasFile) { + await appendFile(path, keys.join(',') + '\n', 'utf8'); + } + await appendFile(path, row, 'utf8'); +} + +function disqualifyTool() { + return llm.tool({ + description: + 'End the interview if the candidate refuses to cooperate, provides inappropriate answers, or is not a fit.', + parameters: z.object({ + disqualificationReason: z.string().describe('Why the interview should end immediately'), + }), + execute: async ({ disqualificationReason }, { ctx }: llm.ToolOptions) => { + const reason = `[DISQUALIFIED] ${disqualificationReason}`; + await writeCsvRow(ctx.userData.filename, { + name: ctx.userData.candidateName || 'unknown', + disqualificationReason: 
reason, + }); + await ctx.session.say( + `Thanks for your time today. We are ending the interview now. Reason: ${disqualificationReason}.`, + ); + ctx.session.shutdown(); + return 'Interview ended and disqualification saved.'; + }, + }); +} + +class IntroTask extends voice.AgentTask { + constructor() { + super({ + instructions: + 'You are Alex, an interviewer screening a software engineer candidate. Gather the candidate name and short self-introduction.', + tools: { + saveIntro: llm.tool({ + description: 'Save candidate name and intro notes.', + parameters: z.object({ + name: z.string().describe('Candidate name'), + intro: z.string().describe('Short notes from their introduction'), + }), + execute: async ({ name, intro }) => { + (this.session.userData as SurveyUserData).candidateName = name; + this.complete({ name, intro }); + return `Saved intro for ${name}.`; + }, + }), + }, + }); + } + + async onEnter() { + await this.session.generateReply({ + instructions: + 'Welcome the candidate and collect their name plus a brief self-introduction, then call saveIntro.', + }); + } +} + +class EmailTask extends voice.AgentTask { + constructor() { + const disqualify = disqualifyTool(); + super({ + instructions: + 'Collect a valid email address. If the candidate refuses, call disqualify immediately.', + tools: { + disqualify, + saveEmail: llm.tool({ + description: 'Save candidate email address.', + parameters: z.object({ + email: z.string().describe('Candidate email'), + }), + execute: async ({ email }) => { + this.complete({ email }); + return `Saved email: ${email}`; + }, + }), + }, + }); + } + + async onEnter() { + await this.session.generateReply({ + instructions: 'Ask for the candidate email and call saveEmail as soon as you get it.', + }); + } +} + +class CommuteTask extends voice.AgentTask { + constructor() { + const disqualify = disqualifyTool(); + super({ + instructions: + 'Collect commute flexibility. 
The role expects office attendance three days per week.', + tools: { + disqualify, + saveCommute: llm.tool({ + description: 'Save candidate commute information.', + parameters: z.object({ + canCommute: z.boolean().describe('Whether the candidate can commute to office'), + commuteMethod: z + .enum(['driving', 'bus', 'subway', 'none']) + .describe('Main commute method'), + }), + execute: async ({ canCommute, commuteMethod }) => { + this.complete({ canCommute, commuteMethod }); + return 'Saved commute flexibility.'; + }, + }), + }, + }); + } + + async onEnter() { + await this.session.generateReply({ + instructions: + 'Ask if the candidate can commute to office regularly and capture the commute method, then call saveCommute.', + }); + } +} + +class ExperienceTask extends voice.AgentTask { + constructor() { + const disqualify = disqualifyTool(); + super({ + instructions: + 'Collect years of experience and a concise timeline of previous roles relevant to software engineering.', + tools: { + disqualify, + saveExperience: llm.tool({ + description: 'Save candidate experience details.', + parameters: z.object({ + yearsOfExperience: z + .number() + .describe('Total years of professional software experience'), + experienceDescription: z.string().describe('Summary of previous roles and employers'), + }), + execute: async ({ yearsOfExperience, experienceDescription }) => { + this.complete({ yearsOfExperience, experienceDescription }); + return 'Saved experience details.'; + }, + }), + }, + }); + } + + async onEnter() { + await this.session.generateReply({ + instructions: + 'Ask about years of experience and previous roles, then call saveExperience once gathered.', + }); + } +} + +class BehavioralTask extends voice.AgentTask { + private partial: Partial = {}; + + constructor() { + const disqualify = disqualifyTool(); + super({ + instructions: + 'Collect strengths, weaknesses, and work style. 
Keep a natural conversational tone and avoid bullet lists.', + tools: { + disqualify, + saveStrengths: llm.tool({ + description: "Save a concise summary of the candidate's strengths.", + parameters: z.object({ + strengths: z.string().describe('Strengths summary'), + }), + execute: async ({ strengths }) => { + this.partial.strengths = strengths; + this.checkCompletion(); + return 'Saved strengths.'; + }, + }), + saveWeaknesses: llm.tool({ + description: "Save a concise summary of the candidate's weaknesses.", + parameters: z.object({ + weaknesses: z.string().describe('Weaknesses summary'), + }), + execute: async ({ weaknesses }) => { + this.partial.weaknesses = weaknesses; + this.checkCompletion(); + return 'Saved weaknesses.'; + }, + }), + saveWorkStyle: llm.tool({ + description: "Save candidate's work style.", + parameters: z.object({ + workStyle: z.enum(['independent', 'team_player']).describe('Primary work style'), + }), + execute: async ({ workStyle }) => { + this.partial.workStyle = workStyle; + this.checkCompletion(); + return 'Saved work style.'; + }, + }), + }, + }); + } + + async onEnter() { + await this.session.generateReply({ + instructions: + 'In a conversational way, gather strengths, weaknesses, and work style, then call save* tools.', + }); + } + + private checkCompletion() { + if (this.partial.strengths && this.partial.weaknesses && this.partial.workStyle) { + this.complete({ + strengths: this.partial.strengths, + weaknesses: this.partial.weaknesses, + workStyle: this.partial.workStyle, + }); + return; + } + + this.session.generateReply({ + instructions: + 'Continue gathering missing behavioral details in a concise, natural dialogue and use save* tools.', + }); + } +} + +class SurveyAgent extends voice.Agent { + constructor() { + super({ + instructions: + 'You are a survey interviewer for a software engineer screening. Be concise, professional, and natural. 
Call endScreening when the process is complete.', + tools: { + endScreening: llm.tool({ + description: 'End interview and hang up.', + execute: async (_, { ctx }: llm.ToolOptions) => { + ctx.session.shutdown(); + return 'Interview concluded.'; + }, + }), + }, + }); + } + + async onEnter() { + const group = new beta.TaskGroup({ + summarizeChatCtx: false, + }); + + group.add(() => new IntroTask(), { + id: 'intro_task', + description: 'Collect candidate name and intro', + }); + group.add(() => new EmailTask(), { + id: 'email_task', + description: 'Collect candidate email', + }); + group.add(() => new CommuteTask(), { + id: 'commute_task', + description: 'Collect commute flexibility and method', + }); + group.add(() => new ExperienceTask(), { + id: 'experience_task', + description: 'Collect years of experience and role history', + }); + group.add(() => new BehavioralTask(), { + id: 'behavioral_task', + description: 'Collect strengths, weaknesses, and work style', + }); + + const result = await group.run(); + const summaryItem = this.chatCtx.items[this.chatCtx.items.length - 1]; + let summaryText = ''; + if (summaryItem && 'content' in summaryItem) { + summaryText = + typeof summaryItem.content === 'string' + ? summaryItem.content + : JSON.stringify(summaryItem.content ?? ''); + } + + const mergedResults: Record = { + ...result.taskResults, + summary: summaryText, + }; + this.session.userData.taskResults = mergedResults; + await writeCsvRow(this.session.userData.filename, mergedResults); + + await this.session.say( + 'The interview is now complete. Thank you for your time. 
We will follow up within three business days.', + ); + } +} + +export default defineAgent({ + entry: async (ctx: JobContext) => { + const session = new voice.AgentSession({ + llm: 'openai/gpt-4.1', + stt: 'deepgram/nova-3', + tts: 'cartesia/sonic-3', + userData: { + filename: 'survey_results.csv', + candidateName: '', + taskResults: {}, + }, + }); + + await session.start({ + agent: new SurveyAgent(), + room: ctx.room, + }); + }, +}); + +cli.runApp(new ServerOptions({ agent: fileURLToPath(import.meta.url) })); diff --git a/plugins/google/src/beta/realtime/realtime_api.ts b/plugins/google/src/beta/realtime/realtime_api.ts index eb612c32e..da0ebba19 100644 --- a/plugins/google/src/beta/realtime/realtime_api.ts +++ b/plugins/google/src/beta/realtime/realtime_api.ts @@ -311,6 +311,10 @@ export class RealtimeModel extends llm.RealtimeModel { autoToolReplyGeneration: true, audioOutput: options.modalities?.includes(Modality.AUDIO) ?? true, manualFunctionCalls: false, + midSessionChatCtxUpdate: false, + midSessionInstructionsUpdate: true, + midSessionToolsUpdate: false, + perResponseToolChoice: false, }); // Environment variable fallbacks diff --git a/plugins/hedra/package.json b/plugins/hedra/package.json index 0ef1e78d5..6003f1171 100644 --- a/plugins/hedra/package.json +++ b/plugins/hedra/package.json @@ -35,7 +35,7 @@ }, "devDependencies": { "@livekit/agents": "workspace:*", - "@livekit/rtc-node": "^0.13.22", + "@livekit/rtc-node": "catalog:", "@microsoft/api-extractor": "^7.35.0", "pino": "^8.19.0", "tsup": "^8.3.5", @@ -46,6 +46,6 @@ }, "peerDependencies": { "@livekit/agents": "workspace:*", - "@livekit/rtc-node": "^0.13.22" + "@livekit/rtc-node": "catalog:" } } diff --git a/plugins/openai/src/realtime/realtime_model.ts b/plugins/openai/src/realtime/realtime_model.ts index 23643f6f3..633100e49 100644 --- a/plugins/openai/src/realtime/realtime_model.ts +++ b/plugins/openai/src/realtime/realtime_model.ts @@ -185,6 +185,10 @@ export class RealtimeModel extends 
llm.RealtimeModel { autoToolReplyGeneration: false, audioOutput: modalities.includes('audio'), manualFunctionCalls: true, + midSessionChatCtxUpdate: true, + midSessionInstructionsUpdate: true, + midSessionToolsUpdate: true, + perResponseToolChoice: true, }); const isAzure = !!(options.apiVersion || options.entraToken || options.azureDeployment); @@ -477,17 +481,75 @@ export class RealtimeSession extends llm.RealtimeSession { async updateChatCtx(_chatCtx: llm.ChatContext): Promise { const unlock = await this.updateChatCtxLock.lock(); try { + const validation = llm.validateChatContextStructure(_chatCtx); + const blockingErrors = validation.issues.filter( + (issue: llm.ChatContextValidationIssue) => + issue.severity === 'error' && issue.code !== 'timestamp_order', + ); + const timestampOrderIssue = validation.issues.find( + (issue: llm.ChatContextValidationIssue) => issue.code === 'timestamp_order', + ); + if (blockingErrors.length > 0) { + this.#logger.error( + { issues: validation.issues, blockingErrors }, + 'Invalid chat context supplied to updateChatCtx', + ); + throw new Error( + `Invalid chat context: ${validation.errors} errors, ${validation.warnings} warnings`, + ); + } + if (timestampOrderIssue) { + this.#logger.warn( + { timestampOrderIssue }, + 'Proceeding with non-monotonic createdAt ordering in realtime chat context', + ); + } + if (lkOaiDebug > 0 && validation.warnings > 0) { + this.#logger.debug( + { + warnings: validation.warnings, + issues: validation.issues, + }, + 'Chat context warnings detected before realtime update', + ); + } + const events = await this.createChatCtxUpdateEvents(_chatCtx); const futures: Future[] = []; + const ownedCreateFutures: { [id: string]: Future } = {}; + const ownedDeleteFutures: { [id: string]: Future } = {}; + + const cleanupTimedOutFutures = () => { + // remove timed-out entries so late server acks + // don't resolve stale futures from a previous updateChatCtx call. 
+ for (const [itemId, future] of Object.entries(ownedDeleteFutures)) { + if (this.itemDeleteFutures[itemId] === future) { + delete this.itemDeleteFutures[itemId]; + } + } + for (const [itemId, future] of Object.entries(ownedCreateFutures)) { + if (this.itemCreateFutures[itemId] === future) { + delete this.itemCreateFutures[itemId]; + } + } + }; for (const event of events) { - const future = new Future(); - futures.push(future); - if (event.type === 'conversation.item.create') { + const future = new Future(); + futures.push(future); this.itemCreateFutures[event.item.id] = future; + ownedCreateFutures[event.item.id] = future; } else if (event.type == 'conversation.item.delete') { + const existingDeleteFuture = this.itemDeleteFutures[event.item_id]; + if (existingDeleteFuture) { + futures.push(existingDeleteFuture); + continue; + } + const future = new Future(); + futures.push(future); this.itemDeleteFutures[event.item_id] = future; + ownedDeleteFutures[event.item_id] = future; } this.sendEvent(event); @@ -497,13 +559,21 @@ export class RealtimeSession extends llm.RealtimeSession { return; } - // wait for futures to resolve or timeout - await Promise.race([ - Promise.all(futures), - delay(5000).then(() => { - throw new Error('Chat ctx update events timed out'); - }), - ]); + // wait for futures to resolve or timeout. + // Cancel the timeout branch once futures resolve to avoid stale cleanup. 
+ const timeoutController = new AbortController(); + const timeoutPromise = delay(5000, { signal: timeoutController.signal }).then(() => { + cleanupTimedOutFutures(); + throw new Error('Chat ctx update events timed out'); + }); + + try { + await Promise.race([Promise.all(futures), timeoutPromise]); + } finally { + if (!timeoutController.signal.aborted) { + timeoutController.abort(); + } + } } catch (e) { this.#logger.error((e as Error).message); throw e; diff --git a/plugins/openai/src/realtime/realtime_model_beta.ts b/plugins/openai/src/realtime/realtime_model_beta.ts index ce7a2a51b..d81720d67 100644 --- a/plugins/openai/src/realtime/realtime_model_beta.ts +++ b/plugins/openai/src/realtime/realtime_model_beta.ts @@ -177,6 +177,10 @@ export class RealtimeModel extends llm.RealtimeModel { autoToolReplyGeneration: false, audioOutput: modalities.includes('audio'), manualFunctionCalls: true, + midSessionChatCtxUpdate: true, + midSessionInstructionsUpdate: true, + midSessionToolsUpdate: true, + perResponseToolChoice: true, }); const isAzure = !!(options.apiVersion || options.entraToken || options.azureDeployment); diff --git a/plugins/phonic/package.json b/plugins/phonic/package.json index 600cfb37e..7d5593459 100644 --- a/plugins/phonic/package.json +++ b/plugins/phonic/package.json @@ -41,7 +41,7 @@ "typescript": "^5.0.0" }, "dependencies": { - "phonic": "^0.31.8" + "phonic": "^0.31.10" }, "peerDependencies": { "@livekit/agents": "workspace:*", diff --git a/plugins/phonic/src/realtime/realtime_model.ts b/plugins/phonic/src/realtime/realtime_model.ts index 0233679e4..542749fb1 100644 --- a/plugins/phonic/src/realtime/realtime_model.ts +++ b/plugins/phonic/src/realtime/realtime_model.ts @@ -23,6 +23,10 @@ const PHONIC_INPUT_FRAME_MS = 20; const DEFAULT_MODEL = 'merritt'; const WS_CLOSE_NORMAL = 1000; const TOOL_CALL_OUTPUT_TIMEOUT_MS = 60_000; +const CONVERSATION_HISTORY_PREFIX = + '\n\nThis conversation is being continued from an existing ' + + 'conversation. 
You are the assistant speaking to the user. ' + + 'The following is the conversation history:\n'; export interface RealtimeModelOptions { apiKey: string; @@ -56,6 +60,10 @@ export class RealtimeModel extends llm.RealtimeModel { return this._options.model; } + get provider(): string { + return 'phonic'; + } + constructor( options: { /** @@ -146,6 +154,10 @@ export class RealtimeModel extends llm.RealtimeModel { autoToolReplyGeneration: true, manualFunctionCalls: false, audioOutput: true, + midSessionChatCtxUpdate: true, + midSessionInstructionsUpdate: true, + midSessionToolsUpdate: true, + perResponseToolChoice: false, }); const apiKey = options.apiKey || process.env.PHONIC_API_KEY; @@ -236,7 +248,9 @@ export class RealtimeSession extends llm.RealtimeSession { private connectTask: Promise; private toolDefinitions: Record[] = []; private pendingToolCallIds = new Set(); - private readyToStart = false; + private readyToStart = new Future(); + private pendingGenerateReplyFut?: Future; + private generateReplyRequestId = 0; private systemPromptPostfix = ''; constructor(realtimeModel: RealtimeModel) { @@ -294,9 +308,7 @@ export class RealtimeSession extends llm.RealtimeSession { this.logger.debug( 'updateChatCtx called with messages prior to config being sent to Phonic. Including conversation state in system instructions.', ); - this.systemPromptPostfix = - '\n\nThis conversation is being continued from an existing conversation. You are the assistant speaking to the user. 
The following is the conversation history:\n' + - turnHistory; + this.systemPromptPostfix = CONVERSATION_HISTORY_PREFIX + turnHistory; } this._chatCtx = chatCtx.copy(); } @@ -375,12 +387,69 @@ export class RealtimeSession extends llm.RealtimeSession { this.toolsReady.resolve(); } + async _updateSession( + instructions?: string, + chatCtx?: llm.ChatContext, + tools?: llm.ToolContext, + ): Promise { + await this.readyToStart.await; + if (instructions !== undefined) { + this.options.instructions = instructions; + } + if (tools !== undefined) { + this._tools = { ...tools }; + this.toolDefinitions = Object.entries(tools) + .filter(([, tool]) => llm.isFunctionTool(tool)) + .map(([name, tool]) => ({ + type: 'custom_websocket', + tool_schema: { + type: 'function', + function: { + name, + description: tool.description, + parameters: llm.toJsonSchema(tool.parameters), + strict: true, + }, + }, + tool_call_output_timeout_ms: TOOL_CALL_OUTPUT_TIMEOUT_MS, + wait_for_speech_before_tool_call: true, + allow_tool_chaining: false, + })); + } + if (chatCtx !== undefined) { + this._chatCtx = chatCtx.copy(); + } + + let systemPrompt = this.options.instructions ?? ''; + if (chatCtx !== undefined) { + const history = this.buildTurnHistory(chatCtx); + if (history) { + systemPrompt += CONVERSATION_HISTORY_PREFIX + history; + } + } + + this.closeCurrentGeneration({ interrupted: true }); + + const toolsPayload: Phonic.ConfigOptions.Tools.Item[] = [ + ...(this.options.phonicTools ?? 
[]), + ...this.toolDefinitions, + ]; + + if (this.socket) { + this.logger.info('Sending mid-session reset to Phonic'); + this.socket.sendReset({ + type: 'reset', + config: this.buildConfigOptions({ systemPrompt, toolsPayload }), + }); + } + } + updateOptions(_options: { toolChoice?: llm.ToolChoice | null }): void { this.logger.warn('updateOptions is not supported by the Phonic realtime model.'); } pushAudio(frame: AudioFrame): void { - if (this.closed || !this.readyToStart) { + if (this.closed || !this.readyToStart.done) { return; } @@ -401,14 +470,39 @@ export class RealtimeSession extends llm.RealtimeSession { } async generateReply(instructions?: string): Promise { - if (this.socket) { - this.socket.sendGenerateReply({ type: 'generate_reply', system_message: instructions }); - } else { - this.logger.warn('Cannot send generate_reply: WebSocket not available'); + if (this.closed) { + return Promise.reject(new Error('session is closed')); + } + + if (this.pendingGenerateReplyFut && !this.pendingGenerateReplyFut.done) { + this.pendingGenerateReplyFut.reject( + new Error('generateReply superseded by a newer generateReply call'), + ); } + const requestId = ++this.generateReplyRequestId; this.closeCurrentGeneration({ interrupted: false }); - return this.startNewAssistantTurn({ userInitiated: true }); + this.pendingGenerateReplyFut = new Future(); + this.sendGenerateReply(instructions, requestId); + + return this.pendingGenerateReplyFut.await; + } + + private async sendGenerateReply( + instructions: string | undefined, + requestId: number, + ): Promise { + await this.readyToStart.await; + if (requestId !== this.generateReplyRequestId) { + return; + } + + if (this.closed || !this.socket) { + this.pendingGenerateReplyFut?.reject(new Error('session is closed')); + this.pendingGenerateReplyFut = undefined; + return; + } + this.socket.sendGenerateReply({ type: 'generate_reply', system_message: instructions }); } async commitAudio(): Promise { @@ -433,7 +527,10 @@ export class 
RealtimeSession extends llm.RealtimeSession { this.closedFuture.resolve(); this.instructionsReady.resolve(); this.toolsReady.resolve(); + this.readyToStart.resolve(); this.closeCurrentGeneration({ interrupted: false }); + this.rejectPendingGenerateReply(); + this.inputResampler = undefined; this.socket?.close(); await this.connectTask; await super.close(); @@ -455,6 +552,7 @@ export class RealtimeSession extends llm.RealtimeSession { this.socket.on('error', (error: Error) => this.emitError(error, false)); this.socket.on('close', (event: { code?: number }) => { this.closeCurrentGeneration({ interrupted: false }); + this.rejectPendingGenerateReply(); if (!this.closed && event.code !== WS_CLOSE_NORMAL) { this.emitError(new Error(`Phonic STS socket closed with code ${event.code ?? -1}`), false); } @@ -482,30 +580,10 @@ export class RealtimeSession extends llm.RealtimeSession { this.socket.sendConfig({ type: 'config', model: this.options.model as Phonic.ConfigPayload['model'], - agent: this.options.phonicAgent, - project: this.options.project, - welcome_message: this.options.welcomeMessage, - generate_welcome_message: this.options.generateWelcomeMessage, - system_prompt: this.options.instructions + this.systemPromptPostfix, - voice_id: this.options.voice, - input_format: 'pcm_44100', - output_format: 'pcm_44100', - ...(this.options.defaultLanguage !== undefined && { - default_language: this.options.defaultLanguage, - }), - ...(this.options.additionalLanguages !== undefined && { - additional_languages: this.options.additionalLanguages, + ...this.buildConfigOptions({ + systemPrompt: this.options.instructions + this.systemPromptPostfix, + toolsPayload: [...(this.options.phonicTools ?? []), ...this.toolDefinitions], }), - ...(this.options.multilingualMode !== undefined && { - multilingual_mode: this.options.multilingualMode, - }), - audio_speed: this.options.audioSpeed, - tools: [...(this.options.phonicTools ?? 
[]), ...this.toolDefinitions], - boosted_keywords: this.options.boostedKeywords, - generate_no_input_poke_text: this.options.generateNoInputPokeText, - no_input_poke_sec: this.options.noInputPokeSec, - no_input_poke_text: this.options.noInputPokeText, - no_input_end_conversation_sec: this.options.noInputEndConversationSec, }); } @@ -557,7 +635,7 @@ export class RealtimeSession extends llm.RealtimeSession { this.handleToolCallInterrupted(message); break; case 'ready_to_start_conversation': - this.readyToStart = true; + this.readyToStart.resolve(); break; case 'assistant_chose_not_to_respond': case 'input_cancelled': @@ -698,6 +776,12 @@ export class RealtimeSession extends llm.RealtimeSession { responseId, }; + if (this.pendingGenerateReplyFut && !this.pendingGenerateReplyFut.done) { + generationEvent.userInitiated = true; + this.pendingGenerateReplyFut.resolve(generationEvent); + this.pendingGenerateReplyFut = undefined; + } + this.emit('generation_created', generationEvent); return generationEvent; } @@ -726,6 +810,13 @@ export class RealtimeSession extends llm.RealtimeSession { this.currentGeneration = undefined; } + private rejectPendingGenerateReply(): void { + if (this.pendingGenerateReplyFut && !this.pendingGenerateReplyFut.done) { + this.pendingGenerateReplyFut.reject(new Error('session is closed')); + this.pendingGenerateReplyFut = undefined; + } + } + private emitError(error: Error, recoverable: boolean): void { this.emit('error', { timestamp: Date.now(), @@ -736,9 +827,61 @@ export class RealtimeSession extends llm.RealtimeSession { } satisfies llm.RealtimeModelError); } + private buildConfigOptions({ + systemPrompt, + toolsPayload, + }: { + systemPrompt: string; + toolsPayload: Phonic.ConfigOptions.Tools.Item[]; + }): Phonic.ConfigOptions { + return { + agent: this.options.phonicAgent, + project: this.options.project, + welcome_message: this.options.welcomeMessage, + generate_welcome_message: this.options.generateWelcomeMessage, + system_prompt: 
systemPrompt, + voice_id: this.options.voice, + input_format: 'pcm_44100', + output_format: 'pcm_44100', + ...(this.options.defaultLanguage !== undefined && { + default_language: this.options.defaultLanguage, + }), + ...(this.options.additionalLanguages !== undefined && { + additional_languages: this.options.additionalLanguages, + }), + ...(this.options.multilingualMode !== undefined && { + multilingual_mode: this.options.multilingualMode, + }), + audio_speed: this.options.audioSpeed, + tools: toolsPayload, + boosted_keywords: this.options.boostedKeywords, + // ...(this.options.minWordsToInterrupt !== undefined && { + // min_words_to_interrupt: this.options.minWordsToInterrupt, + // }), + generate_no_input_poke_text: this.options.generateNoInputPokeText, + no_input_poke_sec: this.options.noInputPokeSec, + no_input_poke_text: this.options.noInputPokeText, + no_input_end_conversation_sec: this.options.noInputEndConversationSec, + }; + } + + private buildTurnHistory(chatCtx: llm.ChatContext): string | undefined { + const messages = chatCtx.items.filter( + (item): item is llm.ChatMessage => + item.type === 'message' && + 'textContent' in item && + item.textContent !== undefined && + item.textContent.trim() !== '', + ); + if (messages.length === 0) return undefined; + const history = messages.map((m) => `${m.role}: ${m.textContent}`).join('\n'); + return history.trim() || undefined; + } + private *resampleAudio(frame: AudioFrame): Generator { if (this.inputResampler) { if (frame.sampleRate !== this.inputResamplerInputRate) { + this.inputResampler.close(); this.inputResampler = undefined; this.inputResamplerInputRate = undefined; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 861ed9e85..4f81a174b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -7,8 +7,8 @@ settings: catalogs: default: '@livekit/rtc-node': - specifier: ^0.13.24 - version: 0.13.24 + specifier: ^0.13.25 + version: 0.13.25 patchedDependencies: '@changesets/assemble-release-plan': @@ -205,7 +205,7 
@@ importers: devDependencies: '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.15.30) @@ -295,7 +295,7 @@ importers: version: 0.1.9 '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@opentelemetry/api': specifier: ^1.9.0 version: 1.9.0 @@ -369,7 +369,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -406,7 +406,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -434,7 +434,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -468,7 +468,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -502,7 +502,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -536,7 +536,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -576,7 +576,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -597,8 +597,8 @@ importers: specifier: workspace:* version: link:../../agents 
'@livekit/rtc-node': - specifier: ^0.13.22 - version: 0.13.24 + specifier: 'catalog:' + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -629,7 +629,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -654,7 +654,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -713,7 +713,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -753,7 +753,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -770,15 +770,15 @@ importers: plugins/phonic: dependencies: phonic: - specifier: ^0.31.8 - version: 0.31.8 + specifier: ^0.31.10 + version: 0.31.10 devDependencies: '@livekit/agents': specifier: workspace:* version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -806,7 +806,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -840,7 +840,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -874,7 +874,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: 'catalog:' 
- version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -902,7 +902,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -933,7 +933,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@types/node': specifier: ^22.5.5 version: 22.15.30 @@ -958,7 +958,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -986,7 +986,7 @@ importers: version: link:../openai '@livekit/rtc-node': specifier: 'catalog:' - version: 0.13.24 + version: 0.13.25 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.19.1) @@ -2063,38 +2063,42 @@ packages: '@livekit/protocol@1.45.1': resolution: {integrity: sha512-sr6p0TwKofHO5KW6kUzjq4hH2de4Al5scQo824xFnyI1XYo0qQn6fTG+bdr+Uj4EedjYAOqjezwUju5OErVIRA==} - '@livekit/rtc-node-darwin-arm64@0.13.24': - resolution: {integrity: sha512-gm5xOpGu6Rj/mNU2jEijcGhQGN2GdxV2dNYQm3NCKN7ow0BmMFZvXSCAWOWf+9oTutPXHnrc7EN1mt2v+lfqhA==} - engines: {node: '>= 10'} + '@livekit/rtc-ffi-bindings-darwin-arm64@0.12.52-patch.0': + resolution: {integrity: sha512-IKUir6goV8yVRR7E2qrAP0JtH7gUyMkO0TG8G+dopO/fkXAsPpSealgI9fLcBJl0zhKK+eGCr741r6xR+xxsVw==} + engines: {node: '>= 18'} cpu: [arm64] os: [darwin] - '@livekit/rtc-node-darwin-x64@0.13.24': - resolution: {integrity: sha512-jZSK5lHDp7+u0jby7PEWMzbxc0F0nLx6FT3FVjuMlT13ZY6QWKDUUCFbfDOtbdhiOZJYc5A4SwvubY6woEJXTg==} - engines: {node: '>= 10'} + '@livekit/rtc-ffi-bindings-darwin-x64@0.12.52-patch.0': + resolution: {integrity: sha512-h2oKdGvK4E4nYxHc+hsHkYu+oJIhKKqrC96v1XSGa5fgIEcq4Bve6tNEwUCBTkvuGh/I2tOI83udgcF4P4+mhQ==} + engines: {node: '>= 18'} cpu: 
[x64] os: [darwin] - '@livekit/rtc-node-linux-arm64-gnu@0.13.24': - resolution: {integrity: sha512-I+IeZET2h+viZ48moEFF0EWDHa+kLii5yuEsw38ya4mHZaZtlfbzrYKGKdONqbI9M9ldvv8XXuD0wFPjuH5CZw==} - engines: {node: '>= 10'} + '@livekit/rtc-ffi-bindings-linux-arm64-gnu@0.12.52-patch.0': + resolution: {integrity: sha512-bWtJ3r+wQ1Fd8s8jiM7GnBMfaKepSYk5c4bgimMIC4mkz+puChpfaj9avz1M271FUgxnIAKCTz6fPN2ygIAFnw==} + engines: {node: '>= 18'} cpu: [arm64] os: [linux] - '@livekit/rtc-node-linux-x64-gnu@0.13.24': - resolution: {integrity: sha512-vKOxzN/SsrtV8zIVwZCi31bZUhlb6RhJZ0NnY5MwKGSRFPi7Dwt8fmr0Vh0YmsY/p+4eZjKxvFmy7L3WVE54zw==} - engines: {node: '>= 10'} + '@livekit/rtc-ffi-bindings-linux-x64-gnu@0.12.52-patch.0': + resolution: {integrity: sha512-y1j4ciiCMaUrii0/XYwLFyRBRHDvx4202YCK5ePF3xB+9tW3Fuwexd/z4GuupCpP9eadGkpALCQt60wnLnFDnw==} + engines: {node: '>= 18'} cpu: [x64] os: [linux] - '@livekit/rtc-node-win32-x64-msvc@0.13.24': - resolution: {integrity: sha512-yTzqwndq2oKLUkXW2i/BkZMJC6kZOpRO/DKvkkKQvqc3Q+JuWz1m48GmyjIwTOKF28QjqEU3+IrnD65Uu+mFOg==} - engines: {node: '>= 10'} + '@livekit/rtc-ffi-bindings-win32-x64-msvc@0.12.52-patch.0': + resolution: {integrity: sha512-a7eoTor7KgN4JDPqZjyBQjgkVIZcxkyP5Iau3O/1qDaYKboLMqSYHfSAk84Un4r0SsSFvxUXXDY3boMLJ7QYow==} + engines: {node: '>= 18'} cpu: [x64] os: [win32] - '@livekit/rtc-node@0.13.24': - resolution: {integrity: sha512-06pF8YJlJk11R6J7kFXFpwV8etpbmCskoXFvwfwcDDixMqaP6qtS5srq3G23mDaRjx7ofz/PXg2GtiZbqNGT5A==} + '@livekit/rtc-ffi-bindings@0.12.52-patch.0': + resolution: {integrity: sha512-e01PH3AAS0/oN93LzgLDycDWzLGCpHqvZ35qzSuBWrG7V9mmQpdW/bOc6r9UFGZx/BcUXov4OTtao4OyDVVyHw==} + engines: {node: '>= 18'} + + '@livekit/rtc-node@0.13.25': + resolution: {integrity: sha512-4tL58O2DdTDP+g1ajyP5mgEOzjymD/u06IxWWVKBee1goEwDSQlMqEog/DJW34FoNNqXp1yRMCsphI4V/T1ILg==} engines: {node: '>= 18'} '@livekit/throws-transformer@0.0.0-20260320165515': @@ -4482,8 +4486,8 @@ packages: resolution: {integrity: 
sha512-vE7JKRyES09KiunauX7nd2Q9/L7lhok4smP9RZTDeD4MVs72Dp2qNFVz39Nz5a0FVEW0BJR6C0DYrq6unoziZA==} engines: {node: '>= 14.16'} - phonic@0.31.8: - resolution: {integrity: sha512-BeUqRbr0Ta0uB+6OCB770BNJ/r77HvgF+5vE2c69HEylqO93IMQkeWVeXb56Vh8mT5D3LGoO6NRssG5ChtreMw==} + phonic@0.31.10: + resolution: {integrity: sha512-MMEbfgBnjdZ0j8dkRMCl3TQTpIiKtqaJY6U1DMJhl8F8diOVh92Q9XXbBeBCoRsrKhoHpfDxRrnKVunX6NlF1w==} engines: {node: '>=18.0.0'} picocolors@1.0.1: @@ -4965,7 +4969,7 @@ packages: tar@7.4.3: resolution: {integrity: sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw==} engines: {node: '>=18'} - deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. 
Support for old versions may be purchased (at exhorbitant rates) by contacting i@izs.me term-size@2.2.1: resolution: {integrity: sha512-wK0Ri4fOGjv/XPy8SBHZChl8CM7uMc5VML7SqiQ0zG7+J5Vr+RMQDoHa2CNT6KHUnTGIXH34UDMkPzAUyapBZg==} @@ -6353,7 +6357,7 @@ snapshots: '@livekit/noise-cancellation-node@0.1.9': dependencies: - '@livekit/rtc-node': 0.13.24 + '@livekit/rtc-node': 0.13.25 tsx: 4.21.0 optionalDependencies: '@livekit/noise-cancellation-darwin-arm64': 0.1.9 @@ -6369,35 +6373,39 @@ snapshots: dependencies: '@bufbuild/protobuf': 1.10.1 - '@livekit/rtc-node-darwin-arm64@0.13.24': + '@livekit/rtc-ffi-bindings-darwin-arm64@0.12.52-patch.0': optional: true - '@livekit/rtc-node-darwin-x64@0.13.24': + '@livekit/rtc-ffi-bindings-darwin-x64@0.12.52-patch.0': optional: true - '@livekit/rtc-node-linux-arm64-gnu@0.13.24': + '@livekit/rtc-ffi-bindings-linux-arm64-gnu@0.12.52-patch.0': optional: true - '@livekit/rtc-node-linux-x64-gnu@0.13.24': + '@livekit/rtc-ffi-bindings-linux-x64-gnu@0.12.52-patch.0': optional: true - '@livekit/rtc-node-win32-x64-msvc@0.13.24': + '@livekit/rtc-ffi-bindings-win32-x64-msvc@0.12.52-patch.0': optional: true - '@livekit/rtc-node@0.13.24': + '@livekit/rtc-ffi-bindings@0.12.52-patch.0': dependencies: '@bufbuild/protobuf': 1.10.1 + optionalDependencies: + '@livekit/rtc-ffi-bindings-darwin-arm64': 0.12.52-patch.0 + '@livekit/rtc-ffi-bindings-darwin-x64': 0.12.52-patch.0 + '@livekit/rtc-ffi-bindings-linux-arm64-gnu': 0.12.52-patch.0 + '@livekit/rtc-ffi-bindings-linux-x64-gnu': 0.12.52-patch.0 + '@livekit/rtc-ffi-bindings-win32-x64-msvc': 0.12.52-patch.0 + + '@livekit/rtc-node@0.13.25': + dependencies: '@datastructures-js/deque': 1.0.8 '@livekit/mutex': 1.1.1 + '@livekit/rtc-ffi-bindings': 0.12.52-patch.0 '@livekit/typed-emitter': 3.0.0 pino: 9.6.0 pino-pretty: 13.0.0 - optionalDependencies: - '@livekit/rtc-node-darwin-arm64': 0.13.24 - '@livekit/rtc-node-darwin-x64': 0.13.24 - '@livekit/rtc-node-linux-arm64-gnu': 0.13.24 - 
'@livekit/rtc-node-linux-x64-gnu': 0.13.24 - '@livekit/rtc-node-win32-x64-msvc': 0.13.24 '@livekit/throws-transformer@0.0.0-20260320165515(typescript@5.4.5)': dependencies: @@ -9121,7 +9129,7 @@ snapshots: pathval@2.0.0: {} - phonic@0.31.8: + phonic@0.31.10: dependencies: ws: 8.20.0 transitivePeerDependencies: diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index d073c55c8..a396d66df 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -8,4 +8,4 @@ minimumReleaseAgeExclude: - '@livekit/*' catalog: - '@livekit/rtc-node': ^0.13.24 + '@livekit/rtc-node': ^0.13.25