diff --git a/samples/js/live-audio-transcription/app.js b/samples/js/live-audio-transcription/app.js index 60d583f0..a096fbad 100644 --- a/samples/js/live-audio-transcription/app.js +++ b/samples/js/live-audio-transcription/app.js @@ -5,7 +5,7 @@ // // Usage: node app.js -import { FoundryLocalManager } from 'foundry-local-sdk'; +import { FoundryLocalManager, LiveAudioStreamError } from 'foundry-local-sdk'; console.log('╔══════════════════════════════════════════════════════════╗'); console.log('║ Foundry Local — Live Audio Transcription (JS SDK) ║'); @@ -39,9 +39,16 @@ console.log('Loading model...'); await model.load(); console.log('✓ Model loaded'); +// Graceful-shutdown coordinator. Set ONCE on the session via +// createLiveTranscriptionSession(signal) — every subsequent +// start() / append() / getTranscriptionStream() / stop() call picks it +// up automatically, so we don't have to thread the signal through every +// callsite. +const shutdown = new AbortController(); + // Create live transcription session (same pattern as C# sample). const audioClient = model.createAudioClient(); -const session = audioClient.createLiveTranscriptionSession(); +const session = audioClient.createLiveTranscriptionSession(shutdown.signal); session.settings.sampleRate = 16000; // Default is 16000; shown here for clarity session.settings.channels = 1; @@ -67,9 +74,22 @@ const readPromise = (async () => { } } } catch (err) { - if (err.name !== 'AbortError') { - console.error('Stream error:', err.message); + // AbortError is expected on Ctrl+C; ignore quietly. + if (err.name === 'AbortError') return; + + // LiveAudioStreamError surfaces native-core failure metadata (code + isTransient). + // Use it to label transient blips differently from fatal failures; this sample + // only logs and ends the reader either way. Without LiveAudioStreamError the only signal would be err.message. + if (err instanceof LiveAudioStreamError) { + if (err.isTransient) { + console.warn(`\n⚠ Transient ASR error (${err.code}): ${err.message}. 
Continuing...`); + return; + } + console.error(`\n✗ Stream error [${err.code}]: ${err.message}`); + return; } + + console.error('\n✗ Stream error:', err.message); } })(); @@ -108,14 +128,18 @@ try { try { while (appendQueue.length > 0) { const pcm = appendQueue.shift(); + // Session-level signal (set in createLiveTranscriptionSession) + // applies automatically — no need to pass it here. await session.append(pcm); } } catch (err) { + // Aborted via Ctrl+C — exit quietly. + if (err.name === 'AbortError') return; console.error('append error:', err.message); } finally { pumping = false; // Handle race where new data arrived after loop exit. - if (appendQueue.length > 0) { + if (appendQueue.length > 0 && !shutdown.signal.aborted) { void pumpAudio(); } } @@ -182,9 +206,14 @@ try { process.exit(0); } -// Handle graceful shutdown +// Handle graceful shutdown. +// +// The AbortController fires the shared `shutdown` signal so any in-flight +// session.append() / getTranscriptionStream() resolves promptly with an +// AbortError instead of waiting for stop() to finish draining the queue. process.on('SIGINT', async () => { console.log('\n\nStopping...'); + shutdown.abort(); if (audioInput) { audioInput.quit(); } diff --git a/samples/python/live-audio-transcription/src/app.py b/samples/python/live-audio-transcription/src/app.py index 083ebbdf..b2344821 100644 --- a/samples/python/live-audio-transcription/src/app.py +++ b/samples/python/live-audio-transcription/src/app.py @@ -15,6 +15,8 @@ import time from foundry_local_sdk import Configuration, FoundryLocalManager +from foundry_local_sdk.exception import FoundryLocalException +from foundry_local_sdk.openai.live_audio_transcription_types import CoreErrorResponse use_synth = "--synth" in sys.argv @@ -41,8 +43,16 @@ model.load() print("done.") +# Graceful-shutdown coordinator. Set ONCE on the session via +# create_live_transcription_session(cancel_event=...) 
— every subsequent +# start() / append() / stop() / get_transcription_stream() call picks it +# up automatically, so we don't have to thread the event through every +# callsite. SIGINT just calls shutdown_event.set() and the in-flight +# session work unwinds cleanly. +shutdown_event = threading.Event() + audio_client = model.get_audio_client() -session = audio_client.create_live_transcription_session() +session = audio_client.create_live_transcription_session(cancel_event=shutdown_event) session.settings.sample_rate = 16000 session.settings.channels = 1 session.settings.language = "en" @@ -52,14 +62,30 @@ # --- Background thread reads transcription results (mirrors JS readPromise) --- + def read_results(): - for result in session.get_transcription_stream(): - text = result.content[0].text if result.content else "" - if result.is_final: - print() - print(f" [FINAL] {text}") - elif text: - print(text, end="", flush=True) + try: + for result in session.get_transcription_stream(): + text = result.content[0].text if result.content else "" + if result.is_final: + print() + print(f" [FINAL] {text}") + elif text: + print(text, end="", flush=True) + except FoundryLocalException as ex: + # Cancelled via shutdown_event -> generator returns cleanly (no exception). + # We only land here on a real native-side push failure. + # Use CoreErrorResponse to inspect structured error metadata (code + + # is_transient) and report it; this sample logs and stops, it never retries. + # Without it, the only signal would be str(ex). + info = CoreErrorResponse.try_parse(str(ex)) + if info and info.is_transient: + print(f"\n⚠ Transient ASR error ({info.code}): {info.message}. 
 Continuing...") + return + if info: + print(f"\n✗ Stream error [{info.code}]: {info.message}") + return + print(f"\n✗ Stream error: {ex}") read_thread = threading.Thread(target=read_results, daemon=True) @@ -72,7 +98,6 @@ def read_results(): CHANNELS = 1 CHUNK = RATE // 10 # 100ms of audio = 1600 frames -stop_event = threading.Event() mic_active = False pa = None stream = None @@ -100,14 +125,21 @@ def read_results(): print() def capture_mic(): - while not stop_event.is_set(): + while not shutdown_event.is_set(): try: pcm_data = stream.read(CHUNK, exception_on_overflow=False) if pcm_data: + # Session-level cancel_event applies — if shutdown + # fires while append() is blocked on backpressure, + # it raises FoundryLocalException("cancelled") instead + # of waiting for the queue to drain. session.append(pcm_data) + except FoundryLocalException: + # Treated as cancellation — note that any other SDK error also ends the loop silently here. + break except Exception as e: print(f"\n[ERROR] Microphone capture failed: {e}") - stop_event.set() + shutdown_event.set() break capture_thread = threading.Thread(target=capture_mic, daemon=True) @@ -148,9 +180,17 @@ def capture_mic(): # --- Graceful shutdown (mirrors JS SIGINT handler / C++ SignalHandler) --- + def shutdown(*_args): print("\n\nStopping...") - stop_event.set() + # Setting shutdown_event: + # - exits the mic capture loop on its next iteration + # - aborts any in-flight session.append() blocked on backpressure + # with FoundryLocalException("cancelled") + # - ends session.get_transcription_stream() iteration cleanly in + # the read thread + # - short-circuits session.stop()'s drain wait below + shutdown_event.set() if stream: stream.stop_stream() @@ -169,6 +209,6 @@ def shutdown(*_args): if mic_active: # Block until Ctrl+C - stop_event.wait() + shutdown_event.wait() else: shutdown() diff --git a/sdk/js/src/index.ts b/sdk/js/src/index.ts index bc27293b..b6b2f501 100644 --- a/sdk/js/src/index.ts +++ b/sdk/js/src/index.ts @@ -11,6 +11,7 @@ export { 
AudioClient, AudioClientSettings } from './openai/audioClient.js'; export { EmbeddingClient } from './openai/embeddingClient.js'; export { LiveAudioTranscriptionSession, LiveAudioTranscriptionOptions } from './openai/liveAudioTranscriptionClient.js'; export type { LiveAudioTranscriptionResponse, TranscriptionContentPart } from './openai/liveAudioTranscriptionTypes.js'; +export { LiveAudioStreamError } from './openai/liveAudioTranscriptionTypes.js'; export { ResponsesClient, ResponsesClientSettings, getOutputText } from './openai/responsesClient.js'; export { ModelLoadManager } from './detail/modelLoadManager.js'; /** @internal */ diff --git a/sdk/js/src/openai/audioClient.ts b/sdk/js/src/openai/audioClient.ts index 0e6b1f37..53da258a 100644 --- a/sdk/js/src/openai/audioClient.ts +++ b/sdk/js/src/openai/audioClient.ts @@ -59,10 +59,26 @@ export class AudioClient { /** * Creates a LiveAudioTranscriptionSession for real-time audio streaming ASR. + * + * @param signal - Optional AbortSignal applied to **all** subsequent + * ``start`` / ``append`` / ``stop`` / + * ``getTranscriptionStream`` calls on the returned session. + * Behaves like C#'s ``CancellationToken`` parameter. * @returns A LiveAudioTranscriptionSession instance. + * + * @example + * ```ts + * const shutdown = new AbortController(); + * const session = audioClient.createLiveTranscriptionSession(shutdown.signal); + * await session.start(); + * await session.append(pcm); + * for await (const r of session.getTranscriptionStream()) { ... 
} + * + * process.on('SIGINT', () => shutdown.abort()); + * ``` */ - public createLiveTranscriptionSession(): LiveAudioTranscriptionSession { - return new LiveAudioTranscriptionSession(this.modelId, this.coreInterop); + public createLiveTranscriptionSession(signal?: AbortSignal): LiveAudioTranscriptionSession { + return new LiveAudioTranscriptionSession(this.modelId, this.coreInterop, signal); } /** diff --git a/sdk/js/src/openai/liveAudioTranscriptionClient.ts b/sdk/js/src/openai/liveAudioTranscriptionClient.ts index b1115a25..88be122b 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionClient.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionClient.ts @@ -1,5 +1,5 @@ import { CoreInterop } from '../detail/coreInterop.js'; -import { LiveAudioTranscriptionResponse, parseTranscriptionResult, tryParseCoreError } from './liveAudioTranscriptionTypes.js'; +import { LiveAudioTranscriptionResponse, parseTranscriptionResult, wrapAsLiveAudioStreamError } from './liveAudioTranscriptionTypes.js'; /** * Audio format settings for a streaming session. @@ -30,6 +30,40 @@ export class LiveAudioTranscriptionOptions { } } +/** + * DOMException-compatible AbortError. Matches the shape thrown by native fetch/AbortController + * so callers can use `err.name === 'AbortError'` for cancellation detection. + * @internal + */ +function makeAbortError(message = 'The operation was aborted.'): Error { + const err = new Error(message); + err.name = 'AbortError'; + return err; +} + +/** + * Convert an AbortSignal's `reason` into a human-readable message. + * Handles all three cases: Error reasons, non-Error reasons (e.g., + * `controller.abort('timeout')`), and undefined reasons. 
+ * @internal + */ +function abortMessage(signal: AbortSignal): string { + const reason = signal.reason; + if (reason instanceof Error) return reason.message; + if (reason !== undefined) return String(reason); + return 'The operation was aborted.'; +} + +/** + * If `signal` is already aborted, throw an AbortError immediately. + * @internal + */ +function throwIfAborted(signal: AbortSignal | undefined): void { + if (signal?.aborted) { + throw makeAbortError(abortMessage(signal)); + } +} + /** * Internal async queue that acts like C#'s Channel. * Supports a single consumer reading via async iteration and multiple producers writing. @@ -47,11 +81,21 @@ class AsyncQueue { this.maxCapacity = maxCapacity; } - /** Push an item. If at capacity, waits until space is available. */ - async write(item: T): Promise { + /** + * Push an item. If at capacity, waits until space is available. + * + * @param item - The value to enqueue. + * @param signal - Optional AbortSignal. If aborted while waiting on + * backpressure, the waiter is removed from the queue and + * an AbortError is thrown. The item is NOT enqueued. + */ + async write(item: T, signal?: AbortSignal): Promise { if (this.completed) { throw new Error('Cannot write to a completed queue.'); } + if (signal?.aborted) { + throw makeAbortError(abortMessage(signal)); + } if (this.waitingResolve) { const resolve = this.waitingResolve; @@ -61,14 +105,43 @@ class AsyncQueue { } while (this.queue.length >= this.maxCapacity) { - await new Promise((resolve) => { + // Make backpressure wait abort-aware: if the signal fires, remove + // our resolver from backpressureQueue so the chunk is never enqueued. 
+ let waiterResolve!: () => void; + const waiter = new Promise((resolve) => { + waiterResolve = resolve; this.backpressureQueue.push(resolve); }); + + if (!signal) { + await waiter; + } else { + let onAbort: (() => void) | null = null; + const abortPromise = new Promise((_, reject) => { + onAbort = () => reject(makeAbortError(abortMessage(signal))); + signal.addEventListener('abort', onAbort, { once: true }); + }); + try { + await Promise.race([waiter, abortPromise]); + } catch (err) { + // Aborted while backpressured — drop our resolver from the queue + // so we don't get woken up later and (worse) silently enqueue + // the item the caller already saw rejected. + const idx = this.backpressureQueue.indexOf(waiterResolve); + if (idx !== -1) this.backpressureQueue.splice(idx, 1); + throw err; + } finally { + if (onAbort) signal.removeEventListener('abort', onAbort); + } + } } if (this.completed) { throw new Error('Cannot write to a completed queue.'); } + if (signal?.aborted) { + throw makeAbortError(abortMessage(signal)); + } this.queue.push(item); } @@ -161,6 +234,8 @@ class AsyncQueue { export class LiveAudioTranscriptionSession { private modelId: string; private coreInterop: CoreInterop; + /** Session-level abort signal (applied to all operations). */ + private readonly sessionSignal: AbortSignal | undefined; private sessionHandle: string | null = null; private started = false; @@ -184,20 +259,28 @@ export class LiveAudioTranscriptionSession { * @internal * Users should create sessions via AudioClient.createLiveTranscriptionSession(). */ - constructor(modelId: string, coreInterop: CoreInterop) { + constructor(modelId: string, coreInterop: CoreInterop, signal?: AbortSignal) { this.modelId = modelId; this.coreInterop = coreInterop; + this.sessionSignal = signal; } /** * Start a real-time audio streaming session. * Must be called before append() or getTranscriptionStream(). * Settings are frozen after this call. 
+ * + * Cancellation is configured once via the session-level signal passed + * to ``createLiveTranscriptionSession(signal)``. If that signal is + * already aborted, an AbortError is thrown and no native session is + * created. */ public async start(): Promise { if (this.started) { throw new Error('Streaming session already started. Call stop() first.'); } + const effectiveSignal = this.sessionSignal; + throwIfAborted(effectiveSignal); this.activeSettings = this.settings.snapshot(); this.outputQueue = new AsyncQueue(); @@ -225,10 +308,7 @@ export class LiveAudioTranscriptionSession { throw new Error('Native core did not return a session handle.'); } } catch (error) { - const err = new Error( - `Error starting audio stream session: ${error instanceof Error ? error.message : String(error)}`, - { cause: error } - ); + const err = wrapAsLiveAudioStreamError('Error starting audio stream session: ', error); this.outputQueue.complete(err); throw err; } @@ -237,25 +317,80 @@ export class LiveAudioTranscriptionSession { this.stopped = false; this.sessionAbortController = new AbortController(); + if (effectiveSignal) { + // throwIfAborted() at the top already handled pre-aborted signals + // and start() is synchronous through here, so signal cannot have + // fired between those two points. Just wire the listener. + // + // Use AbortSignal.any-style auto-removal: when our internal + // sessionAbortController fires (in stop()/handleExternalAbort), + // the listener is removed automatically. This avoids a memory + // leak where a long-lived caller signal kept the session + // instance alive via the closure capturing `this` after the + // session ended normally. + effectiveSignal.addEventListener('abort', () => this.handleExternalAbort(effectiveSignal), { + once: true, + signal: this.sessionAbortController.signal, + }); + } this.pushLoopPromise = this.pushLoop(); } + /** + * Handle an external AbortSignal firing while the session is active. 
+ * Tears down the session by completing internal queues with an AbortError, + * and best-effort releases the native session handle. + * @internal + */ + private handleExternalAbort(signal: AbortSignal): void { + if (this.stopped || !this.started) return; + const err = makeAbortError(abortMessage(signal)); + this.stopped = true; + this.started = false; + this.sessionAbortController?.abort(); + this.pushQueue?.complete(err); + this.outputQueue?.complete(err); + + // Best-effort release of the native session handle. Without this the + // native core leaks a session per aborted client. + const handle = this.sessionHandle; + this.sessionHandle = null; + if (handle) { + try { + this.coreInterop.executeCommand("audio_stream_stop", { + Params: { SessionHandle: handle } + }); + } catch { + // Swallow: the session is already torn down on our side and + // we've surfaced the abort to the caller. + } + } + } + /** * Push a chunk of raw PCM audio data to the streaming session. * Can be called from any context. Chunks are internally queued * and serialized to native core one at a time. * + * Cancellation is configured once via the session-level signal passed + * to ``createLiveTranscriptionSession(signal)``. On abort, the + * chunk is NOT enqueued (no risk of late delivery to native core). + * * @param pcmData - Raw PCM audio bytes matching the configured format. */ public async append(pcmData: Uint8Array): Promise { if (!this.started || this.stopped) { throw new Error('No active streaming session. Call start() first.'); } + const effectiveSignal = this.sessionSignal; + throwIfAborted(effectiveSignal); const copy = new Uint8Array(pcmData.length); copy.set(pcmData); - await this.pushQueue!.write(copy); + // AsyncQueue.write is abort-aware: on abort, the backpressure waiter + // is removed and AbortError is thrown without enqueuing the chunk. 
+ await this.pushQueue!.write(copy, effectiveSignal); } /** @@ -291,12 +426,9 @@ export class LiveAudioTranscriptionSession { } } catch (error) { const errorMsg = error instanceof Error ? error.message : String(error); - const errorInfo = tryParseCoreError(errorMsg); - - const fatalError = new Error( - `Push failed (code=${errorInfo?.code ?? 'UNKNOWN'}): ${errorMsg}`, - { cause: error } - ); + const fatalError = wrapAsLiveAudioStreamError(`Push failed: `, error); + // Preserve the previous "Push failed (code=...)" prefix in the message for log compatibility. + (fatalError as { message: string }).message = `Push failed (code=${fatalError.code}): ${errorMsg}`; this.stopped = true; this.started = false; this.pushQueue?.complete(fatalError); @@ -317,6 +449,10 @@ export class LiveAudioTranscriptionSession { * Get the async iterable of transcription results. * Results arrive as the native ASR engine processes audio data. * + * Cancellation is configured once via the session-level signal passed + * to ``createLiveTranscriptionSession(signal)``. On abort, iteration + * ends with an AbortError. + * * Usage: * ```ts * for await (const result of client.getTranscriptionStream()) { @@ -331,10 +467,29 @@ export class LiveAudioTranscriptionSession { if (this.streamConsumed) { throw new Error('getTranscriptionStream() can only be called once per session. The output stream has already been consumed.'); } + const effectiveSignal = this.sessionSignal; + // Check abort BEFORE marking the stream consumed so a pre-aborted + // signal doesn't permanently disable the (single-use) stream. + throwIfAborted(effectiveSignal); this.streamConsumed = true; - for await (const item of this.outputQueue) { - yield item; + // If a signal is provided, complete the output queue with an AbortError on abort + // so the pending iterator yield rejects promptly. 
+ const queue = this.outputQueue; + let onAbort: (() => void) | null = null; + if (effectiveSignal) { + onAbort = () => queue.complete(makeAbortError(abortMessage(effectiveSignal))); + effectiveSignal.addEventListener('abort', onAbort, { once: true }); + } + + try { + for await (const item of queue) { + yield item; + } + } finally { + if (effectiveSignal && onAbort) { + effectiveSignal.removeEventListener('abort', onAbort); + } } } @@ -342,6 +497,10 @@ export class LiveAudioTranscriptionSession { * Signal end-of-audio and stop the streaming session. * Any remaining buffered audio in the push queue will be drained to native core first. * Final results are delivered through getTranscriptionStream() before it completes. + * + * Cancellation is configured once via the session-level signal passed + * to ``createLiveTranscriptionSession(signal)``. On abort, the drain + * is short-circuited and the native session is stopped immediately. */ public async stop(): Promise { if (!this.started || this.stopped) { @@ -352,13 +511,36 @@ export class LiveAudioTranscriptionSession { this.pushQueue?.complete(); + const effectiveSignal = this.sessionSignal; if (this.pushLoopPromise) { - await this.pushLoopPromise; + if (effectiveSignal) { + // Allow the caller to short-circuit the drain via abort. + let onAbort: (() => void) | null = null; + const abortPromise = new Promise((resolve) => { + onAbort = () => { + this.sessionAbortController?.abort(); + resolve(); + }; + if (effectiveSignal.aborted) { + // addEventListener doesn't fire on already-aborted signals. 
+ onAbort(); + } else { + effectiveSignal.addEventListener('abort', onAbort, { once: true }); + } + }); + try { + await Promise.race([this.pushLoopPromise, abortPromise]); + } finally { + if (onAbort && !effectiveSignal.aborted) effectiveSignal.removeEventListener('abort', onAbort); + } + } else { + await this.pushLoopPromise; + } } this.sessionAbortController?.abort(); - let stopError: Error | null = null; + let stopError: unknown = null; try { const responseData = this.coreInterop.executeCommand("audio_stream_stop", { Params: { SessionHandle: this.sessionHandle! } @@ -376,7 +558,7 @@ export class LiveAudioTranscriptionSession { } } } catch (error) { - stopError = error instanceof Error ? error : new Error(String(error)); + stopError = error; } this.sessionHandle = null; @@ -386,10 +568,7 @@ export class LiveAudioTranscriptionSession { this.outputQueue?.complete(); if (stopError) { - throw new Error( - `Error stopping audio stream session: ${stopError.message}`, - { cause: stopError } - ); + throw wrapAsLiveAudioStreamError('Error stopping audio stream session: ', stopError); } } diff --git a/sdk/js/src/openai/liveAudioTranscriptionTypes.ts b/sdk/js/src/openai/liveAudioTranscriptionTypes.ts index d7f07b5b..799b681b 100644 --- a/sdk/js/src/openai/liveAudioTranscriptionTypes.ts +++ b/sdk/js/src/openai/liveAudioTranscriptionTypes.ts @@ -93,3 +93,38 @@ export function tryParseCoreError(errorString: string): CoreErrorResponse | null } return null; } + +/** + * Error thrown by live audio streaming operations when the native core reports a failure. + * Surfaces structured fields (code, isTransient) so callers can implement targeted retry + * or telemetry logic instead of string-matching on `message`. + * + * `code` is `'UNKNOWN'` when the underlying error is not a structured CoreErrorResponse. + */ +export class LiveAudioStreamError extends Error { + /** Machine-readable error code from the native core, or `'UNKNOWN'`. 
*/ + public readonly code: string; + /** Whether the underlying core error is transient (caller may retry). */ + public readonly isTransient: boolean; + + constructor(message: string, code: string, isTransient: boolean, options?: { cause?: unknown }) { + super(message, options as ErrorOptions); + this.name = 'LiveAudioStreamError'; + this.code = code; + this.isTransient = isTransient; + } +} + +/** + * Wrap an arbitrary thrown value into a LiveAudioStreamError, parsing the underlying CoreErrorResponse + * if present. The resulting `message` keeps the existing prefix format for backwards + * compatibility with logs and troubleshooting docs. + * @internal + */ +export function wrapAsLiveAudioStreamError(prefix: string, cause: unknown): LiveAudioStreamError { + const causeMsg = cause instanceof Error ? cause.message : String(cause); + const info = tryParseCoreError(causeMsg); + const code = info?.code ?? 'UNKNOWN'; + const isTransient = info?.isTransient ?? false; + return new LiveAudioStreamError(`${prefix}${causeMsg}`, code, isTransient, { cause }); +} diff --git a/sdk/js/test/openai/liveAudioTranscription.test.ts b/sdk/js/test/openai/liveAudioTranscription.test.ts index 34edbac7..f8fc1da8 100644 --- a/sdk/js/test/openai/liveAudioTranscription.test.ts +++ b/sdk/js/test/openai/liveAudioTranscription.test.ts @@ -1,6 +1,6 @@ import { describe, it } from 'mocha'; import { expect } from 'chai'; -import { parseTranscriptionResult, tryParseCoreError } from '../../src/openai/liveAudioTranscriptionTypes.js'; +import { parseTranscriptionResult, tryParseCoreError, LiveAudioStreamError, wrapAsLiveAudioStreamError } from '../../src/openai/liveAudioTranscriptionTypes.js'; import { LiveAudioTranscriptionOptions } from '../../src/openai/liveAudioTranscriptionClient.js'; import { getTestManager } from '../testUtils.js'; @@ -116,6 +116,156 @@ describe('Live Audio Transcription Types', () => { }); }); + describe('LiveAudioStreamError', () => { + it('should expose code and isTransient 
when wrapping a structured error', () => { + const cause = new Error('Command \'audio_stream_push\' failed: {"code":"BUSY","message":"Model busy","isTransient":true}'); + const err = wrapAsLiveAudioStreamError('Push failed: ', cause); + + expect(err).to.be.instanceOf(LiveAudioStreamError); + expect(err.name).to.equal('LiveAudioStreamError'); + expect(err.code).to.equal('BUSY'); + expect(err.isTransient).to.be.true; + expect(err.cause).to.equal(cause); + expect(err.message).to.contain('Push failed: '); + }); + + it('should default code to UNKNOWN and isTransient to false for unstructured errors', () => { + const cause = new Error('something exploded'); + const err = wrapAsLiveAudioStreamError('Op failed: ', cause); + + expect(err).to.be.instanceOf(LiveAudioStreamError); + expect(err.code).to.equal('UNKNOWN'); + expect(err.isTransient).to.be.false; + }); + + it('should accept non-Error causes', () => { + const err = wrapAsLiveAudioStreamError('Op failed: ', 'string cause'); + expect(err.code).to.equal('UNKNOWN'); + expect(err.message).to.contain('string cause'); + }); + }); + + describe('AbortSignal helpers', () => { + // These tests exercise the behavior locked in by the abort-listener leak fix. + // We can't construct a real LiveAudioTranscriptionSession without the native + // core DLL, but we can verify that AbortSignal listeners are properly added + // and removed using the same pattern the client uses internally. + + it('should not leak listeners when racing a resolving promise against AbortSignal', async () => { + // Wrap a real AbortSignal so we can count add/remove calls. The + // previous version of this test used `signal.listenerCount('abort')` + // which doesn't exist on EventTarget — the assertion was a no-op. 
+ const realController = new AbortController(); + let activeListeners = 0; + let peakListeners = 0; + const baseAdd = realController.signal.addEventListener.bind(realController.signal); + const baseRemove = realController.signal.removeEventListener.bind(realController.signal); + const tracked = new Set(); + + const signal = new Proxy(realController.signal, { + get(target, prop, receiver) { + if (prop === 'addEventListener') { + return (type: string, listener: EventListenerOrEventListenerObject, opts?: AddEventListenerOptions | boolean) => { + if (type === 'abort' && !tracked.has(listener)) { + tracked.add(listener); + activeListeners++; + if (activeListeners > peakListeners) peakListeners = activeListeners; + } + return baseAdd(type, listener, opts); + }; + } + if (prop === 'removeEventListener') { + return (type: string, listener: EventListenerOrEventListenerObject, opts?: EventListenerOptions | boolean) => { + if (type === 'abort' && tracked.has(listener)) { + tracked.delete(listener); + activeListeners--; + } + return baseRemove(type, listener, opts); + }; + } + return Reflect.get(target, prop, receiver); + }, + }) as AbortSignal; + + // Mimic the append() race pattern: register a listener, race, remove on settle. + for (let i = 0; i < 20; i++) { + let onAbort: (() => void) | null = null; + const abortPromise = new Promise((_, reject) => { + onAbort = () => reject(new Error('aborted')); + signal.addEventListener('abort', onAbort, { once: true }); + }); + try { + await Promise.race([Promise.resolve(), abortPromise]); + } finally { + if (onAbort) signal.removeEventListener('abort', onAbort); + } + } + + // The fix MUST keep activeListeners bounded — never accumulating. + // Also assert the peak stayed at 1 (no overlap across iterations). 
+ expect(activeListeners).to.equal(0, 'all abort listeners removed after each iteration'); + expect(peakListeners).to.equal(1, 'no more than one listener attached at a time'); + }); + + it('should propagate AbortError when signal is fired during race', async () => { + const controller = new AbortController(); + const signal = controller.signal; + + let onAbort: (() => void) | null = null; + const abortPromise = new Promise((_, reject) => { + onAbort = () => { + const err = new Error('The operation was aborted.'); + err.name = 'AbortError'; + reject(err); + }; + signal.addEventListener('abort', onAbort, { once: true }); + }); + + // Never-resolving "work" promise. + const work = new Promise(() => { /* never */ }); + const racePromise = Promise.race([work, abortPromise]); + + controller.abort(); + + try { + await racePromise; + expect.fail('expected AbortError'); + } catch (err) { + expect((err as Error).name).to.equal('AbortError'); + } finally { + if (onAbort) signal.removeEventListener('abort', onAbort); + } + }); + + it('should preserve non-Error abort reason in error message', () => { + // Mirrors the abortMessage() helper used internally by the session client. + // Non-Error reasons (e.g., controller.abort('timeout')) must be stringified + // rather than dropped. + const ctrl1 = new AbortController(); + ctrl1.abort('timeout'); + expect(typeof ctrl1.signal.reason).to.equal('string'); + + const ctrl2 = new AbortController(); + ctrl2.abort(new Error('boom')); + expect(ctrl2.signal.reason).to.be.instanceOf(Error); + + const ctrl3 = new AbortController(); + ctrl3.abort(); + expect(ctrl3.signal.reason).to.exist; // DOMException, not undefined + + // Verify the conversion logic produces a non-empty message in all cases. 
+ const toMessage = (signal: AbortSignal): string => { + const r = signal.reason; + if (r instanceof Error) return r.message; + if (r !== undefined) return String(r); + return 'The operation was aborted.'; + }; + expect(toMessage(ctrl1.signal)).to.equal('timeout'); + expect(toMessage(ctrl2.signal)).to.equal('boom'); + expect(toMessage(ctrl3.signal)).to.be.a('string').and.not.empty; + }); + }); + // --- E2E streaming test with synthetic PCM audio --- describe('E2E with synthetic PCM audio', () => { diff --git a/sdk/python/src/openai/audio_client.py b/sdk/python/src/openai/audio_client.py index 575e9abf..cb13e013 100644 --- a/sdk/python/src/openai/audio_client.py +++ b/sdk/python/src/openai/audio_client.py @@ -62,24 +62,38 @@ def __init__(self, model_id: str, core_interop: CoreInterop): self.settings = AudioSettings() self._core_interop = core_interop - def create_live_transcription_session(self) -> LiveAudioTranscriptionSession: + def create_live_transcription_session( + self, + cancel_event: Optional[threading.Event] = None, + ) -> LiveAudioTranscriptionSession: """Create a real-time streaming transcription session. Audio data is pushed in as PCM chunks and transcription results are returned as a synchronous generator. + Args: + cancel_event: Optional ``threading.Event`` applied to **all** + subsequent ``start`` / ``append`` / ``stop`` / + ``get_transcription_stream`` calls on the returned session. + Set the event from any thread (e.g., a SIGINT handler) to + cancel in-flight operations and unblock the generator. + Pass it once here instead of threading it through every call. + Returns: A streaming session that should be stopped when done. 
Supports use as a context manager:: - with audio_client.create_live_transcription_session() as session: + cancel = threading.Event() + signal.signal(signal.SIGINT, lambda *_: cancel.set()) + + with audio_client.create_live_transcription_session(cancel) as session: session.settings.sample_rate = 16000 session.start() session.append(pcm_bytes) for result in session.get_transcription_stream(): print(result.content[0].text) """ - return LiveAudioTranscriptionSession(self.model_id, self._core_interop) + return LiveAudioTranscriptionSession(self.model_id, self._core_interop, cancel_event) @staticmethod def _validate_audio_file_path(audio_file_path: str) -> None: diff --git a/sdk/python/src/openai/live_audio_transcription_client.py b/sdk/python/src/openai/live_audio_transcription_client.py index 057c0770..387c0426 100644 --- a/sdk/python/src/openai/live_audio_transcription_client.py +++ b/sdk/python/src/openai/live_audio_transcription_client.py @@ -48,6 +48,19 @@ _SENTINEL = object() +# Polling interval for cancellation checks (seconds). +# +# ``queue.Queue.get`` / ``put`` cannot be interrupted by a ``threading.Event`` +# in standard Python, so when a ``cancel_event`` is configured we fall back +# to a poll-with-timeout pattern: wait up to this interval for queue I/O, +# then check the cancel flag and either return / raise or retry. +# +# 100 ms balances cancellation latency (a SIGINT takes effect within ~100 ms) +# against idle CPU overhead (~10 wakeups/sec per blocked call, negligible). +# This is a no-op on the fast path where no cancel_event is configured — +# the original blocking ``put()`` / ``get()`` is used unchanged. +_CANCEL_POLL_INTERVAL = 0.1 + class LiveAudioTranscriptionSession: """Session for real-time audio streaming ASR (Automatic Speech Recognition). 
@@ -82,10 +95,21 @@ class LiveAudioTranscriptionSession: session.stop() """ - def __init__(self, model_id: str, core_interop: CoreInterop): + def __init__( + self, + model_id: str, + core_interop: CoreInterop, + cancel_event: Optional[threading.Event] = None, + ): self._model_id = model_id self._core_interop = core_interop + # Session-level cancellation event. Set from any thread (e.g., a SIGINT + # handler) to cancel in-flight start/append/stop and unblock the + # transcription generator. Methods also accept an optional per-call + # cancel_event; setting EITHER will cancel. + self._cancel_event = cancel_event + # Public settings — mutable until start() self.settings = LiveAudioTranscriptionOptions() @@ -105,18 +129,30 @@ def __init__(self, model_id: str, core_interop: CoreInterop): self._push_queue: Optional[queue.Queue] = None self._push_thread: Optional[threading.Thread] = None + def _is_cancelled(self) -> bool: + """True if the session-level cancel_event is set.""" + return self._cancel_event is not None and self._cancel_event.is_set() + def start(self) -> None: """Start a real-time audio streaming session. Must be called before :meth:`append` or :meth:`get_transcription_stream`. Settings are frozen after this call. + Cancellation is configured once via the ``cancel_event`` passed to + :meth:`AudioClient.create_live_transcription_session`. If that event + is already set, this raises :class:`FoundryLocalException` before + the native session is created. + Raises: FoundryLocalException: If the session is already started - (message contains ``"already started"``), or if the native + (message contains ``"already started"``), if the native core fails to start the stream (message has form - ``"Error starting audio stream session: "``). + ``"Error starting audio stream session: "``), + or if cancellation was requested before the call. 
""" + if self._is_cancelled(): + raise FoundryLocalException("start() cancelled before the session was created.") with self._lock: if self._started: raise FoundryLocalException( @@ -178,13 +214,20 @@ def append(self, pcm_data: bytes) -> None: (backpressure). This prevents unbounded memory growth when the native core falls behind real-time. + Cancellation is configured once via the ``cancel_event`` passed to + :meth:`AudioClient.create_live_transcription_session`. If that event + fires while ``append`` is blocked on backpressure, the call returns + promptly via :class:`FoundryLocalException` **without enqueueing the + chunk** (no risk of late delivery to native core). + Args: pcm_data: Raw PCM audio bytes matching the configured format. Raises: FoundryLocalException: If the session is not active (not - started, or already stopped). Message contains - ``"No active streaming session"``. + started, or already stopped — message contains + ``"No active streaming session"``), or if the call was + cancelled before the chunk was enqueued. """ # Copy the data to avoid issues if the caller reuses the buffer data_copy = bytes(pcm_data) @@ -201,12 +244,24 @@ def append(self, pcm_data: bytes) -> None: "No active streaming session. Call start() first." ) - # put() blocks if the queue is full (backpressure). This prevents - # unbounded memory growth when the native core is slower than - # real-time. Capacity is configurable via push_queue_capacity. + # Fast-path: no cancellation event configured -> use the original + # blocking put() so we don't add per-call polling overhead. + if self._cancel_event is None: + push_queue.put(data_copy) + return + + # Cancellation-aware path: poll with a small timeout so we can + # surface a cancel set from another thread without enqueuing the chunk. # Performed outside the lock to avoid blocking stop() and other # state transitions while waiting for queue space. 
- push_queue.put(data_copy) + while True: + if self._is_cancelled(): + raise FoundryLocalException("append() cancelled before the chunk was enqueued.") + try: + push_queue.put(data_copy, timeout=_CANCEL_POLL_INTERVAL) + return + except queue.Full: + continue def get_transcription_stream( self, @@ -217,6 +272,11 @@ def get_transcription_stream( The generator completes when :meth:`stop` is called and all remaining audio has been processed. + Cancellation is configured once via the ``cancel_event`` passed to + :meth:`AudioClient.create_live_transcription_session`. If that event + fires, iteration ends cleanly (the generator returns instead of + raising). + After :meth:`stop` completes, calling this method again returns an empty generator (the sentinel is still on the queue) — matching the C# / JS SDK behavior. @@ -240,8 +300,26 @@ def get_transcription_stream( "No active streaming session. Call start() first." ) + # Fast-path with no cancel source — use blocking get() unchanged. + if self._cancel_event is None: + while True: + item = q.get() + if item is _SENTINEL: + break + if isinstance(item, Exception): + raise item + yield item + return + + # Cancellation-aware path: poll periodically so we can return cleanly + # when the cancel event fires. while True: - item = q.get() + if self._is_cancelled(): + return + try: + item = q.get(timeout=_CANCEL_POLL_INTERVAL) + except queue.Empty: + continue if item is _SENTINEL: break if isinstance(item, Exception): @@ -255,6 +333,12 @@ def stop(self) -> None: native core first. Final results are delivered through :meth:`get_transcription_stream` before it completes. + Cancellation is configured once via the ``cancel_event`` passed to + :meth:`AudioClient.create_live_transcription_session`. If that event + fires while ``stop`` is waiting for the drain to finish, the wait + is short-circuited so ``stop`` returns promptly. The native session + is still finalized so resources are released. 
+ Idempotent: calling ``stop()`` on a session that was never started or has already been stopped is a no-op. @@ -274,9 +358,16 @@ def stop(self) -> None: # 1. Signal push loop to finish (put sentinel) self._push_queue.put(_SENTINEL) - # 2. Wait for push loop to finish draining + # 2. Wait for push loop to finish draining. If a cancel is requested, + # poll with a short timeout so stop() can return promptly. if self._push_thread is not None: - self._push_thread.join() + if self._cancel_event is None: + self._push_thread.join() + else: + while self._push_thread.is_alive(): + if self._is_cancelled(): + break # short-circuit drain — proceed to native stop + self._push_thread.join(timeout=_CANCEL_POLL_INTERVAL) # 3. Tell native core to flush and finalize request = InteropRequest(params={"SessionHandle": self._session_handle}) diff --git a/sdk/python/test/openai/test_live_audio_transcription.py b/sdk/python/test/openai/test_live_audio_transcription.py index e5964158..ef5c8685 100644 --- a/sdk/python/test/openai/test_live_audio_transcription.py +++ b/sdk/python/test/openai/test_live_audio_transcription.py @@ -15,6 +15,7 @@ import json import threading +import time from unittest.mock import MagicMock import pytest @@ -237,6 +238,108 @@ def test_stop_without_start_is_noop(self): # Should not raise session.stop() + # --- Cancellation tests --- + + def test_start_with_pre_set_session_cancel_event_raises(self): + """Session-level cancel_event set before start() prevents native call.""" + cancel = threading.Event() + cancel.set() + + mock_interop = MagicMock(spec=CoreInterop) + session = LiveAudioTranscriptionSession("test-model", mock_interop, cancel) + + with pytest.raises(FoundryLocalException, match="cancelled"): + session.start() + + # Native start must NOT have been invoked. 
+ mock_interop.start_audio_stream.assert_not_called() + + def test_session_level_cancel_unblocks_append_under_backpressure(self): + """Setting the session-level cancel_event unblocks a backpressured append().""" + cancel = threading.Event() + mock_interop = MagicMock(spec=CoreInterop) + mock_interop.start_audio_stream.return_value = Response(data="handle-1", error=None) + # Keep push_audio_data slow so the queue fills. + push_event = threading.Event() + push_in_progress = threading.Event() + + def slow_push(*_args, **_kwargs): + push_in_progress.set() + push_event.wait() + return Response(data=None, error=None) + + mock_interop.push_audio_data.side_effect = slow_push + mock_interop.stop_audio_stream.return_value = Response(data=None, error=None) + + session = LiveAudioTranscriptionSession("test-model", mock_interop, cancel) + session.settings.push_queue_capacity = 1 # tiny so we can fill quickly + session.start() + + try: + # First chunk: push thread takes it and blocks in slow_push. + session.append(b"\x00" * 100) + assert push_in_progress.wait(timeout=2.0), "push thread should pick up the first chunk" + + # Second chunk: fills the queue (capacity=1, push thread still busy). + session.append(b"\x00" * 100) + + blocked_result = {"err": None, "completed": False} + + def blocked_append(): + try: + session.append(b"\x00" * 100) + blocked_result["completed"] = True + except FoundryLocalException as e: + blocked_result["err"] = e + + t = threading.Thread(target=blocked_append, daemon=True) + t.start() + + # Give the thread time to actually start blocking on the queue. + t.join(timeout=0.3) + assert t.is_alive(), "third append() should be blocked on queue capacity" + + # Trigger session-level cancel — should unblock append() with an exception. 
+ cancel.set() + t.join(timeout=2.0) + + assert not t.is_alive(), "append() should have unblocked after cancel" + assert blocked_result["err"] is not None, "append() should raise on cancel" + assert "cancelled" in str(blocked_result["err"]).lower() + assert blocked_result["completed"] is False, "chunk must NOT have been enqueued" + finally: + push_event.set() # unblock the slow_push so stop() can drain + session.stop() + + def test_get_transcription_stream_returns_cleanly_on_cancel(self): + """Cancel event ends the generator without raising.""" + cancel = threading.Event() + mock_interop = MagicMock(spec=CoreInterop) + mock_interop.start_audio_stream.return_value = Response(data="handle-1", error=None) + mock_interop.stop_audio_stream.return_value = Response(data=None, error=None) + + session = LiveAudioTranscriptionSession("test-model", mock_interop, cancel) + session.start() + + try: + results = [] + + def consume(): + for r in session.get_transcription_stream(): + results.append(r) + + t = threading.Thread(target=consume, daemon=True) + t.start() + time.sleep(0.2) # let it start blocking on the empty queue + assert t.is_alive(), "consumer should be blocked on empty queue" + + cancel.set() + t.join(timeout=2.0) + assert not t.is_alive(), "consumer should have returned cleanly on cancel" + assert results == [] # no results were ever produced + finally: + session.stop() + # --------------------------------------------------------------------------- # Session streaming integration test (mocked native core)