Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/happy-yaks-bet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents-plugin-phonic": patch
---

Update phonic plugin to reuse session for handoffs
5 changes: 5 additions & 0 deletions .changeset/plenty-baths-hug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents': patch
---

remove rt session say logic and add phonic logic for resetting ws conn
9 changes: 9 additions & 0 deletions .changeset/sharp-apples-appear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@livekit/agents": patch
"@livekit/agents-plugin-google": patch
"@livekit/agents-plugin-openai": patch
"@livekit/agents-plugin-phonic": patch
---

- Make reusable Realtime Session across Handoffs & Agent Tasks
- Add say() capability to phonic realtime model
4 changes: 4 additions & 0 deletions agents/src/llm/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ export {
oaiParams,
serializeImage,
toJsonSchema,
validateChatContextStructure,
type ChatContextValidationIssue,
type ChatContextValidationResult,
type ChatContextValidationSeverity,
type FormatChatHistoryOptions,
type OpenAIFunctionParameters,
type SerializedImage,
Expand Down
17 changes: 13 additions & 4 deletions agents/src/llm/realtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import type { AudioFrame } from '@livekit/rtc-node';
import { EventEmitter } from 'events';
import type { ReadableStream } from 'node:stream/web';
import { DeferredReadableStream } from '../stream/deferred_stream.js';
import { MultiInputStream } from '../stream/multi_input_stream.js';
import { Task } from '../utils.js';
import type { TimedString } from '../voice/io.js';
import type { ChatContext, FunctionCall } from './chat_context.js';
Expand Down Expand Up @@ -49,6 +49,10 @@ export interface RealtimeCapabilities {
autoToolReplyGeneration: boolean;
audioOutput: boolean;
manualFunctionCalls: boolean;
midSessionChatCtxUpdate?: boolean;
midSessionInstructionsUpdate?: boolean;
midSessionToolsUpdate?: boolean;
perResponseToolChoice?: boolean;
}

export interface InputTranscriptionCompleted {
Expand Down Expand Up @@ -84,7 +88,8 @@ export abstract class RealtimeModel {

export abstract class RealtimeSession extends EventEmitter {
protected _realtimeModel: RealtimeModel;
private deferredInputStream = new DeferredReadableStream<AudioFrame>();
private inputAudioStream = new MultiInputStream<AudioFrame>();
private inputAudioStreamId?: string;
private _mainTask: Task<void>;

constructor(realtimeModel: RealtimeModel) {
Expand Down Expand Up @@ -146,6 +151,7 @@ export abstract class RealtimeSession extends EventEmitter {

async close(): Promise<void> {
this._mainTask.cancel();
await this.inputAudioStream.close();
}

/**
Expand All @@ -156,7 +162,7 @@ export abstract class RealtimeSession extends EventEmitter {
}

private async _mainTaskImpl(signal: AbortSignal): Promise<void> {
const reader = this.deferredInputStream.stream.getReader();
const reader = this.inputAudioStream.stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done || signal.aborted) {
Expand All @@ -167,6 +173,9 @@ export abstract class RealtimeSession extends EventEmitter {
}

setInputAudioStream(audioStream: ReadableStream<AudioFrame>): void {
this.deferredInputStream.setSource(audioStream);
if (this.inputAudioStreamId !== undefined) {
void this.inputAudioStream.removeInputStream(this.inputAudioStreamId);
}
this.inputAudioStreamId = this.inputAudioStream.addInputStream(audioStream);
}
}
88 changes: 87 additions & 1 deletion agents/src/llm/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ import {
FunctionCallOutput,
type ImageContent,
} from './chat_context.js';
import { computeChatCtxDiff, formatChatHistory, serializeImage } from './utils.js';
import {
computeChatCtxDiff,
formatChatHistory,
serializeImage,
validateChatContextStructure,
} from './utils.js';

function createChatMessage(
id: string,
Expand Down Expand Up @@ -457,6 +462,87 @@ describe('formatChatHistory', () => {
});
});

describe('validateChatContextStructure', () => {
it('returns valid=true for well-formed chat context', () => {
const ctx = new ChatContext([
ChatMessage.create({
id: 'msg_user',
role: 'user',
content: ['hello'],
createdAt: 1,
}),
FunctionCall.create({
id: 'fn_call',
callId: 'call_1',
name: 'lookup_order',
args: '{"orderId":"123"}',
createdAt: 2,
}),
FunctionCallOutput.create({
id: 'fn_output',
callId: 'call_1',
name: 'lookup_order',
output: '{"ok":true}',
isError: false,
createdAt: 3,
}),
]);

const result = validateChatContextStructure(ctx);
expect(result.valid).toBe(true);
expect(result.errors).toBe(0);
expect(result.warnings).toBe(0);
expect(result.issues).toEqual([]);
});

it('detects duplicate ids and timestamp ordering issues', () => {
const m1 = ChatMessage.create({
id: 'dup_id',
role: 'user',
content: ['hello'],
createdAt: 10,
});
const m2 = ChatMessage.create({
id: 'dup_id',
role: 'assistant',
content: ['world'],
createdAt: 5,
});
const ctx = new ChatContext([m1, m2]);

const result = validateChatContextStructure(ctx);
expect(result.valid).toBe(false);
expect(result.errors).toBeGreaterThanOrEqual(2);
expect(result.issues.some((i) => i.code === 'duplicate_id')).toBe(true);
expect(result.issues.some((i) => i.code === 'timestamp_order')).toBe(true);
});

it('detects malformed terms and orphan function outputs', () => {
const msg = ChatMessage.create({
id: 'msg_1',
role: 'user',
content: [' '],
createdAt: 1,
});
const output = FunctionCallOutput.create({
id: 'fn_out_1',
callId: 'call_missing',
name: 'lookup_order',
output: 'ok',
isError: false,
createdAt: 2,
});
const ctx = new ChatContext([msg, output]);

const result = validateChatContextStructure(ctx);
expect(result.valid).toBe(true);
expect(result.errors).toBe(0);
expect(result.warnings).toBeGreaterThanOrEqual(2);
expect(result.issues.some((i) => i.code === 'empty_text_term')).toBe(true);
expect(result.issues.some((i) => i.code === 'orphan_function_call_output')).toBe(true);
});
});

describe('serializeImage', () => {
let consoleWarnSpy: ReturnType<typeof vi.spyOn>;

Expand Down
169 changes: 169 additions & 0 deletions agents/src/llm/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,33 @@ export interface FormatChatHistoryOptions {
includeTimestamps?: boolean;
}

export type ChatContextValidationSeverity = 'error' | 'warning';

export interface ChatContextValidationIssue {
severity: ChatContextValidationSeverity;
code:
| 'duplicate_id'
| 'timestamp_order'
| 'empty_message_content'
| 'empty_text_term'
| 'missing_image_term'
| 'invalid_audio_term'
| 'invalid_function_call'
| 'invalid_function_call_args'
| 'invalid_function_call_output'
| 'orphan_function_call_output';
index: number;
itemId: string;
message: string;
}

export interface ChatContextValidationResult {
valid: boolean;
errors: number;
warnings: number;
issues: ChatContextValidationIssue[];
}

/**
* Render a chat context into a readable multiline string for debugging and logging.
*/
Expand All @@ -273,6 +300,148 @@ export function formatChatHistory(
].join('\n');
}

/**
* Validate structural integrity of chat context items/terms for realtime usage.
*/
export function validateChatContextStructure(chatCtx: ChatContext): ChatContextValidationResult {
const issues: ChatContextValidationIssue[] = [];
const ids = new Set<string>();
const seenFunctionCallIds = new Set<string>();
let previousCreatedAt = -Infinity;

const pushIssue = (issue: ChatContextValidationIssue) => {
issues.push(issue);
};

for (let index = 0; index < chatCtx.items.length; index += 1) {
const item = chatCtx.items[index]!;

if (ids.has(item.id)) {
pushIssue({
severity: 'error',
code: 'duplicate_id',
index,
itemId: item.id,
message: `Duplicate item id '${item.id}'`,
});
} else {
ids.add(item.id);
}

if (item.createdAt < previousCreatedAt) {
pushIssue({
severity: 'error',
code: 'timestamp_order',
index,
itemId: item.id,
message: `Item createdAt (${item.createdAt}) is older than previous item (${previousCreatedAt})`,
});
}
previousCreatedAt = item.createdAt;

if (item.type === 'message') {
if (item.content.length === 0) {
pushIssue({
severity: 'warning',
code: 'empty_message_content',
index,
itemId: item.id,
message: 'Message has empty content array',
});
}

item.content.forEach((term, termIndex) => {
if (typeof term === 'string') {
if (term.trim().length === 0) {
pushIssue({
severity: 'warning',
code: 'empty_text_term',
index,
itemId: item.id,
message: `Message term[${termIndex}] is empty text`,
});
}
return;
}

if (term.type === 'image_content') {
if (!term.id || term.image === undefined || term.image === null) {
pushIssue({
severity: 'error',
code: 'missing_image_term',
index,
itemId: item.id,
message: `Message term[${termIndex}] has invalid image content`,
});
}
return;
}

if (!Array.isArray(term.frame)) {
pushIssue({
severity: 'error',
code: 'invalid_audio_term',
index,
itemId: item.id,
message: `Message term[${termIndex}] has invalid audio content`,
});
}
});
} else if (item.type === 'function_call') {
if (!item.name || !item.callId) {
pushIssue({
severity: 'error',
code: 'invalid_function_call',
index,
itemId: item.id,
message: 'Function call is missing name or callId',
});
} else {
seenFunctionCallIds.add(item.callId);
}

try {
JSON.parse(item.args);
} catch {
pushIssue({
severity: 'warning',
code: 'invalid_function_call_args',
index,
itemId: item.id,
message: 'Function call args are not valid JSON',
});
}
} else if (item.type === 'function_call_output') {
if (!item.callId) {
pushIssue({
severity: 'error',
code: 'invalid_function_call_output',
index,
itemId: item.id,
message: 'Function call output is missing callId',
});
} else if (!seenFunctionCallIds.has(item.callId)) {
pushIssue({
severity: 'warning',
code: 'orphan_function_call_output',
index,
itemId: item.id,
message: `Function call output references unknown callId '${item.callId}'`,
});
}
}
}

const errors = issues.filter((issue) => issue.severity === 'error').length;
const warnings = issues.length - errors;
return {
valid: errors === 0,
errors,
warnings,
issues,
};
}

function formatChatHistoryItem(
item: ChatItem,
index: number,
Expand Down
Loading