From 8925fc01847e6eff22514a84eb3cbbb277a53e10 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Sun, 1 Mar 2026 18:02:46 +0530 Subject: [PATCH 01/16] Implement fallback adapter for stt with SPDX license header --- agents/src/stt/fallback_adapter.ts | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 agents/src/stt/fallback_adapter.ts diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts new file mode 100644 index 000000000..3e1b1bbde --- /dev/null +++ b/agents/src/stt/fallback_adapter.ts @@ -0,0 +1,3 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 From a16fa63fa2c5f954f5307491a111acb02b62d5d2 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Mon, 2 Mar 2026 04:06:01 +0530 Subject: [PATCH 02/16] Add FallbackAdapter class for STT with error handling and streaming support --- agents/src/stt/fallback_adapter.ts | 53 ++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 3e1b1bbde..a7645b6c2 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -1,3 +1,56 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
// // SPDX-License-Identifier: Apache-2.0 +import { log } from '../log.js'; +import { Task } from '../utils.js'; +import type { VAD } from '../vad.js'; +import { StreamAdapter } from './stream_adapter.js'; +import { STT } from './stt.js'; + +interface STTStatus { + available: boolean; + recoveringSynthesizeTask: Task | null; + recoveringStreamTask: Task | null; +} + +interface FallbackAdapterOptions { + sstInstances: STT[]; + vad?: VAD; + attemptTimeoutMs: number; + maxRetryPerSTT: number; + retryIntervalMs: number; +} + +export class FallbackAdapter extends STT { + label = 'sst.FallbackAdapter'; + + readonly sstInstances: STT[]; + readonly attemptTimeoutMs: number; + readonly maxRetryPerSTT: number; + readonly retryIntervalMs: number; + + private _status: STTStatus[]; + private _logger = log(); + + constructor(opts: FallbackAdapterOptions) { + if (!opts.sstInstances || opts.sstInstances.length < 1) { + throw new Error('At least one STT instance must be provided.'); + } + let sttInstances = opts.sttInstances; + const nonStreaming = sttInstances.filter((s) => !s.capabilities.streaming); + if (nonStreaming.length > 0) { + if (!opts.vad) { + const labels = nonStreaming.map((s) => s.label).join(', '); + throw new Error( + `STTs do not support streaming: ${labels}. ` + + 'Provide a VAD to enable stt.StreamAdapter automatically ' + + 'or wrap them with stt.StreamAdapter before using this adapter.', + ); + } + const vad = opts.vad; + sttInstances = sttInstances.map((s) => + s.capabilities.streaming ? 
s : new StreamAdapter(s, vad), + ); + } + } +} From eef81e610837f75d94a82d61fcb57df7c13081b7 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Mon, 2 Mar 2026 04:23:16 +0530 Subject: [PATCH 03/16] Refactor FallbackAdapter to use 'sttInstances' instead of 'sstInstances' and enhance event forwarding for metrics and error handling --- agents/src/stt/fallback_adapter.ts | 39 +++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index a7645b6c2..158d01824 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -24,11 +24,11 @@ interface FallbackAdapterOptions { export class FallbackAdapter extends STT { label = 'sst.FallbackAdapter'; - readonly sstInstances: STT[]; + readonly sttInstances: STT[]; readonly attemptTimeoutMs: number; readonly maxRetryPerSTT: number; readonly retryIntervalMs: number; - + private _status: STTStatus[]; private _logger = log(); @@ -36,11 +36,11 @@ export class FallbackAdapter extends STT { if (!opts.sstInstances || opts.sstInstances.length < 1) { throw new Error('At least one STT instance must be provided.'); } - let sttInstances = opts.sttInstances; - const nonStreaming = sttInstances.filter((s) => !s.capabilities.streaming); + let sttInstances = opts.sstInstances!; + const nonStreaming = sttInstances.filter((s: STT) => !s.capabilities.streaming); if (nonStreaming.length > 0) { if (!opts.vad) { - const labels = nonStreaming.map((s) => s.label).join(', '); + const labels = nonStreaming.map((s: STT) => s.label).join(', '); throw new Error( `STTs do not support streaming: ${labels}. ` + 'Provide a VAD to enable stt.StreamAdapter automatically ' + @@ -48,9 +48,36 @@ export class FallbackAdapter extends STT { ); } const vad = opts.vad; - sttInstances = sttInstances.map((s) => + sttInstances = sttInstances.map((s: STT) => s.capabilities.streaming ? 
s : new StreamAdapter(s, vad), ); } + + super({ + streaming: true, + interimResults: sttInstances.every((s) => s.capabilities.interimResults), + }); + + this.sttInstances = sttInstances; + this.attemptTimeoutMs = opts.attemptTimeoutMs ?? 10000; + this.maxRetryPerSTT = opts.maxRetryPerSTT ?? 1; + this.retryIntervalMs = opts.retryIntervalMs ?? 5000; + this._status = sttInstances.map(() => ({ + available: true, + recoveringSynthesizeTask: null, + recoveringStreamTask: null, + })); + this.setupEventForwarding(); + } + + get status(): STTStatus[] { + return this._status; + } + + private setupEventForwarding(): void { + this.sttInstances.forEach((stt: STT) => { + stt.on('metrics_collected', (metrics) => this.emit('metrics_collected', metrics)); + stt.on('error', (error) => this.emit('error', error)); + }); } } From 7849e2380ea466ecb845554af4dcc891e72656de Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Mon, 2 Mar 2026 04:35:47 +0530 Subject: [PATCH 04/16] Enhance FallbackAdapter with availability event handling and improved recognition recovery logic --- agents/src/stt/fallback_adapter.ts | 176 ++++++++++++++++++++++++++++- 1 file changed, 174 insertions(+), 2 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 158d01824..b9241bd22 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -1,11 +1,20 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
// // SPDX-License-Identifier: Apache-2.0 +import { APIConnectionError, APIError } from '../_exceptions.js'; import { log } from '../log.js'; -import { Task } from '../utils.js'; +import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS } from '../types.js'; +import { Task, cancelAndWait, combineSignals } from '../utils.js'; import type { VAD } from '../vad.js'; import { StreamAdapter } from './stream_adapter.js'; -import { STT } from './stt.js'; +import type { SpeechEvent } from './stt.js'; +import { STT, SpeechEventType, SpeechStream } from './stt.js'; + +const DEFAULT_FALLBACK_API_CONNECT_OPTIONS: APIConnectOptions = { + maxRetry: 0, + timeoutMs: DEFAULT_API_CONNECT_OPTIONS.timeoutMs, + retryIntervalMs: DEFAULT_API_CONNECT_OPTIONS.retryIntervalMs, +}; interface STTStatus { available: boolean; @@ -21,6 +30,11 @@ interface FallbackAdapterOptions { retryIntervalMs: number; } +export interface AvailabilityChangedEvent { + stt: STT; + available: boolean; +} + export class FallbackAdapter extends STT { label = 'sst.FallbackAdapter'; @@ -80,4 +94,162 @@ export class FallbackAdapter extends STT { stt.on('error', (error) => this.emit('error', error)); }); } + + private emitAvailabilityChanged(stt: STT, available: boolean): void { + const event: AvailabilityChangedEvent = { stt, available }; + (this as unknown as { emit: (event: string, data: AvailabilityChangedEvent) => void }).emit( + 'stt_availability_changed', + event, + ); + } + + private async tryRecognize({ + stt, + buffer, + connOptions, + abortSignal, + recovering = false, + }: { + stt: STT; + buffer: AudioBuffer; + connOptions: APIConnectOptions; + abortSignal?: AbortSignal; + recovering?: boolean; + }): Promise { + const timeoutController = new AbortController(); + const timeout = setTimeout(() => timeoutController.abort(), connOptions.timeoutMs); + + const effectiveSignal = abortSignal + ? 
combineSignals(abortSignal, timeoutController.signal) + : timeoutController.signal; + + try { + return await stt.recognize(buffer as AudioBuffer, effectiveSignal); + } catch (e) { + if (recovering) { + if (e instanceof APIError) { + this._logger.warn({ stt: stt.label, error: e }, 'recovery failed'); + } else { + this._logger.warn({ stt: stt.label, error: e }, 'recovery unexpected error'); + } + } else { + if (e instanceof APIError) { + this._logger.warn({ stt: stt.label, error: e }, 'failed, switching to next STT'); + } else { + this._logger.warn( + { stt: stt.label, error: e }, + 'unexpected error, switching to next STT', + ); + } + } + throw e; + } finally { + clearTimeout(timeout); + } + } + + private tryRecoverRecognize({ + stt, + buffer, + connOptions, + }: { + stt: STT; + buffer: AudioBuffer; + connOptions: APIConnectOptions; + }): void { + const index = this.sttInstances.indexOf(stt); + const sttStatus = this._status[index]!; + + if (sttStatus.recoveringSynthesizeTask && !sttStatus.recoveringSynthesizeTask.done) { + return; + } + + sttStatus.recoveringSynthesizeTask = Task.from(async () => { + try { + await this.tryRecognize({ + stt, + buffer, + connOptions, + recovering: true, + }); + sttStatus.available = true; + this._logger.info({ stt: stt.label }, 'recovered'); + this.emitAvailabilityChanged(stt, true); + } catch (e) { + this._logger.debug({ stt: stt.label, error: e }, 'recognize recovery attempt failed'); + } + }); + } + + protected async _recognize(buffer: AudioBuffer, abortSignal?: AbortSignal): Promise { + const startTime = Date.now(); + + const allFailed = this._status.every((s) => !s.available); + if (allFailed) { + this._logger.error('all STTs are unavailable, retrying...'); + } + + const connOptions: APIConnectOptions = { + ...DEFAULT_FALLBACK_API_CONNECT_OPTIONS, + maxRetry: this.maxRetryPerSTT, + timeoutMs: this.attemptTimeoutMs, + retryIntervalMs: this.retryIntervalMs, + }; + + for (let i = 0; i < this.sttInstances.length; i++) { + const stt 
= this.sttInstances[i]!; + const sttStatus = this._status[i]!; + + if (sttStatus.available || allFailed) { + try { + return await this.tryRecognize({ + stt, + buffer, + connOptions, + abortSignal, + recovering: false, + }); + } catch { + if (sttStatus.available) { + sttStatus.available = false; + this.emitAvailabilityChanged(stt, false); + } + } + } + + this.tryRecoverRecognize({ stt, buffer, connOptions }); + } + + const labels = this.sttInstances.map((s) => s.label).join(', '); + throw new APIConnectionError({ + message: `all STTs failed (${labels}) after ${Date.now() - startTime}ms`, + }); + } + + stream(options?: { connOptions?: APIConnectOptions }): SpeechStream { + return new FallbackSpeechStream( + this, + options?.connOptions ?? DEFAULT_FALLBACK_API_CONNECT_OPTIONS, + ); + } + async close(): Promise { + const tasks = this._status.flatMap((s) => + [s.recoveringSynthesizeTask, s.recoveringStreamTask].filter( + (t): t is Task => t !== null, + ), + ); + + if (tasks.length > 0) { + await cancelAndWait(tasks, 1000); + } + + for (const stt of this.sttInstances) { + stt.removeAllListeners('metrics_collected'); + stt.removeAllListeners('error'); + } + + await Promise.all(this.sttInstances.map((s) => s.close())); + } } + +class FallbackSpeechStream extends SpeechStream {} From 37df2127a77bd8e93fae61074224f5fee491ffc1 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Mon, 2 Mar 2026 04:38:22 +0530 Subject: [PATCH 05/16] Add FallbackSpeechStream class to FallbackAdapter for enhanced streaming and error handling --- agents/src/stt/fallback_adapter.ts | 190 ++++++++++++++++++++++++++++- 1 file changed, 189 insertions(+), 1 deletion(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index b9241bd22..13e363256 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -1,9 +1,11 @@ // SPDX-FileCopyrightText: 2025 LiveKit, Inc. 
// // SPDX-License-Identifier: Apache-2.0 +import type { AudioFrame } from '@livekit/rtc-node'; import { APIConnectionError, APIError } from '../_exceptions.js'; import { log } from '../log.js'; import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS } from '../types.js'; +import type { AudioBuffer } from '../utils.js'; import { Task, cancelAndWait, combineSignals } from '../utils.js'; import type { VAD } from '../vad.js'; import { StreamAdapter } from './stream_adapter.js'; @@ -252,4 +254,190 @@ export class FallbackAdapter extends STT { } } -class FallbackSpeechStream extends SpeechStream {} +class FallbackSpeechStream extends SpeechStream { + label = 'stt.FallbackSpeechStream'; + + private adapter: FallbackAdapter; + private connOptions: APIConnectOptions; + private _logger = log(); + private recoveringStreams: SpeechStream[] = []; + + constructor(adapter: FallbackAdapter, connOptions: APIConnectOptions) { + super(adapter, undefined, connOptions); + this.adapter = adapter; + this.connOptions = connOptions; + } + + async monitorMetrics(): Promise { + return; + } + + protected async run(): Promise { + const startTime = Date.now(); + + const allFailed = this.adapter.status.every((s) => !s.available); + if (allFailed) { + this._logger.error('all STTs are unavailable, retrying...'); + } + + let mainStream: SpeechStream | null = null; + let forwardInputDone = false; + + const forwardInput = async () => { + try { + for await (const data of this.input) { + try { + for (const stream of this.recoveringStreams) { + if (data === SpeechStream.FLUSH_SENTINEL) { + stream.flush(); + } else { + stream.pushFrame(data as AudioFrame); + } + } + + if (mainStream) { + if (data === SpeechStream.FLUSH_SENTINEL) { + mainStream.flush(); + } else { + mainStream.pushFrame(data as AudioFrame); + } + } + } catch (e) { + if (e instanceof Error && e.message.includes('closed')) { + // stream already closed, safe to ignore + } else { + this._logger.error({ error: e }, 'error forwarding 
input'); + } + } + } + } finally { + forwardInputDone = true; + if (mainStream) { + try { + mainStream.endInput(); + } catch { + // ignore if already closed + } + } + } + }; + + let forwardInputTask: Promise | null = null; + + for (let i = 0; i < this.adapter.sttInstances.length; i++) { + const stt = this.adapter.sttInstances[i]!; + const sttStatus = this.adapter.status[i]!; + + if (sttStatus.available || allFailed) { + try { + const streamConnOptions: APIConnectOptions = { + ...this.connOptions, + maxRetry: this.adapter.maxRetryPerSTT, + timeoutMs: this.adapter.attemptTimeoutMs, + retryIntervalMs: this.adapter.retryIntervalMs, + }; + + mainStream = stt.stream({ connOptions: streamConnOptions }); + + if (!forwardInputTask || forwardInputDone) { + forwardInputTask = forwardInput(); + } + + try { + for await (const ev of mainStream) { + this.output.put(ev); + } + } catch (e) { + if (e instanceof APIError) { + this._logger.warn({ stt: stt.label, error: e }, 'failed, switching to next STT'); + } else { + this._logger.warn( + { stt: stt.label, error: e }, + 'unexpected error, switching to next STT', + ); + } + throw e; + } + + return; + } catch { + if (sttStatus.available) { + sttStatus.available = false; + ( + this.adapter as unknown as { + emit: (event: string, data: AvailabilityChangedEvent) => void; + } + ).emit('stt_availability_changed', { stt, available: false }); + } + } + } + + this.tryStreamRecovery(stt); + } + + for (const stream of this.recoveringStreams) { + stream.close(); + } + + const labels = this.adapter.sttInstances.map((s) => s.label).join(', '); + throw new APIConnectionError({ + message: `all STTs failed (${labels}) after ${Date.now() - startTime}ms`, + }); + } + + private tryStreamRecovery(stt: STT): void { + const index = this.adapter.sttInstances.indexOf(stt); + const sttStatus = this.adapter.status[index]!; + + if (sttStatus.recoveringStreamTask && !sttStatus.recoveringStreamTask.done) { + return; + } + + const streamConnOptions: 
APIConnectOptions = { + ...this.connOptions, + maxRetry: 0, + timeoutMs: this.adapter.attemptTimeoutMs, + }; + + const stream = stt.stream({ connOptions: streamConnOptions }); + this.recoveringStreams.push(stream); + + sttStatus.recoveringStreamTask = Task.from(async () => { + try { + let transcriptCount = 0; + for await (const ev of stream) { + if (ev.type === SpeechEventType.FINAL_TRANSCRIPT) { + if (!ev.alternatives || !ev.alternatives[0].text) { + continue; + } + transcriptCount++; + break; + } + } + + if (transcriptCount === 0) { + return; + } + + sttStatus.available = true; + this._logger.info({ stt: stt.label }, 'recovered'); + ( + this.adapter as unknown as { + emit: (event: string, data: AvailabilityChangedEvent) => void; + } + ).emit('stt_availability_changed', { stt, available: true }); + } catch (e) { + if (e instanceof APIError) { + this._logger.warn({ stt: stt.label, error: e }, 'stream recovery failed'); + } else { + this._logger.warn({ stt: stt.label, error: e }, 'stream recovery unexpected error'); + } + } finally { + const idx = this.recoveringStreams.indexOf(stream); + if (idx !== -1) { + this.recoveringStreams.splice(idx, 1); + } + } + }); + } +} From 442854fa9a4b88d0c32000eaa51a7974d20a2a8e Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Mon, 2 Mar 2026 04:41:49 +0530 Subject: [PATCH 06/16] Export FallbackAdapter and AvailabilityChangedEvent from fallback_adapter.js to enhance STT module integration --- agents/src/stt/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/agents/src/stt/index.ts b/agents/src/stt/index.ts index 610a9e2b6..8905e06af 100644 --- a/agents/src/stt/index.ts +++ b/agents/src/stt/index.ts @@ -13,3 +13,4 @@ export { SpeechStream, } from './stt.js'; export { StreamAdapter, StreamAdapterWrapper } from './stream_adapter.js'; +export { FallbackAdapter, type AvailabilityChangedEvent } from './fallback_adapter.js'; From a07790dca130238ef2508cb7973a5ee9a8c17438 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Mon, 2 Mar 2026 
10:29:07 +0530 Subject: [PATCH 07/16] Correct label property in FallbackAdapter from 'sst.FallbackAdapter' to 'stt.FallbackAdapter' for consistency in STT module. --- agents/src/stt/fallback_adapter.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 13e363256..43d6d3bab 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -25,7 +25,7 @@ interface STTStatus { } interface FallbackAdapterOptions { - sstInstances: STT[]; + sttInstances: STT[]; vad?: VAD; attemptTimeoutMs: number; maxRetryPerSTT: number; @@ -38,7 +38,7 @@ export interface AvailabilityChangedEvent { } export class FallbackAdapter extends STT { - label = 'sst.FallbackAdapter'; + label = 'stt.FallbackAdapter'; readonly sttInstances: STT[]; readonly attemptTimeoutMs: number; @@ -49,10 +49,10 @@ export class FallbackAdapter extends STT { private _logger = log(); constructor(opts: FallbackAdapterOptions) { - if (!opts.sstInstances || opts.sstInstances.length < 1) { + if (!opts.sttInstances || opts.sttInstances.length < 1) { throw new Error('At least one STT instance must be provided.'); } - let sttInstances = opts.sstInstances!; + let sttInstances = opts.sttInstances!; const nonStreaming = sttInstances.filter((s: STT) => !s.capabilities.streaming); if (nonStreaming.length > 0) { if (!opts.vad) { From 4b5f9716b3af59af03907d3ebeeabf03e8c758c0 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Sun, 8 Mar 2026 09:09:24 +0530 Subject: [PATCH 08/16] Refactor FallbackAdapter to improve task handling and streamline recognition logic, including renaming recovery tasks and making timeout options optional. 
--- agents/src/stt/fallback_adapter.ts | 143 ++++++++++++++++------------- 1 file changed, 80 insertions(+), 63 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 43d6d3bab..3a2d0cf52 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -20,16 +20,16 @@ const DEFAULT_FALLBACK_API_CONNECT_OPTIONS: APIConnectOptions = { interface STTStatus { available: boolean; - recoveringSynthesizeTask: Task | null; + recoveringRecognizeTask: Task | null; recoveringStreamTask: Task | null; } interface FallbackAdapterOptions { sttInstances: STT[]; vad?: VAD; - attemptTimeoutMs: number; - maxRetryPerSTT: number; - retryIntervalMs: number; + attemptTimeoutMs?: number; + maxRetryPerSTT?: number; + retryIntervalMs?: number; } export interface AvailabilityChangedEvent { @@ -52,7 +52,7 @@ export class FallbackAdapter extends STT { if (!opts.sttInstances || opts.sttInstances.length < 1) { throw new Error('At least one STT instance must be provided.'); } - let sttInstances = opts.sttInstances!; + let sttInstances = opts.sttInstances; const nonStreaming = sttInstances.filter((s: STT) => !s.capabilities.streaming); if (nonStreaming.length > 0) { if (!opts.vad) { @@ -80,7 +80,7 @@ export class FallbackAdapter extends STT { this.retryIntervalMs = opts.retryIntervalMs ?? 
5000; this._status = sttInstances.map(() => ({ available: true, - recoveringSynthesizeTask: null, + recoveringRecognizeTask: null, recoveringStreamTask: null, })); this.setupEventForwarding(); @@ -97,7 +97,7 @@ export class FallbackAdapter extends STT { }); } - private emitAvailabilityChanged(stt: STT, available: boolean): void { + emitAvailabilityChanged(stt: STT, available: boolean): void { const event: AvailabilityChangedEvent = { stt, available }; (this as unknown as { emit: (event: string, data: AvailabilityChangedEvent) => void }).emit( 'stt_availability_changed', @@ -126,7 +126,7 @@ export class FallbackAdapter extends STT { : timeoutController.signal; try { - return await stt.recognize(buffer as AudioBuffer, effectiveSignal); + return await stt.recognize(buffer, effectiveSignal); } catch (e) { if (recovering) { if (e instanceof APIError) { @@ -162,11 +162,11 @@ export class FallbackAdapter extends STT { const index = this.sttInstances.indexOf(stt); const sttStatus = this._status[index]!; - if (sttStatus.recoveringSynthesizeTask && !sttStatus.recoveringSynthesizeTask.done) { + if (sttStatus.recoveringRecognizeTask && !sttStatus.recoveringRecognizeTask.done) { return; } - sttStatus.recoveringSynthesizeTask = Task.from(async () => { + sttStatus.recoveringRecognizeTask = Task.from(async () => { try { await this.tryRecognize({ stt, @@ -234,9 +234,10 @@ export class FallbackAdapter extends STT { options?.connOptions ?? 
DEFAULT_FALLBACK_API_CONNECT_OPTIONS, ); } + async close(): Promise { const tasks = this._status.flatMap((s) => - [s.recoveringSynthesizeTask, s.recoveringStreamTask].filter( + [s.recoveringRecognizeTask, s.recoveringStreamTask].filter( (t): t is Task => t !== null, ), ); @@ -272,6 +273,17 @@ class FallbackSpeechStream extends SpeechStream { return; } + private cleanupRecoveringStreams(): void { + for (const stream of this.recoveringStreams) { + try { + stream.close(); + } catch { + // safe to ignore if already closed + } + } + this.recoveringStreams = []; + } + protected async run(): Promise { const startTime = Date.now(); @@ -319,70 +331,78 @@ class FallbackSpeechStream extends SpeechStream { // ignore if already closed } } + for (const stream of this.recoveringStreams) { + try { + stream.endInput(); + } catch { + // ignore if already closed + } + } } }; let forwardInputTask: Promise | null = null; - for (let i = 0; i < this.adapter.sttInstances.length; i++) { - const stt = this.adapter.sttInstances[i]!; - const sttStatus = this.adapter.status[i]!; + try { + for (let i = 0; i < this.adapter.sttInstances.length; i++) { + const stt = this.adapter.sttInstances[i]!; + const sttStatus = this.adapter.status[i]!; - if (sttStatus.available || allFailed) { - try { - const streamConnOptions: APIConnectOptions = { - ...this.connOptions, - maxRetry: this.adapter.maxRetryPerSTT, - timeoutMs: this.adapter.attemptTimeoutMs, - retryIntervalMs: this.adapter.retryIntervalMs, - }; + if (sttStatus.available || allFailed) { + try { + const streamConnOptions: APIConnectOptions = { + ...this.connOptions, + maxRetry: this.adapter.maxRetryPerSTT, + timeoutMs: this.adapter.attemptTimeoutMs, + retryIntervalMs: this.adapter.retryIntervalMs, + }; - mainStream = stt.stream({ connOptions: streamConnOptions }); + mainStream = stt.stream({ connOptions: streamConnOptions }); - if (!forwardInputTask || forwardInputDone) { - forwardInputTask = forwardInput(); - } - - try { - for await (const ev of 
mainStream) { - this.output.put(ev); - } - } catch (e) { - if (e instanceof APIError) { - this._logger.warn({ stt: stt.label, error: e }, 'failed, switching to next STT'); - } else { - this._logger.warn( - { stt: stt.label, error: e }, - 'unexpected error, switching to next STT', - ); + if (!forwardInputTask || forwardInputDone) { + forwardInputTask = forwardInput(); } - throw e; - } - return; - } catch { - if (sttStatus.available) { - sttStatus.available = false; - ( - this.adapter as unknown as { - emit: (event: string, data: AvailabilityChangedEvent) => void; + try { + for await (const ev of mainStream) { + this.output.put(ev); } - ).emit('stt_availability_changed', { stt, available: false }); + } catch (e) { + if (e instanceof APIError) { + this._logger.warn({ stt: stt.label, error: e }, 'failed, switching to next STT'); + } else { + this._logger.warn( + { stt: stt.label, error: e }, + 'unexpected error, switching to next STT', + ); + } + throw e; + } + + this.cleanupRecoveringStreams(); + return; + } catch { + if (sttStatus.available) { + sttStatus.available = false; + this.adapter.emitAvailabilityChanged(stt, false); + } } } + + this.tryStreamRecovery(stt); } - this.tryStreamRecovery(stt); - } + this.cleanupRecoveringStreams(); - for (const stream of this.recoveringStreams) { - stream.close(); + const labels = this.adapter.sttInstances.map((s) => s.label).join(', '); + throw new APIConnectionError({ + message: `all STTs failed (${labels}) after ${Date.now() - startTime}ms`, + }); + } finally { + if (forwardInputTask) { + forwardInputTask.catch(() => {}); + } } - - const labels = this.adapter.sttInstances.map((s) => s.label).join(', '); - throw new APIConnectionError({ - message: `all STTs failed (${labels}) after ${Date.now() - startTime}ms`, - }); } private tryStreamRecovery(stt: STT): void { @@ -397,6 +417,7 @@ class FallbackSpeechStream extends SpeechStream { ...this.connOptions, maxRetry: 0, timeoutMs: this.adapter.attemptTimeoutMs, + retryIntervalMs: 
this.adapter.retryIntervalMs, }; const stream = stt.stream({ connOptions: streamConnOptions }); @@ -421,11 +442,7 @@ class FallbackSpeechStream extends SpeechStream { sttStatus.available = true; this._logger.info({ stt: stt.label }, 'recovered'); - ( - this.adapter as unknown as { - emit: (event: string, data: AvailabilityChangedEvent) => void; - } - ).emit('stt_availability_changed', { stt, available: true }); + this.adapter.emitAvailabilityChanged(stt, true); } catch (e) { if (e instanceof APIError) { this._logger.warn({ stt: stt.label, error: e }, 'stream recovery failed'); From 0ed9a88f18c024acf9a4835d18b5ba37b351918d Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Sun, 8 Mar 2026 09:28:21 +0530 Subject: [PATCH 09/16] Close mainStream in FallbackSpeechStream on error to prevent resource leaks and ensure proper stream management. --- agents/src/stt/fallback_adapter.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 3a2d0cf52..8b5878d82 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -376,6 +376,8 @@ class FallbackSpeechStream extends SpeechStream { 'unexpected error, switching to next STT', ); } + mainStream.close(); + mainStream = null; throw e; } From c6dcc4cfeb77748d66fd006c6a8cfc846c6f18c0 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Mon, 9 Mar 2026 02:26:00 +0530 Subject: [PATCH 10/16] Improve error handling in FallbackSpeechStream by ensuring proper logging for input forwarding errors and closing streams on removal to enhance resource management. 
--- agents/src/stt/fallback_adapter.ts | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 8b5878d82..6cf2a93a2 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -298,27 +298,31 @@ class FallbackSpeechStream extends SpeechStream { const forwardInput = async () => { try { for await (const data of this.input) { - try { - for (const stream of this.recoveringStreams) { + for (const stream of this.recoveringStreams) { + try { if (data === SpeechStream.FLUSH_SENTINEL) { stream.flush(); } else { stream.pushFrame(data as AudioFrame); } + } catch (e) { + if (!(e instanceof Error && e.message.includes('closed'))) { + this._logger.warn({ error: e }, 'error forwarding input to recovering stream'); + } } + } - if (mainStream) { + if (mainStream) { + try { if (data === SpeechStream.FLUSH_SENTINEL) { mainStream.flush(); } else { mainStream.pushFrame(data as AudioFrame); } - } - } catch (e) { - if (e instanceof Error && e.message.includes('closed')) { - // stream already closed, safe to ignore - } else { - this._logger.error({ error: e }, 'error forwarding input'); + } catch (e) { + if (!(e instanceof Error && e.message.includes('closed'))) { + this._logger.error({ error: e }, 'error forwarding input to main stream'); + } } } } @@ -456,6 +460,7 @@ class FallbackSpeechStream extends SpeechStream { if (idx !== -1) { this.recoveringStreams.splice(idx, 1); } + stream.close(); } }); } From 8ca1c6ca63e3fcdc8a59ba84c5ff0de71c963338 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Mon, 9 Mar 2026 07:30:42 +0530 Subject: [PATCH 11/16] Enhance FallbackAdapter and FallbackSpeechStream with improved abort handling for recognition and streaming tasks, ensuring proper resource management and error logging during recovery attempts. 
--- agents/src/stt/fallback_adapter.ts | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 6cf2a93a2..155210e1a 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -166,18 +166,20 @@ export class FallbackAdapter extends STT { return; } - sttStatus.recoveringRecognizeTask = Task.from(async () => { + sttStatus.recoveringRecognizeTask = Task.from(async (controller) => { try { await this.tryRecognize({ stt, buffer, connOptions, + abortSignal: controller.signal, recovering: true, }); sttStatus.available = true; this._logger.info({ stt: stt.label }, 'recovered'); this.emitAvailabilityChanged(stt, true); } catch (e) { + if (controller.signal.aborted) return; this._logger.debug({ stt: stt.label, error: e }, 'recognize recovery attempt failed'); } }); @@ -298,7 +300,8 @@ class FallbackSpeechStream extends SpeechStream { const forwardInput = async () => { try { for await (const data of this.input) { - for (const stream of this.recoveringStreams) { + const recoveringSnapshot = [...this.recoveringStreams]; + for (const stream of recoveringSnapshot) { try { if (data === SpeechStream.FLUSH_SENTINEL) { stream.flush(); @@ -335,7 +338,7 @@ class FallbackSpeechStream extends SpeechStream { // ignore if already closed } } - for (const stream of this.recoveringStreams) { + for (const stream of [...this.recoveringStreams]) { try { stream.endInput(); } catch { @@ -406,7 +409,10 @@ class FallbackSpeechStream extends SpeechStream { }); } finally { if (forwardInputTask) { - forwardInputTask.catch(() => {}); + if (!this.input.closed) { + this.input.close(); + } + await forwardInputTask.catch(() => {}); } } } @@ -429,10 +435,13 @@ class FallbackSpeechStream extends SpeechStream { const stream = stt.stream({ connOptions: streamConnOptions }); this.recoveringStreams.push(stream); - sttStatus.recoveringStreamTask = Task.from(async () => { + 
sttStatus.recoveringStreamTask = Task.from(async (controller) => { + const onAbort = () => stream.close(); + controller.signal.addEventListener('abort', onAbort, { once: true }); try { let transcriptCount = 0; for await (const ev of stream) { + if (controller.signal.aborted) break; if (ev.type === SpeechEventType.FINAL_TRANSCRIPT) { if (!ev.alternatives || !ev.alternatives[0].text) { continue; @@ -442,7 +451,7 @@ class FallbackSpeechStream extends SpeechStream { } } - if (transcriptCount === 0) { + if (transcriptCount === 0 || controller.signal.aborted) { return; } @@ -450,12 +459,14 @@ class FallbackSpeechStream extends SpeechStream { this._logger.info({ stt: stt.label }, 'recovered'); this.adapter.emitAvailabilityChanged(stt, true); } catch (e) { + if (controller.signal.aborted) return; if (e instanceof APIError) { this._logger.warn({ stt: stt.label, error: e }, 'stream recovery failed'); } else { this._logger.warn({ stt: stt.label, error: e }, 'stream recovery unexpected error'); } } finally { + controller.signal.removeEventListener('abort', onAbort); const idx = this.recoveringStreams.indexOf(stream); if (idx !== -1) { this.recoveringStreams.splice(idx, 1); From 262d51c3f8a91a34dfee52d2622fcdb706febeff Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Tue, 10 Mar 2026 06:18:36 +0530 Subject: [PATCH 12/16] Refactor FallbackAdapter to implement retry logic for recognition attempts, enhancing error handling and introducing delay between retries. Update FallbackSpeechStream to improve resource management by ensuring proper closure of output streams and adjusting logging levels for input forwarding errors. 
--- agents/src/stt/fallback_adapter.ts | 46 +++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 155210e1a..4a278cf23 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -4,9 +4,9 @@ import type { AudioFrame } from '@livekit/rtc-node'; import { APIConnectionError, APIError } from '../_exceptions.js'; import { log } from '../log.js'; -import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS } from '../types.js'; +import { type APIConnectOptions, DEFAULT_API_CONNECT_OPTIONS, intervalForRetry } from '../types.js'; import type { AudioBuffer } from '../utils.js'; -import { Task, cancelAndWait, combineSignals } from '../utils.js'; +import { Task, cancelAndWait, combineSignals, delay } from '../utils.js'; import type { VAD } from '../vad.js'; import { StreamAdapter } from './stream_adapter.js'; import type { SpeechEvent } from './stt.js'; @@ -205,15 +205,29 @@ export class FallbackAdapter extends STT { const sttStatus = this._status[i]!; if (sttStatus.available || allFailed) { - try { - return await this.tryRecognize({ - stt, - buffer, - connOptions, - abortSignal, - recovering: false, - }); - } catch { + let lastError: unknown; + for (let attempt = 0; attempt <= connOptions.maxRetry; attempt++) { + try { + return await this.tryRecognize({ + stt, + buffer, + connOptions, + abortSignal, + recovering: false, + }); + } catch (e) { + lastError = e; + if (attempt < connOptions.maxRetry && e instanceof APIError && e.retryable) { + const retryInterval = intervalForRetry(connOptions, attempt); + if (retryInterval > 0) { + await delay(retryInterval); + } + continue; + } + break; + } + } + if (lastError) { if (sttStatus.available) { sttStatus.available = false; this.emitAvailabilityChanged(stt, false); @@ -276,14 +290,15 @@ class FallbackSpeechStream extends SpeechStream { } private cleanupRecoveringStreams(): void { - 
for (const stream of this.recoveringStreams) { + const streams = this.recoveringStreams; + this.recoveringStreams = []; + for (const stream of streams) { try { stream.close(); } catch { // safe to ignore if already closed } } - this.recoveringStreams = []; } protected async run(): Promise { @@ -324,7 +339,7 @@ class FallbackSpeechStream extends SpeechStream { } } catch (e) { if (!(e instanceof Error && e.message.includes('closed'))) { - this._logger.error({ error: e }, 'error forwarding input to main stream'); + this._logger.warn({ error: e }, 'error forwarding input to main stream'); } } } @@ -414,6 +429,9 @@ class FallbackSpeechStream extends SpeechStream { } await forwardInputTask.catch(() => {}); } + if (!this.output.closed) { + this.output.close(); + } } } From e32aae25dcf223067213d0752ee1716feef38373 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Tue, 10 Mar 2026 19:04:50 +0530 Subject: [PATCH 13/16] Update FallbackAdapter to include abort signal in retry delay logic, improving error handling during recognition attempts. --- agents/src/stt/fallback_adapter.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 4a278cf23..5ce15cfb0 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -220,7 +220,7 @@ export class FallbackAdapter extends STT { if (attempt < connOptions.maxRetry && e instanceof APIError && e.retryable) { const retryInterval = intervalForRetry(connOptions, attempt); if (retryInterval > 0) { - await delay(retryInterval); + await delay(retryInterval, { signal: abortSignal }); } continue; } From 7e97b060db3f577f19544fb99ad011aaf1c04b36 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Tue, 10 Mar 2026 21:24:28 +0530 Subject: [PATCH 14/16] Refactor error handling in FallbackSpeechStream to capture and log stream errors more effectively, ensuring proper resource management and improved error reporting during STT transitions. 
--- agents/src/stt/fallback_adapter.ts | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 5ce15cfb0..0b9b3c78f 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -9,7 +9,7 @@ import type { AudioBuffer } from '../utils.js'; import { Task, cancelAndWait, combineSignals, delay } from '../utils.js'; import type { VAD } from '../vad.js'; import { StreamAdapter } from './stream_adapter.js'; -import type { SpeechEvent } from './stt.js'; +import type { STTError, SpeechEvent } from './stt.js'; import { STT, SpeechEventType, SpeechStream } from './stt.js'; const DEFAULT_FALLBACK_API_CONNECT_OPTIONS: APIConnectOptions = { @@ -385,22 +385,39 @@ class FallbackSpeechStream extends SpeechStream { forwardInputTask = forwardInput(); } + // The child SpeechStream swallows its own run() errors: mainTask() catches + // them, emits on the STT, re-throws into startSoon (unhandled), and then + // closes this.queue via .finally(). monitorMetrics() drains the queue and + // closes output, so the for-await below exits via { done: true } rather + // than throwing. We capture the error event to detect silent failures. 
+ let streamError: unknown = null; + const captureStreamError = (err: STTError) => { + streamError = err.error; + }; + stt.once('error', captureStreamError); try { for await (const ev of mainStream) { this.output.put(ev); } - } catch (e) { - if (e instanceof APIError) { - this._logger.warn({ stt: stt.label, error: e }, 'failed, switching to next STT'); + } finally { + stt.off('error', captureStreamError); + } + + if (streamError !== null) { + if (streamError instanceof APIError) { + this._logger.warn( + { stt: stt.label, error: streamError }, + 'failed, switching to next STT', + ); } else { this._logger.warn( - { stt: stt.label, error: e }, + { stt: stt.label, error: streamError }, 'unexpected error, switching to next STT', ); } mainStream.close(); mainStream = null; - throw e; + throw streamError; } this.cleanupRecoveringStreams(); From d729b404101058644216750b93c869deb0ad990b Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Tue, 10 Mar 2026 23:08:06 +0530 Subject: [PATCH 15/16] Update FallbackAdapter to reduce default timeout values for attempts and retry intervals, enhancing responsiveness during STT operations. --- agents/src/stt/fallback_adapter.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 0b9b3c78f..20da246fe 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -75,9 +75,9 @@ export class FallbackAdapter extends STT { }); this.sttInstances = sttInstances; - this.attemptTimeoutMs = opts.attemptTimeoutMs ?? 10000; + this.attemptTimeoutMs = opts.attemptTimeoutMs ?? 5000; this.maxRetryPerSTT = opts.maxRetryPerSTT ?? 1; - this.retryIntervalMs = opts.retryIntervalMs ?? 5000; + this.retryIntervalMs = opts.retryIntervalMs ?? 
1000; this._status = sttInstances.map(() => ({ available: true, recoveringRecognizeTask: null, From 13df18775cd2ef709b9b6fbf73a53e3806dd4366 Mon Sep 17 00:00:00 2001 From: Gokul Js Date: Wed, 11 Mar 2026 00:13:36 +0530 Subject: [PATCH 16/16] Enhance FallbackAdapter and FallbackSpeechStream with additional validation for configuration options and improved error logging during stream management, ensuring robustness in STT operations. --- agents/src/stt/fallback_adapter.ts | 36 +++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/agents/src/stt/fallback_adapter.ts b/agents/src/stt/fallback_adapter.ts index 20da246fe..5305a3cd4 100644 --- a/agents/src/stt/fallback_adapter.ts +++ b/agents/src/stt/fallback_adapter.ts @@ -74,6 +74,16 @@ export class FallbackAdapter extends STT { interimResults: sttInstances.every((s) => s.capabilities.interimResults), }); + if (opts.attemptTimeoutMs !== undefined && opts.attemptTimeoutMs <= 0) { + throw new Error('attemptTimeoutMs must be a positive number.'); + } + if (opts.maxRetryPerSTT !== undefined && opts.maxRetryPerSTT < 0) { + throw new Error('maxRetryPerSTT must be a non-negative number.'); + } + if (opts.retryIntervalMs !== undefined && opts.retryIntervalMs < 0) { + throw new Error('retryIntervalMs must be a non-negative number.'); + } + this.sttInstances = sttInstances; this.attemptTimeoutMs = opts.attemptTimeoutMs ?? 5000; this.maxRetryPerSTT = opts.maxRetryPerSTT ?? 
1; @@ -163,6 +173,7 @@ export class FallbackAdapter extends STT { const sttStatus = this._status[index]!; if (sttStatus.recoveringRecognizeTask && !sttStatus.recoveringRecognizeTask.done) { + this._logger.debug({ stt: stt.label }, 'recognize recovery already in progress, skipping'); return; } @@ -295,8 +306,8 @@ class FallbackSpeechStream extends SpeechStream { for (const stream of streams) { try { stream.close(); - } catch { - // safe to ignore if already closed + } catch (e) { + this._logger.debug({ error: e }, 'error closing recovering stream'); } } } @@ -349,15 +360,15 @@ class FallbackSpeechStream extends SpeechStream { if (mainStream) { try { mainStream.endInput(); - } catch { - // ignore if already closed + } catch (e) { + this._logger.debug({ error: e }, 'error ending main stream input'); } } for (const stream of [...this.recoveringStreams]) { try { stream.endInput(); - } catch { - // ignore if already closed + } catch (e) { + this._logger.debug({ error: e }, 'error ending recovering stream input'); } } } @@ -422,7 +433,10 @@ class FallbackSpeechStream extends SpeechStream { this.cleanupRecoveringStreams(); return; - } catch { + } catch (e) { + if (!(e instanceof APIError) && !(e instanceof APIConnectionError)) { + this._logger.warn({ stt: stt.label, error: e }, 'unexpected error in stream loop'); + } if (sttStatus.available) { sttStatus.available = false; this.adapter.emitAvailabilityChanged(stt, false); @@ -440,6 +454,7 @@ class FallbackSpeechStream extends SpeechStream { message: `all STTs failed (${labels}) after ${Date.now() - startTime}ms`, }); } finally { + this.cleanupRecoveringStreams(); if (forwardInputTask) { if (!this.input.closed) { this.input.close(); @@ -457,6 +472,7 @@ class FallbackSpeechStream extends SpeechStream { const sttStatus = this.adapter.status[index]!; if (sttStatus.recoveringStreamTask && !sttStatus.recoveringStreamTask.done) { + this._logger.debug({ stt: stt.label }, 'stream recovery already in progress, skipping'); return; } 
@@ -506,7 +522,11 @@ class FallbackSpeechStream extends SpeechStream { if (idx !== -1) { this.recoveringStreams.splice(idx, 1); } - stream.close(); + try { + stream.close(); + } catch (e) { + this._logger.debug({ error: e }, 'error closing recovery stream in finally'); + } } }); }