Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tasty-pillows-serve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

Reuse STT Pipeline Across Agent Handoff
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,4 @@ examples/src/test_*.ts
# OpenTelemetry trace test output
.traces/
*.traces.json
.worktrees/
35 changes: 35 additions & 0 deletions agents/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,41 @@ export async function waitForTrackPublication({
}
}

/**
 * Yields values from a ReadableStream until the stream ends or the signal is aborted.
 *
 * The reader lock is always released when the generator finishes (normal completion,
 * abort, consumer `break`/`return`, or a thrown error). The underlying stream is only
 * unlocked, not cancelled, so another reader may pick it up afterwards.
 *
 * @param stream - Stream to read from; it is locked for the lifetime of the generator.
 * @param signal - Optional abort signal. Once it is aborted no further values are
 *   yielded, even if chunks are still buffered in the stream.
 * @returns An async generator over the chunks of `stream`.
 */
export async function* readStream<T>(
  stream: ReadableStream<T>,
  signal?: AbortSignal,
): AsyncGenerator<T> {
  const reader = stream.getReader();
  try {
    // Nothing to do (and no abort listener to register) if we start out aborted.
    // NOTE(review): whether waitForAbort resolves for a signal that is *already* aborted
    // is not visible from here; this check makes readStream independent of that.
    if (signal?.aborted) return;

    const abortPromise = signal ? waitForAbort(signal) : undefined;

    // Re-check the flag before every read: Promise.race settles with the first entry
    // that is already settled, so a read served from buffered chunks would otherwise
    // win against an abort that has already fired and the loop would keep yielding.
    while (!signal?.aborted) {
      const result = abortPromise
        ? await Promise.race([reader.read(), abortPromise])
        : await reader.read();

      // `undefined` means the abort promise won the race; `done` means end of stream.
      if (!result || result.done) break;
      yield result.value;
    }
  } finally {
    try {
      reader.releaseLock();
    } catch {
      // stream cleanup errors are expected (releasing reader, controller closed, etc.)
    }
  }
}

