Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/happy-yaks-bet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents-plugin-phonic": patch
---

Update phonic plugin to reuse session for handoffs
5 changes: 5 additions & 0 deletions .changeset/plenty-baths-hug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

remove rt session say logic and add phonic logic for resetting ws conn
9 changes: 9 additions & 0 deletions .changeset/sharp-apples-appear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@livekit/agents": patch
"@livekit/agents-plugin-google": patch
"@livekit/agents-plugin-openai": patch
"@livekit/agents-plugin-phonic": patch
---

- Make reusable Realtime Session across Handoffs & Agent Tasks
- Add say() capability to phonic realtime model
4 changes: 4 additions & 0 deletions agents/src/llm/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ export {
oaiParams,
serializeImage,
toJsonSchema,
validateChatContextStructure,
type ChatContextValidationIssue,
type ChatContextValidationResult,
type ChatContextValidationSeverity,
type FormatChatHistoryOptions,
type OpenAIFunctionParameters,
type SerializedImage,
Expand Down
17 changes: 13 additions & 4 deletions agents/src/llm/realtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import type { AudioFrame } from '@livekit/rtc-node';
import { EventEmitter } from 'events';
import type { ReadableStream } from 'node:stream/web';
import { DeferredReadableStream } from '../stream/deferred_stream.js';
import { MultiInputStream } from '../stream/multi_input_stream.js';
import { Task } from '../utils.js';
import type { TimedString } from '../voice/io.js';
import type { ChatContext, FunctionCall } from './chat_context.js';
Expand Down Expand Up @@ -49,6 +49,10 @@ export interface RealtimeCapabilities {
autoToolReplyGeneration: boolean;
audioOutput: boolean;
manualFunctionCalls: boolean;
midSessionChatCtxUpdate?: boolean;
midSessionInstructionsUpdate?: boolean;
midSessionToolsUpdate?: boolean;
perResponseToolChoice?: boolean;
}

export interface InputTranscriptionCompleted {
Expand Down Expand Up @@ -84,7 +88,8 @@ export abstract class RealtimeModel {

export abstract class RealtimeSession extends EventEmitter {
protected _realtimeModel: RealtimeModel;
private deferredInputStream = new DeferredReadableStream<AudioFrame>();
private inputAudioStream = new MultiInputStream<AudioFrame>();
private inputAudioStreamId?: string;
private _mainTask: Task<void>;

constructor(realtimeModel: RealtimeModel) {
Expand Down Expand Up @@ -146,6 +151,7 @@ export abstract class RealtimeSession extends EventEmitter {

async close(): Promise<void> {
this._mainTask.cancel();
await this.inputAudioStream.close();
}

/**
Expand All @@ -156,7 +162,7 @@ export abstract class RealtimeSession extends EventEmitter {
}

private async _mainTaskImpl(signal: AbortSignal): Promise<void> {
const reader = this.deferredInputStream.stream.getReader();
const reader = this.inputAudioStream.stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done || signal.aborted) {
Expand All @@ -167,6 +173,9 @@ export abstract class RealtimeSession extends EventEmitter {
}

setInputAudioStream(audioStream: ReadableStream<AudioFrame>): void {
this.deferredInputStream.setSource(audioStream);
if (this.inputAudioStreamId !== undefined) {
void this.inputAudioStream.removeInputStream(this.inputAudioStreamId);
}
this.inputAudioStreamId = this.inputAudioStream.addInputStream(audioStream);
}
}
88 changes: 87 additions & 1 deletion agents/src/llm/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import {
FunctionCallOutput,
type ImageContent,
} from './chat_context.js';
import { computeChatCtxDiff, formatChatHistory, serializeImage } from './utils.js';
import {
computeChatCtxDiff,
formatChatHistory,
serializeImage,
validateChatContextStructure,
} from './utils.js';

function createChatMessage(
id: string,
Expand Down Expand Up @@ -457,6 +462,87 @@ describe('formatChatHistory', () => {
});
});

describe('validateChatContextStructure', () => {
it('returns valid=true for well-formed chat context', () => {
const ctx = new ChatContext([
ChatMessage.create({
id: 'msg_user',
role: 'user',
content: ['hello'],
createdAt: 1,
}),
FunctionCall.create({
id: 'fn_call',
callId: 'call_1',
name: 'lookup_order',
args: '{"orderId":"123"}',
createdAt: 2,
}),
FunctionCallOutput.create({
id: 'fn_output',
callId: 'call_1',
name: 'lookup_order',
output: '{"ok":true}',
isError: false,
createdAt: 3,
}),
]);

const result = validateChatContextStructure(ctx);
expect(result.valid).toBe(true);
expect(result.errors).toBe(0);
expect(result.warnings).toBe(0);
expect(result.issues).toEqual([]);
});

it('detects duplicate ids and timestamp ordering issues', () => {
const m1 = ChatMessage.create({
id: 'dup_id',
role: 'user',
content: ['hello'],
createdAt: 10,
});
const m2 = ChatMessage.create({
id: 'dup_id',
role: 'assistant',
content: ['world'],
createdAt: 5,
});
const ctx = new ChatContext([m1, m2]);

const result = validateChatContextStructure(ctx);
expect(result.valid).toBe(false);
expect(result.errors).toBeGreaterThanOrEqual(2);
expect(result.issues.some((i) => i.code === 'duplicate_id')).toBe(true);
expect(result.issues.some((i) => i.code === 'timestamp_order')).toBe(true);
});

it('detects malformed terms and orphan function outputs', () => {
const msg = ChatMessage.create({
id: 'msg_1',
role: 'user',
content: [' '],
createdAt: 1,
});
const output = FunctionCallOutput.create({
id: 'fn_out_1',
callId: 'call_missing',
name: 'lookup_order',
output: 'ok',
isError: false,
createdAt: 2,
});
const ctx = new ChatContext([msg, output]);

const result = validateChatContextStructure(ctx);
expect(result.valid).toBe(true);
expect(result.errors).toBe(0);
expect(result.warnings).toBeGreaterThanOrEqual(2);
expect(result.issues.some((i) => i.code === 'empty_text_term')).toBe(true);
expect(result.issues.some((i) => i.code === 'orphan_function_call_output')).toBe(true);
});
});

describe('serializeImage', () => {
let consoleWarnSpy: ReturnType<typeof vi.spyOn>;

Expand Down
169 changes: 169 additions & 0 deletions agents/src/llm/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,33 @@ export interface FormatChatHistoryOptions {
includeTimestamps?: boolean;
}

export type ChatContextValidationSeverity = 'error' | 'warning';

export interface ChatContextValidationIssue {
severity: ChatContextValidationSeverity;
code:
| 'duplicate_id'
| 'timestamp_order'
| 'empty_message_content'
| 'empty_text_term'
| 'missing_image_term'
| 'invalid_audio_term'
| 'invalid_function_call'
| 'invalid_function_call_args'
| 'invalid_function_call_output'
| 'orphan_function_call_output';
index: number;
itemId: string;
message: string;
}

export interface ChatContextValidationResult {
valid: boolean;
errors: number;
warnings: number;
issues: ChatContextValidationIssue[];
}

/**
* Render a chat context into a readable multiline string for debugging and logging.
*/
Expand All @@ -273,6 +300,148 @@ export function formatChatHistory(
].join('\n');
}

/**
* Validate structural integrity of chat context items/terms for realtime usage.
*/
export function validateChatContextStructure(chatCtx: ChatContext): ChatContextValidationResult {
const issues: ChatContextValidationIssue[] = [];
const ids = new Set<string>();
const seenFunctionCallIds = new Set<string>();
let previousCreatedAt = -Infinity;

const pushIssue = (issue: ChatContextValidationIssue) => {
issues.push(issue);
};

for (let index = 0; index < chatCtx.items.length; index += 1) {
const item = chatCtx.items[index]!;

if (ids.has(item.id)) {
pushIssue({
severity: 'error',
code: 'duplicate_id',
index,
itemId: item.id,
message: `Duplicate item id '${item.id}'`,
});
} else {
ids.add(item.id);
}

if (item.createdAt < previousCreatedAt) {
pushIssue({
severity: 'error',
code: 'timestamp_order',
index,
itemId: item.id,
message: `Item createdAt (${item.createdAt}) is older than previous item (${previousCreatedAt})`,
});
}
previousCreatedAt = item.createdAt;

if (item.type === 'message') {
if (item.content.length === 0) {
pushIssue({
severity: 'warning',
code: 'empty_message_content',
index,
itemId: item.id,
message: 'Message has empty content array',
});
}

item.content.forEach((term, termIndex) => {
if (typeof term === 'string') {
if (term.trim().length === 0) {
pushIssue({
severity: 'warning',
code: 'empty_text_term',
index,
itemId: item.id,
message: `Message term[${termIndex}] is empty text`,
});
}
return;
}

if (term.type === 'image_content') {
if (!term.id || term.image === undefined || term.image === null) {
pushIssue({
severity: 'error',
code: 'missing_image_term',
index,
itemId: item.id,
message: `Message term[${termIndex}] has invalid image content`,
});
}
return;
}

if (!Array.isArray(term.frame)) {
pushIssue({
severity: 'error',
code: 'invalid_audio_term',
index,
itemId: item.id,
message: `Message term[${termIndex}] has invalid audio content`,
});
}
});
} else if (item.type === 'function_call') {
if (!item.name || !item.callId) {
pushIssue({
severity: 'error',
code: 'invalid_function_call',
index,
itemId: item.id,
message: 'Function call is missing name or callId',
});
} else {
seenFunctionCallIds.add(item.callId);
}

try {
JSON.parse(item.args);
} catch {
pushIssue({
severity: 'warning',
code: 'invalid_function_call_args',
index,
itemId: item.id,
message: 'Function call args are not valid JSON',
});
}
} else if (item.type === 'function_call_output') {
if (!item.callId) {
pushIssue({
severity: 'error',
code: 'invalid_function_call_output',
index,
itemId: item.id,
message: 'Function call output is missing callId',
});
} else if (!seenFunctionCallIds.has(item.callId)) {
pushIssue({
severity: 'warning',
code: 'orphan_function_call_output',
index,
itemId: item.id,
message: `Function call output references unknown callId '${item.callId}'`,
});
}
}
}

const errors = issues.filter((issue) => issue.severity === 'error').length;
const warnings = issues.length - errors;
return {
valid: errors === 0,
errors,
warnings,
issues,
};
}

function formatChatHistoryItem(
item: ChatItem,
index: number,
Expand Down
Loading