Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export { AudioClient, AudioClientSettings } from './openai/audioClient.js';
export { EmbeddingClient } from './openai/embeddingClient.js';
export { LiveAudioTranscriptionSession, LiveAudioTranscriptionOptions } from './openai/liveAudioTranscriptionClient.js';
export type { LiveAudioTranscriptionResponse, TranscriptionContentPart } from './openai/liveAudioTranscriptionTypes.js';
export { CoreError } from './openai/liveAudioTranscriptionTypes.js';
export { ResponsesClient, ResponsesClientSettings, getOutputText } from './openai/responsesClient.js';
export { ModelLoadManager } from './detail/modelLoadManager.js';
/** @internal */
Expand Down
222 changes: 194 additions & 28 deletions sdk/js/src/openai/liveAudioTranscriptionClient.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CoreInterop } from '../detail/coreInterop.js';
import { LiveAudioTranscriptionResponse, parseTranscriptionResult, tryParseCoreError } from './liveAudioTranscriptionTypes.js';
import { LiveAudioTranscriptionResponse, parseTranscriptionResult, wrapCoreError } from './liveAudioTranscriptionTypes.js';

/**
* Audio format settings for a streaming session.
Expand Down Expand Up @@ -30,6 +30,40 @@ export class LiveAudioTranscriptionOptions {
}
}

/**
* DOMException-compatible AbortError. Matches the shape thrown by native fetch/AbortController
* so callers can use `err.name === 'AbortError'` for cancellation detection.
* @internal
*/
function makeAbortError(message = 'The operation was aborted.'): Error {
const err = new Error(message);
err.name = 'AbortError';
return err;
}

/**
* Convert an AbortSignal's `reason` into a human-readable message.
* Handles all three cases: Error reasons, non-Error reasons (e.g.,
* `controller.abort('timeout')`), and undefined reasons.
* @internal
*/
function abortMessage(signal: AbortSignal): string {
const reason = signal.reason;
if (reason instanceof Error) return reason.message;
if (reason !== undefined) return String(reason);
return 'The operation was aborted.';
}

/**
* If `signal` is already aborted, throw an AbortError immediately.
* @internal
*/
function throwIfAborted(signal: AbortSignal | undefined): void {
if (signal?.aborted) {
throw makeAbortError(abortMessage(signal));
}
}

