Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tasty-pillows-serve.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

Reuse STT Pipeline Across Agent Handoff
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,4 @@ examples/src/test_*.ts
# OpenTelemetry trace test output
.traces/
*.traces.json
.worktrees/
35 changes: 35 additions & 0 deletions agents/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,41 @@ export async function waitForTrackPublication({
}
}

/**
* Yields values from a ReadableStream until the stream ends or the signal is aborted.
* Handles reader cleanup and stream-release errors internally.
*/
export async function* readStream<T>(
stream: ReadableStream<T>,
signal?: AbortSignal,
): AsyncGenerator<T> {
const reader = stream.getReader();
try {
if (signal) {
const abortPromise = waitForAbort(signal);
while (true) {
const result = await Promise.race([reader.read(), abortPromise]);
if (!result) break;
const { done, value } = result;
if (done) break;
yield value;
}
} else {
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield value;
}
}
} finally {
try {
reader.releaseLock();
} catch {
// stream cleanup errors are expected (releasing reader, controller closed, etc.)
}
}
}

export async function waitForAbort(signal: AbortSignal) {
const abortFuture = new Future<void>();
const handler = () => {
Expand Down
54 changes: 47 additions & 7 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import {
type EndOfTurnInfo,
type PreemptiveGenerationInfo,
type RecognitionHooks,
type STTPipeline,
} from './audio_recognition.js';
import {
AgentSessionEventTypes,
Expand Down Expand Up @@ -292,19 +293,27 @@ export class AgentActivity implements RecognitionHooks {
this.isDefaultInterruptionByAudioActivityEnabled = this.isInterruptionByAudioActivityEnabled;
}

async start(): Promise<void> {
async start(options?: { reuseSttPipeline?: STTPipeline }): Promise<void> {
const unlock = await this.lock.lock();
try {
await this._startSession({ spanName: 'start_agent_activity', runOnEnter: true });
await this._startSession({
spanName: 'start_agent_activity',
runOnEnter: true,
reuseSttPipeline: options?.reuseSttPipeline,
});
} finally {
unlock();
}
}

async resume(): Promise<void> {
async resume(options?: { reuseSttPipeline?: STTPipeline }): Promise<void> {
const unlock = await this.lock.lock();
try {
await this._startSession({ spanName: 'resume_agent_activity', runOnEnter: false });
await this._startSession({
spanName: 'resume_agent_activity',
runOnEnter: false,
reuseSttPipeline: options?.reuseSttPipeline,
});
} finally {
unlock();
}
Expand All @@ -313,8 +322,9 @@ export class AgentActivity implements RecognitionHooks {
private async _startSession(options: {
spanName: 'start_agent_activity' | 'resume_agent_activity';
runOnEnter: boolean;
reuseSttPipeline?: STTPipeline;
}): Promise<void> {
const { spanName, runOnEnter } = options;
const { spanName, runOnEnter, reuseSttPipeline } = options;
const startSpan = tracer.startSpan({
name: spanName,
attributes: { [traceTypes.ATTR_AGENT_LABEL]: this.agent.id },
Expand Down Expand Up @@ -415,9 +425,15 @@ export class AgentActivity implements RecognitionHooks {
sttProvider: this.getSttProvider(),
getLinkedParticipant: () => this.agentSession._roomIO?.linkedParticipant,
});
this.audioRecognition.start();
this.started = true;

if (reuseSttPipeline) {
this.logger.debug('Reusing STT pipeline from previous activity');
await this.audioRecognition.start({ sttPipeline: reuseSttPipeline });
} else {
await this.audioRecognition.start();
}

this.started = true;
this._resumeSchedulingTask();

if (runOnEnter) {
Expand All @@ -438,6 +454,30 @@ export class AgentActivity implements RecognitionHooks {
startSpan.end();
}

async _detachSttPipelineIfReusable(newActivity: AgentActivity): Promise<STTPipeline | undefined> {
const hasAudioRecognition = !!this.audioRecognition;
const hasSttOld = !!this.stt;
const hasSttNew = !!newActivity.stt;
const sameSttInstance = this.stt === newActivity.stt;
const sameSttNode =
Object.getPrototypeOf(this.agent).sttNode ===
Object.getPrototypeOf(newActivity.agent).sttNode;

if (!hasAudioRecognition || !hasSttOld || !hasSttNew) {
return undefined;
}

if (!sameSttInstance) {
return undefined;
}

if (!sameSttNode) {
return undefined;
}

return await this.audioRecognition!.detachSttPipeline();
}

get currentSpeech(): SpeechHandle | undefined {
return this._currentSpeech;
}
Expand Down
123 changes: 123 additions & 0 deletions agents/src/voice/agent_activity_handoff.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { AudioFrame } from '@livekit/rtc-node';
import { describe, expect, it, vi } from 'vitest';
import { type SpeechEvent } from '../stt/stt.js';
import { Agent } from './agent.js';
import { AgentActivity } from './agent_activity.js';

type FakeActivity = {
agent: Agent;
audioRecognition: { detachSttPipeline: ReturnType<typeof vi.fn> } | undefined;
stt: unknown;
};

function createFakeActivity(agent: Agent, stt: unknown) {
const detachedPipeline = { id: Symbol('pipeline') };
const activity = {
agent,
audioRecognition: {
detachSttPipeline: vi.fn(async () => detachedPipeline),
},
stt,
} as FakeActivity;

return { activity, detachedPipeline };
}

async function detachIfReusable(oldActivity: FakeActivity, newActivity: FakeActivity) {
return await (AgentActivity.prototype as any)._detachSttPipelineIfReusable.call(
oldActivity,
newActivity,
);
}

describe('AgentActivity STT handoff reuse eligibility', () => {
it('reuses the pipeline when both activities share the same STT instance and sttNode', async () => {
const sharedStt = { id: 'shared-stt' };
const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), sharedStt);
const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt);

const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

expect(result).toBe(oldActivity.detachedPipeline);
expect(oldActivity.activity.audioRecognition?.detachSttPipeline).toHaveBeenCalledTimes(1);
});

it('does not reuse when the STT instances differ', async () => {
const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), { id: 'stt-a' });
const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), { id: 'stt-b' });

const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

expect(result).toBeUndefined();
expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled();
});

it('does not reuse when either activity has no STT', async () => {
const sharedStt = { id: 'shared-stt' };
const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), undefined);
const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt);

const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

expect(result).toBeUndefined();
expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled();
});

it('does not reuse when the agents override sttNode differently', async () => {
const sharedStt = { id: 'shared-stt' };

class AgentA extends Agent {
async sttNode(_audio: ReadableStream<AudioFrame>, _modelSettings: any) {
return null as ReadableStream<SpeechEvent | string> | null;
}
}

class AgentB extends Agent {
async sttNode(_audio: ReadableStream<AudioFrame>, _modelSettings: any) {
return null as ReadableStream<SpeechEvent | string> | null;
}
}

const oldActivity = createFakeActivity(new AgentA({ instructions: 'a' }), sharedStt);
const newActivity = createFakeActivity(new AgentB({ instructions: 'b' }), sharedStt);

const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

expect(result).toBeUndefined();
expect(oldActivity.activity.audioRecognition?.detachSttPipeline).not.toHaveBeenCalled();
});

it('reuses when the new agent inherits the same sttNode implementation', async () => {
const sharedStt = { id: 'shared-stt' };

class AgentA extends Agent {
async sttNode(_audio: ReadableStream<AudioFrame>, _modelSettings: any) {
return null as ReadableStream<SpeechEvent | string> | null;
}
}

class AgentB extends AgentA {}

const oldActivity = createFakeActivity(new AgentA({ instructions: 'a' }), sharedStt);
const newActivity = createFakeActivity(new AgentB({ instructions: 'b' }), sharedStt);

const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

expect(result).toBe(oldActivity.detachedPipeline);
expect(oldActivity.activity.audioRecognition?.detachSttPipeline).toHaveBeenCalledTimes(1);
});

it('does not reuse when the old activity has no audioRecognition', async () => {
const sharedStt = { id: 'shared-stt' };
const oldActivity = createFakeActivity(new Agent({ instructions: 'a' }), sharedStt);
const newActivity = createFakeActivity(new Agent({ instructions: 'b' }), sharedStt);
oldActivity.activity.audioRecognition = undefined;

const result = await detachIfReusable(oldActivity.activity, newActivity.activity);

expect(result).toBeUndefined();
});
});
32 changes: 29 additions & 3 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import { Task } from '../utils.js';
import type { VAD } from '../vad.js';
import type { Agent } from './agent.js';
import { AgentActivity } from './agent_activity.js';
import type { _TurnDetector } from './audio_recognition.js';
import type { STTPipeline, _TurnDetector } from './audio_recognition.js';
import {
type AgentEvent,
AgentSessionEventTypes,
Expand Down Expand Up @@ -223,6 +223,7 @@ export class AgentSession<
private _input: AgentInput;
private _output: AgentOutput;

private closing = false;
private closingTask: Promise<void> | null = null;
private userAwayTimer: NodeJS.Timeout | null = null;

Expand Down Expand Up @@ -515,6 +516,7 @@ export class AgentSession<
return;
}