export async function waitForAbort(signal: AbortSignal) {
const abortFuture = new Future<void>();
const handler = () => {
Expand Down
54 changes: 47 additions & 7 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import {
type EndOfTurnInfo,
type PreemptiveGenerationInfo,
type RecognitionHooks,
type STTPipeline,
} from './audio_recognition.js';
import {
AgentSessionEventTypes,
Expand Down Expand Up @@ -292,19 +293,27 @@ export class AgentActivity implements RecognitionHooks {
this.isDefaultInterruptionByAudioActivityEnabled = this.isInterruptionByAudioActivityEnabled;
}

async start(): Promise<void> {
async start(options?: { reuseSttPipeline?: STTPipeline }): Promise<void> {
const unlock = await this.lock.lock();
try {
await this._startSession({ spanName: 'start_agent_activity', runOnEnter: true });
await this._startSession({
spanName: 'start_agent_activity',
runOnEnter: true,
reuseSttPipeline: options?.reuseSttPipeline,
});
} finally {
unlock();
}
}

async resume(): Promise<void> {
async resume(options?: { reuseSttPipeline?: STTPipeline }): Promise<void> {
const unlock = await this.lock.lock();
try {
await this._startSession({ spanName: 'resume_agent_activity', runOnEnter: false });
await this._startSession({
spanName: 'resume_agent_activity',
runOnEnter: false,
reuseSttPipeline: options?.reuseSttPipeline,
});
} finally {
unlock();
}
Expand All @@ -313,8 +322,9 @@ export class AgentActivity implements RecognitionHooks {
private async _startSession(options: {
spanName: 'start_agent_activity' | 'resume_agent_activity';
runOnEnter: boolean;
reuseSttPipeline?: STTPipeline;
}): Promise<void> {
const { spanName, runOnEnter } = options;
const { spanName, runOnEnter, reuseSttPipeline } = options;
const startSpan = tracer.startSpan({
name: spanName,
attributes: { [traceTypes.ATTR_AGENT_LABEL]: this.agent.id },
Expand Down Expand Up @@ -415,9 +425,15 @@ export class AgentActivity implements RecognitionHooks {
sttProvider: this.getSttProvider(),
getLinkedParticipant: () => this.agentSession._roomIO?.linkedParticipant,
});
this.audioRecognition.start();
this.started = true;

if (reuseSttPipeline) {
this.logger.debug('Reusing STT pipeline from previous activity');
await this.audioRecognition.start({ sttPipeline: reuseSttPipeline });
} else {
await this.audioRecognition.start();
}

this.started = true;
this._resumeSchedulingTask();

if (runOnEnter) {
Expand All @@ -438,6 +454,30 @@ export class AgentActivity implements RecognitionHooks {
startSpan.end();
}

/**
 * Hands this activity's STT pipeline over to `newActivity` when the two are compatible.
 *
 * The pipeline is detached only when this activity has an audio recognition, both
 * activities have an STT and it is the very same instance, and both agents' prototypes
 * expose the same `sttNode` function. In every other case nothing is detached.
 *
 * @param newActivity - The activity that is about to take over.
 * @returns The detached pipeline, or `undefined` when it cannot be reused.
 */
async _detachSttPipelineIfReusable(newActivity: AgentActivity): Promise<STTPipeline | undefined> {
  const recognition = this.audioRecognition;
  const currentStt = this.stt;
  const incomingStt = newActivity.stt;
  // Compare the prototype-level sttNode so a subclass that merely inherits the
  // implementation still counts as "the same node".
  const currentSttNode = Object.getPrototypeOf(this.agent).sttNode;
  const incomingSttNode = Object.getPrototypeOf(newActivity.agent).sttNode;

  if (!recognition || !currentStt || !incomingStt) {
    return undefined;
  }

  if (currentStt !== incomingStt || currentSttNode !== incomingSttNode) {
    return undefined;
  }

  return await recognition.detachSttPipeline();
}

get currentSpeech(): SpeechHandle | undefined {
return this._currentSpeech;
}
Expand Down
123 changes: 123 additions & 0 deletions agents/src/voice/agent_activity_handoff.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { AudioFrame } from '@livekit/rtc-node';
import { describe, expect, it, vi } from 'vitest';
import { type SpeechEvent } from '../stt/stt.js';
import { Agent } from './agent.js';
import { AgentActivity } from './agent_activity.js';

/**
 * Minimal stand-in used as `this` / `newActivity` when invoking
 * `AgentActivity.prototype._detachSttPipelineIfReusable` in these tests.
 * It carries only the members that method reads.
 */
type FakeActivity = {
// Agent whose prototype-level `sttNode` is compared between activities.
agent: Agent;
// Mocked audio recognition; `undefined` models an activity without one.
audioRecognition: { detachSttPipeline: ReturnType<typeof vi.fn> } | undefined;
// Opaque STT handle; only identity and truthiness matter to the method under test.
stt: unknown;
};

/**
 * Builds a fake activity around `agent` and `stt`, together with the sentinel pipeline
 * object its mocked `detachSttPipeline` resolves to.
 */
function createFakeActivity(agent: Agent, stt: unknown) {
  // Unique sentinel so tests can assert on identity of the returned pipeline.
  const detachedPipeline = { id: Symbol('pipeline') };
  const detachSttPipeline = vi.fn(async () => detachedPipeline);
  const audioRecognition = { detachSttPipeline };
  const activity = { agent, audioRecognition, stt } as FakeActivity;

  return { activity, detachedPipeline };
}

/**
 * Invokes `AgentActivity.prototype._detachSttPipelineIfReusable` with `oldActivity` as
 * `this`, so the eligibility logic can be exercised without constructing a real activity.
 */
async function detachIfReusable(oldActivity: FakeActivity, newActivity: FakeActivity) {
  const proto = AgentActivity.prototype as any;
  const detach = proto._detachSttPipelineIfReusable;
  return await detach.call(oldActivity, newActivity);
}

// Eligibility rules for handing the STT pipeline from one activity to the next:
// same STT instance, same prototype-level sttNode, and an audio recognition to detach from.
describe('AgentActivity STT handoff reuse eligibility', () => {
  it('reuses the pipeline when both activities share the same STT instance and sttNode', async () => {
    const sharedStt = { id: 'shared-stt' };
    const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), sharedStt);
    const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt);

    const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

    expect(result).toBe(oldActivity.detachedPipeline);
    expect(oldActivity.activity.audioRecognition?.detachSttPipeline).toHaveBeenCalledTimes(1);
  });

  it('does not reuse when the STT instances differ', async () => {
    const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), { id: 'stt-a' });
    const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), { id: 'stt-b' });

    const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

    expect(result).toBeUndefined();
    expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled();
  });

  it('does not reuse when either activity has no STT', async () => {
    const sharedStt = { id: 'shared-stt' };

    // Old activity is missing its STT.
    const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), undefined);
    const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt);

    const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

    expect(result).toBeUndefined();
    expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled();

    // New activity is missing its STT (the other half of "either").
    const oldWithStt = createFakeActivity(new Agent({ instructions: 'c' }), sharedStt);
    const newWithoutStt = createFakeActivity(new Agent({ instructions: 'd' }), undefined);

    const reverseResult = await detachIfReusable(oldWithStt.activity, newWithoutStt.activity);

    expect(reverseResult).toBeUndefined();
    expect(oldWithStt.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled();
  });

  it('does not reuse when the agents override sttNode differently', async () => {
    const sharedStt = { id: 'shared-stt' };

    // Two unrelated subclasses: identical bodies, but distinct function objects.
    class AgentA extends Agent {
      async sttNode(_audio: ReadableStream<AudioFrame>, _modelSettings: any) {
        return null as ReadableStream<SpeechEvent | string> | null;
      }
    }

    class AgentB extends Agent {
      async sttNode(_audio: ReadableStream<AudioFrame>, _modelSettings: any) {
        return null as ReadableStream<SpeechEvent | string> | null;
      }
    }

    const oldActivity = createFakeActivity(new AgentA({ instructions: 'a' }), sharedStt);
    const newActivity = createFakeActivity(new AgentB({ instructions: 'b' }), sharedStt);

    const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

    expect(result).toBeUndefined();
    expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled();
  });

  it('reuses when the new agent inherits the same sttNode implementation', async () => {
    const sharedStt = { id: 'shared-stt' };

    class AgentA extends Agent {
      async sttNode(_audio: ReadableStream<AudioFrame>, _modelSettings: any) {
        return null as ReadableStream<SpeechEvent | string> | null;
      }
    }

    // No override: sttNode resolves to AgentA's implementation through the prototype chain.
    class AgentB extends AgentA {}

    const oldActivity = createFakeActivity(new AgentA({ instructions: 'a' }), sharedStt);
    const newActivity = createFakeActivity(new AgentB({ instructions: 'b' }), sharedStt);

    const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

    expect(result).toBe(oldActivity.detachedPipeline);
    expect(oldActivity.activity.audioRecognition?.detachSttPipeline).toHaveBeenCalledTimes(1);
  });

  it('does not reuse when the old activity has no audioRecognition', async () => {
    const sharedStt = { id: 'shared-stt' };
    const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), sharedStt);
    const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt);
    oldActivity.activity.audioRecognition = undefined;

    const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

    expect(result).toBeUndefined();
  });
});
32 changes: 29 additions & 3 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import { Task } from '../utils.js';
import type { VAD } from '../vad.js';
import type { Agent } from './agent.js';
import { AgentActivity } from './agent_activity.js';
import type { _TurnDetector } from './audio_recognition.js';
import type { STTPipeline, _TurnDetector } from './audio_recognition.js';
import {
type AgentEvent,
AgentSessionEventTypes,
Expand Down Expand Up @@ -223,6 +223,7 @@ export class AgentSession<
private _input: AgentInput;
private _output: AgentOutput;

private closing = false;
private closingTask: Promise<void> | null = null;
private userAwayTimer: NodeJS.Timeout | null = null;

Expand Down Expand Up @@ -515,6 +516,7 @@ export class AgentSession<
return;
}