/**
* Internal async queue that acts like C#'s Channel<T>.
* Supports a single consumer reading via async iteration and multiple producers writing.
Expand All @@ -47,11 +81,21 @@ class AsyncQueue<T> {
this.maxCapacity = maxCapacity;
}

/** Push an item. If at capacity, waits until space is available. */
async write(item: T): Promise<void> {
/**
* Push an item. If at capacity, waits until space is available.
*
* @param item - The value to enqueue.
* @param signal - Optional AbortSignal. If aborted while waiting on
* backpressure, the waiter is removed from the queue and
* an AbortError is thrown. The item is NOT enqueued.
*/
async write(item: T, signal?: AbortSignal): Promise<void> {
if (this.completed) {
throw new Error('Cannot write to a completed queue.');
}
if (signal?.aborted) {
throw makeAbortError(abortMessage(signal));
}

if (this.waitingResolve) {
const resolve = this.waitingResolve;
Expand All @@ -61,14 +105,43 @@ class AsyncQueue<T> {
}

while (this.queue.length >= this.maxCapacity) {
await new Promise<void>((resolve) => {
// Make backpressure wait abort-aware: if the signal fires, remove
// our resolver from backpressureQueue so the chunk is never enqueued.
let waiterResolve!: () => void;
const waiter = new Promise<void>((resolve) => {
waiterResolve = resolve;
this.backpressureQueue.push(resolve);
});

if (!signal) {
await waiter;
} else {
let onAbort: (() => void) | null = null;
const abortPromise = new Promise<never>((_, reject) => {
onAbort = () => reject(makeAbortError(abortMessage(signal)));
signal.addEventListener('abort', onAbort, { once: true });
});
try {
await Promise.race([waiter, abortPromise]);
} catch (err) {
// Aborted while backpressured — drop our resolver from the queue
// so we don't get woken up later and (worse) silently enqueue
// the item the caller already saw rejected.
const idx = this.backpressureQueue.indexOf(waiterResolve);
if (idx !== -1) this.backpressureQueue.splice(idx, 1);
throw err;
} finally {
if (onAbort) signal.removeEventListener('abort', onAbort);
}
}
}

if (this.completed) {
throw new Error('Cannot write to a completed queue.');
}
if (signal?.aborted) {
throw makeAbortError(abortMessage(signal));
}

this.queue.push(item);
}
Expand Down Expand Up @@ -193,11 +266,14 @@ export class LiveAudioTranscriptionSession {
* Start a real-time audio streaming session.
* Must be called before append() or getTranscriptionStream().
* Settings are frozen after this call.
*
* @param signal - Optional AbortSignal. If aborted before or during start, an AbortError is thrown.
*/
public async start(): Promise<void> {
public async start(signal?: AbortSignal): Promise<void> {
if (this.started) {
throw new Error('Streaming session already started. Call stop() first.');
}
throwIfAborted(signal);

this.activeSettings = this.settings.snapshot();
this.outputQueue = new AsyncQueue<LiveAudioTranscriptionResponse>();
Expand Down Expand Up @@ -225,10 +301,7 @@ export class LiveAudioTranscriptionSession {
throw new Error('Native core did not return a session handle.');
}
} catch (error) {
const err = new Error(
`Error starting audio stream session: ${error instanceof Error ? error.message : String(error)}`,
{ cause: error }
);
const err = wrapCoreError('Error starting audio stream session: ', error);
this.outputQueue.complete(err);
throw err;
}
Expand All @@ -237,25 +310,79 @@ export class LiveAudioTranscriptionSession {
this.stopped = false;

this.sessionAbortController = new AbortController();
if (signal) {
const onAbort = () => this.handleExternalAbort(signal);
if (signal.aborted) {
onAbort();
} else {
// Use AbortSignal.any-style auto-removal: when our internal
// sessionAbortController fires (in stop()/handleExternalAbort),
// the listener is removed automatically. This avoids a memory
// leak where a long-lived caller signal kept the session
// instance alive via the closure capturing `this` after the
// session ended normally.
signal.addEventListener('abort', onAbort, {
once: true,
signal: this.sessionAbortController.signal,
});
}
}
this.pushLoopPromise = this.pushLoop();
}

/**
* Handle an external AbortSignal firing while the session is active.
* Tears down the session by completing internal queues with an AbortError,
* and best-effort releases the native session handle.
* @internal
*/
private handleExternalAbort(signal: AbortSignal): void {
if (this.stopped || !this.started) return;
const err = makeAbortError(abortMessage(signal));
this.stopped = true;
this.started = false;
this.sessionAbortController?.abort();
this.pushQueue?.complete(err);
this.outputQueue?.complete(err);

// Best-effort release of the native session handle. Without this the
// native core leaks a session per aborted client.
const handle = this.sessionHandle;
this.sessionHandle = null;
if (handle) {
try {
this.coreInterop.executeCommand("audio_stream_stop", {
Params: { SessionHandle: handle }
});
} catch {
// Swallow: the session is already torn down on our side and
// we've surfaced the abort to the caller.
}
}
}

/**
* Push a chunk of raw PCM audio data to the streaming session.
* Can be called from any context. Chunks are internally queued
* and serialized to native core one at a time.
*
* @param pcmData - Raw PCM audio bytes matching the configured format.
* @param signal - Optional AbortSignal. If aborted while waiting for queue
* capacity, an AbortError is thrown and the chunk is NOT
* enqueued (no risk of late delivery to native core).
*/
public async append(pcmData: Uint8Array): Promise<void> {
public async append(pcmData: Uint8Array, signal?: AbortSignal): Promise<void> {
if (!this.started || this.stopped) {
throw new Error('No active streaming session. Call start() first.');
}
throwIfAborted(signal);

const copy = new Uint8Array(pcmData.length);
copy.set(pcmData);

await this.pushQueue!.write(copy);
// AsyncQueue.write is abort-aware: on abort, the backpressure waiter
// is removed and AbortError is thrown without enqueuing the chunk.
await this.pushQueue!.write(copy, signal);
}

/**
Expand Down Expand Up @@ -291,12 +418,9 @@ export class LiveAudioTranscriptionSession {
}
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
const errorInfo = tryParseCoreError(errorMsg);

const fatalError = new Error(
`Push failed (code=${errorInfo?.code ?? 'UNKNOWN'}): ${errorMsg}`,
{ cause: error }
);
const fatalError = wrapCoreError(`Push failed: `, error);
// Preserve the previous "Push failed (code=...)" prefix in the message for log compatibility.
(fatalError as { message: string }).message = `Push failed (code=${fatalError.code}): ${errorMsg}`;
this.stopped = true;
this.started = false;
this.pushQueue?.complete(fatalError);
Expand All @@ -317,33 +441,56 @@ export class LiveAudioTranscriptionSession {
* Get the async iterable of transcription results.
* Results arrive as the native ASR engine processes audio data.
*
* @param signal - Optional AbortSignal. If aborted, iteration ends with an AbortError.
*
* Usage:
* ```ts
* for await (const result of client.getTranscriptionStream()) {
* console.log(result.content[0].text);
* }
* ```
*/
public async *getTranscriptionStream(): AsyncGenerator<LiveAudioTranscriptionResponse> {
public async *getTranscriptionStream(signal?: AbortSignal): AsyncGenerator<LiveAudioTranscriptionResponse> {
if (!this.outputQueue) {
throw new Error('No active streaming session. Call start() first.');
}
if (this.streamConsumed) {
throw new Error('getTranscriptionStream() can only be called once per session. The output stream has already been consumed.');
}
// Check abort BEFORE marking the stream consumed so a pre-aborted
// signal doesn't permanently disable the (single-use) stream.
throwIfAborted(signal);
this.streamConsumed = true;

for await (const item of this.outputQueue) {
yield item;
// If a signal is provided, complete the output queue with an AbortError on abort
// so the pending iterator yield rejects promptly.
const queue = this.outputQueue;
let onAbort: (() => void) | null = null;
if (signal) {
onAbort = () => queue.complete(makeAbortError(abortMessage(signal)));
signal.addEventListener('abort', onAbort, { once: true });
}

try {
for await (const item of queue) {
yield item;
}
} finally {
if (signal && onAbort) {
signal.removeEventListener('abort', onAbort);
}
}
}

/**
* Signal end-of-audio and stop the streaming session.
* Any remaining buffered audio in the push queue will be drained to native core first.
* Final results are delivered through getTranscriptionStream() before it completes.
*
* @param signal - Optional AbortSignal. If aborted while draining the push queue, drain is
* short-circuited and the native session is stopped immediately.
*/
public async stop(): Promise<void> {
public async stop(signal?: AbortSignal): Promise<void> {
if (!this.started || this.stopped) {
return;
}
Expand All @@ -353,12 +500,34 @@ export class LiveAudioTranscriptionSession {
this.pushQueue?.complete();

if (this.pushLoopPromise) {
await this.pushLoopPromise;
if (signal) {
// Allow the caller to short-circuit the drain via abort.
let onAbort: (() => void) | null = null;
const abortPromise = new Promise<void>((resolve) => {
onAbort = () => {
this.sessionAbortController?.abort();
resolve();
};
if (signal.aborted) {
// addEventListener doesn't fire on already-aborted signals.
onAbort();
} else {
signal.addEventListener('abort', onAbort, { once: true });
}
});
try {
await Promise.race([this.pushLoopPromise, abortPromise]);
} finally {
if (onAbort && !signal.aborted) signal.removeEventListener('abort', onAbort);
}
} else {
await this.pushLoopPromise;
}
}

this.sessionAbortController?.abort();

let stopError: Error | null = null;
let stopError: unknown = null;
try {
const responseData = this.coreInterop.executeCommand("audio_stream_stop", {
Params: { SessionHandle: this.sessionHandle! }
Expand All @@ -376,7 +545,7 @@ export class LiveAudioTranscriptionSession {
}
}
} catch (error) {
stopError = error instanceof Error ? error : new Error(String(error));
stopError = error;
}

this.sessionHandle = null;
Expand All @@ -386,10 +555,7 @@ export class LiveAudioTranscriptionSession {
this.outputQueue?.complete();

if (stopError) {
throw new Error(
`Error stopping audio stream session: ${stopError.message}`,
{ cause: stopError }
);
throw wrapCoreError('Error stopping audio stream session: ', stopError);
}
}

Expand Down
Loading
Loading