this.closing = false;
this._usageCollector = new ModelUsageCollector();

let ctx: JobContext | undefined = undefined;
Expand Down Expand Up @@ -760,6 +762,7 @@ export class AgentSession<
const runWithContext = async () => {
const unlock = await this.activityLock.lock();
let onEnterTask: Task<void> | undefined;
let reusedSttPipeline: STTPipeline | undefined;

try {
this.agent = agent;
Expand All @@ -782,6 +785,10 @@ export class AgentSession<
this.nextActivity = agent._agentActivity;
}

if (prevActivityObj && this.nextActivity && prevActivityObj !== this.nextActivity) {
reusedSttPipeline = await prevActivityObj._detachSttPipelineIfReusable(this.nextActivity);
}

if (prevActivityObj && prevActivityObj !== this.nextActivity) {
if (previousActivity === 'pause') {
await prevActivityObj.pause({ blockedTasks });
Expand All @@ -791,6 +798,18 @@ export class AgentSession<
}
}

if (this.closing && newActivity === 'start') {
this.logger.warn(
{ agentId: this.nextActivity?.agent.id },
'Session is closing, skipping start of next activity',
);
await reusedSttPipeline?.close();
reusedSttPipeline = undefined;
this.nextActivity = undefined;
this.activity = undefined;
return;
}

this.activity = this.nextActivity;
this.nextActivity = undefined;

Expand All @@ -815,16 +834,22 @@ export class AgentSession<
);

if (newActivity === 'start') {
await this.activity!.start();
await this.activity!.start({ reuseSttPipeline: reusedSttPipeline });
} else {
await this.activity!.resume();
await this.activity!.resume({ reuseSttPipeline: reusedSttPipeline });
}
reusedSttPipeline = undefined;

onEnterTask = this.activity!._onEnterTask;

if (this._input.audio) {
this.activity!.attachAudioInput(this._input.audio.stream);
}
} catch (error) {
// JS safeguard: session cleanup owns the detached pipeline until the next activity
// starts successfully, preventing leaks when handoff fails mid-transition.
await reusedSttPipeline?.close();
throw error;
} finally {
unlock();
}
Expand Down Expand Up @@ -1130,6 +1155,7 @@ export class AgentSession<
return;
}

this.closing = true;
this._cancelUserAwayTimer();
this._onAecWarmupExpired();
this.off(AgentSessionEventTypes.UserInputTranscribed, this._onUserInputTranscribed);
Expand Down
Loading
Loading