this.closing = false;
this._usageCollector = new ModelUsageCollector();

let ctx: JobContext | undefined = undefined;
Expand Down Expand Up @@ -760,6 +762,7 @@ export class AgentSession<
const runWithContext = async () => {
const unlock = await this.activityLock.lock();
let onEnterTask: Task<void> | undefined;
let reusedSttPipeline: STTPipeline | undefined;

try {
this.agent = agent;
Expand All @@ -782,6 +785,10 @@ export class AgentSession<
this.nextActivity = agent._agentActivity;
}

if (prevActivityObj && this.nextActivity && prevActivityObj !== this.nextActivity) {
reusedSttPipeline = await prevActivityObj._detachSttPipelineIfReusable(this.nextActivity);
}

if (prevActivityObj && prevActivityObj !== this.nextActivity) {
if (previousActivity === 'pause') {
await prevActivityObj.pause({ blockedTasks });
Expand All @@ -791,6 +798,18 @@ export class AgentSession<
}
}

if (this.closing && newActivity === 'start') {
this.logger.warn(
{ agentId: this.nextActivity?.agent.id },
'Session is closing, skipping start of next activity',
);
await reusedSttPipeline?.close();
reusedSttPipeline = undefined;
this.nextActivity = undefined;
this.activity = undefined;
return;
}

this.activity = this.nextActivity;
this.nextActivity = undefined;

Expand All @@ -815,16 +834,22 @@ export class AgentSession<
);

if (newActivity === 'start') {
await this.activity!.start();
await this.activity!.start({ reuseSttPipeline: reusedSttPipeline });
} else {
await this.activity!.resume();
await this.activity!.resume({ reuseSttPipeline: reusedSttPipeline });
}
reusedSttPipeline = undefined;

onEnterTask = this.activity!._onEnterTask;

if (this._input.audio) {
this.activity!.attachAudioInput(this._input.audio.stream);
}
} catch (error) {
// JS safeguard: session cleanup owns the detached pipeline until the next activity
// starts successfully, preventing leaks when handoff fails mid-transition.
await reusedSttPipeline?.close();
throw error;
} finally {
unlock();
}
Expand Down Expand Up @@ -1130,6 +1155,7 @@ export class AgentSession<
return;
}

this.closing = true;
this._cancelUserAwayTimer();
this._onAecWarmupExpired();
this.off(AgentSessionEventTypes.UserInputTranscribed, this._onUserInputTranscribed);
Expand Down
Loading
Loading