[JS & Python] Add cancellation and structured error handling to live audio streaming #690
base: main
Changes from 11 commits
0da8ab7
21194a3
1d340ab
83e8dc4
2cf7df8
df4260c
eee2cbe
42b40e5
6ab38c4
143a8b6
d5683c3
187810b
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,7 @@ | |
| // | ||
| // Usage: node app.js | ||
|
|
||
| - import { FoundryLocalManager } from 'foundry-local-sdk'; | ||
| + import { FoundryLocalManager, CoreError } from 'foundry-local-sdk'; | ||
|
|
||
| console.log('╔══════════════════════════════════════════════════════════╗'); | ||
| console.log('║ Foundry Local — Live Audio Transcription (JS SDK) ║'); | ||
|
|
@@ -39,9 +39,16 @@ console.log('Loading model...'); | |
| await model.load(); | ||
| console.log('✓ Model loaded'); | ||
|
|
||
| + // Graceful-shutdown coordinator. Set ONCE on the session via | ||
| + // createLiveTranscriptionSession(signal) — every subsequent | ||
| + // start() / append() / getTranscriptionStream() / stop() call picks it | ||
| + // up automatically, so we don't have to thread the signal through every | ||
| + // callsite. | ||
| + const shutdown = new AbortController(); | ||
|
|
||
| // Create live transcription session (same pattern as C# sample). | ||
| const audioClient = model.createAudioClient(); | ||
| - const session = audioClient.createLiveTranscriptionSession(); | ||
| + const session = audioClient.createLiveTranscriptionSession(shutdown.signal); | ||
|
|
||
|
Contributor
Instead of creating a new class to represent the parameters passed into the session, can we instead make an optional parameter for
Contributor
Author
Sure, I will update the
Contributor
Author
Done in 6ab38c4. Dropped const shutdown = new AbortController();
const session = audioClient.createLiveTranscriptionSession(shutdown.signal); (was JS 19/19 tests pass; sample updated. |
||
| session.settings.sampleRate = 16000; // Default is 16000; shown here for clarity | ||
| session.settings.channels = 1; | ||
|
|
@@ -67,9 +74,22 @@ const readPromise = (async () => { | |
| } | ||
| } | ||
| } catch (err) { | ||
| - if (err.name !== 'AbortError') { | ||
| - console.error('Stream error:', err.message); | ||
| + // AbortError is expected on Ctrl+C; ignore quietly. | ||
| + if (err.name === 'AbortError') return; | ||
|
|
||
| + // CoreError surfaces native-core failure metadata (code + isTransient). | ||
| + // Use it to retry quietly on transient blips instead of dying on the | ||
| + // first hiccup. Without CoreError the only signal would be err.message. | ||
| + if (err instanceof CoreError) { | ||
| + if (err.isTransient) { | ||
|
Collaborator
Is it expected for FoundryLocalCore to throw transient errors? What do transient errors mean? If we detect transient errors and there is no action the application or user should take, can we catch them before they reach the application layer and continue as necessary? |
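One way to picture the reviewer's suggestion is a retry wrapper that sits below the application layer and only lets non-transient failures escape. The sketch below is an illustration under stated assumptions, not the SDK's behavior: `StreamError`, `push_with_retry`, the attempt count, and the backoff delays are all hypothetical names and values.

```python
import time


class StreamError(Exception):
    """Hypothetical structured error: an error code plus a transient flag."""

    def __init__(self, code, message, is_transient=False):
        super().__init__(message)
        self.code = code
        self.is_transient = is_transient


def push_with_retry(push, chunk, max_attempts=3, base_delay=0.05, sleep=time.sleep):
    """Call push(chunk), retrying transient failures with exponential backoff.

    Non-transient errors, and the last failed attempt, are re-raised so the
    application still sees anything it needs to act on.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return push(chunk)
        except StreamError as err:
            if not err.is_transient or attempt == max_attempts:
                raise
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            sleep(base_delay * 2 ** (attempt - 1))
```

With a wrapper like this, the sample's catch block would only ever see errors that survived the retries, which is the situation the reviewer is asking about.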
||
| + console.warn(`\n⚠ Transient ASR error (${err.code}): ${err.message}. Continuing...`); | ||
| + return; | ||
| + } | ||
| + console.error(`\n✗ Stream error [${err.code}]: ${err.message}`); | ||
| + return; | ||
| + } | ||
|
|
||
| + console.error('\n✗ Stream error:', err.message); | ||
| } | ||
| })(); | ||
|
|
||
|
|
@@ -108,14 +128,18 @@ try { | |
| try { | ||
| while (appendQueue.length > 0) { | ||
| const pcm = appendQueue.shift(); | ||
| + // Session-level signal (set in createLiveTranscriptionSession) | ||
| + // applies automatically — no need to pass it here. | ||
| await session.append(pcm); | ||
| } | ||
| } catch (err) { | ||
| + // Aborted via Ctrl+C — exit quietly. | ||
| + if (err.name === 'AbortError') return; | ||
| console.error('append error:', err.message); | ||
| } finally { | ||
| pumping = false; | ||
| // Handle race where new data arrived after loop exit. | ||
| - if (appendQueue.length > 0) { | ||
| + if (appendQueue.length > 0 && !shutdown.signal.aborted) { | ||
| void pumpAudio(); | ||
| } | ||
| } | ||
|
|
@@ -182,9 +206,14 @@ try { | |
| process.exit(0); | ||
| } | ||
|
|
||
| - // Handle graceful shutdown | ||
| + // Handle graceful shutdown. | ||
| + // | ||
| + // The AbortController fires the shared `shutdown` signal so any in-flight | ||
| + // session.append() / getTranscriptionStream() resolves promptly with an | ||
| + // AbortError instead of waiting for stop() to finish draining the queue. | ||
| process.on('SIGINT', async () => { | ||
| console.log('\n\nStopping...'); | ||
| + shutdown.abort(); | ||
|
Collaborator
Is my understanding correct that shutdown and session.stop do the same thing, but shutdown is similar to session.stop(hard=true) so we avoid draining the queue? Should we use the same API to represent that, or do we need separate handlers for shutdown vs session.stop()?
Contributor
Author
They're not the same thing; they're orthogonal:
|
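The author's explanation is cut off in this capture, so the following is only a toy model of the distinction the diff comments describe: stop() ends the stream and drains what is queued, while the cancellation signal makes in-flight work give up early and short-circuits that drain. `MiniSession` and `Cancelled` are hypothetical names and do not reflect the SDK's implementation.

```python
import queue
import threading


class Cancelled(Exception):
    """Raised when work is attempted after cancellation was requested."""


class MiniSession:
    """Toy stand-in for a streaming session (not the SDK class)."""

    def __init__(self, cancel_event=None):
        self._cancel = cancel_event or threading.Event()
        self._pending = queue.Queue()
        self.processed = []

    def append(self, chunk):
        # Cancellation makes new work fail fast instead of queueing.
        if self._cancel.is_set():
            raise Cancelled("cancelled")
        self._pending.put(chunk)

    def stop(self):
        """End the stream: drain queued chunks unless cancellation was requested.

        Returns the number of chunks left unprocessed.
        """
        while not self._cancel.is_set():
            try:
                chunk = self._pending.get_nowait()
            except queue.Empty:
                break
            self.processed.append(chunk)
        return self._pending.qsize()
```

In this model both calls are needed on Ctrl+C: setting the event decides how quickly outstanding work unwinds, and stop() still performs the end-of-stream bookkeeping.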
||
| if (audioInput) { | ||
| audioInput.quit(); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,8 @@ | |
| import time | ||
|
|
||
| from foundry_local_sdk import Configuration, FoundryLocalManager | ||
| + from foundry_local_sdk.exception import FoundryLocalException | ||
| + from foundry_local_sdk.openai.live_audio_transcription_types import CoreErrorResponse | ||
|
Collaborator
Same here. Why not encapsulate the error response in FoundryLocalException? Do we need a CoreErrorResponse?
Contributor
Author
Yes, this breaks symmetry with the JS CoreError class. Right now Python users have to do: vs. JS: I will update it once we decide the name for |
||
|
|
||
| use_synth = "--synth" in sys.argv | ||
|
|
||
|
|
@@ -41,8 +43,16 @@ | |
| model.load() | ||
| print("done.") | ||
|
|
||
| + # Graceful-shutdown coordinator. Set ONCE on the session via | ||
| + # create_live_transcription_session(cancel_event=...) — every subsequent | ||
| + # start() / append() / stop() / get_transcription_stream() call picks it | ||
| + # up automatically, so we don't have to thread the event through every | ||
| + # callsite. SIGINT just calls shutdown_event.set() and the in-flight | ||
| + # session work unwinds cleanly. | ||
| + shutdown_event = threading.Event() | ||
|
|
||
| audio_client = model.get_audio_client() | ||
| - session = audio_client.create_live_transcription_session() | ||
| + session = audio_client.create_live_transcription_session(cancel_event=shutdown_event) | ||
| session.settings.sample_rate = 16000 | ||
| session.settings.channels = 1 | ||
| session.settings.language = "en" | ||
|
|
@@ -52,14 +62,30 @@ | |
|
|
||
| # --- Background thread reads transcription results (mirrors JS readPromise) --- | ||
|
|
||
|
|
||
|   def read_results(): | ||
| -     for result in session.get_transcription_stream(): | ||
| -         text = result.content[0].text if result.content else "" | ||
| -         if result.is_final: | ||
| -             print() | ||
| -             print(f" [FINAL] {text}") | ||
| -         elif text: | ||
| -             print(text, end="", flush=True) | ||
| +     try: | ||
| +         for result in session.get_transcription_stream(): | ||
| +             text = result.content[0].text if result.content else "" | ||
| +             if result.is_final: | ||
| +                 print() | ||
| +                 print(f" [FINAL] {text}") | ||
| +             elif text: | ||
| +                 print(text, end="", flush=True) | ||
| +     except FoundryLocalException as ex: | ||
| +         # Cancelled via shutdown_event -> generator returns cleanly (no exception). | ||
| +         # We only land here on a real native-side push failure. | ||
| +         # Use CoreErrorResponse to inspect structured error metadata (code + | ||
| +         # is_transient) and decide whether to retry or surface the error. | ||
| +         # Without it, the only signal would be str(ex). | ||
| +         info = CoreErrorResponse.try_parse(str(ex)) | ||
| +         if info and info.is_transient: | ||
| +             print(f"\n⚠ Transient ASR error ({info.code}): {info.message}. Continuing...") | ||
| +             return | ||
| +         if info: | ||
| +             print(f"\n✗ Stream error [{info.code}]: {info.message}") | ||
| +             return | ||
| +         print(f"\n✗ Stream error: {ex}") | ||
|
|
||
|
|
||
| read_thread = threading.Thread(target=read_results, daemon=True) | ||
|
|
@@ -72,7 +98,6 @@ def read_results(): | |
| CHANNELS = 1 | ||
| CHUNK = RATE // 10 # 100ms of audio = 1600 frames | ||
|
|
||
| - stop_event = threading.Event() | ||
| mic_active = False | ||
| pa = None | ||
| stream = None | ||
|
|
@@ -100,14 +125,21 @@ def read_results(): | |
| print() | ||
|
|
||
|   def capture_mic(): | ||
| -     while not stop_event.is_set(): | ||
| +     while not shutdown_event.is_set(): | ||
|           try: | ||
|               pcm_data = stream.read(CHUNK, exception_on_overflow=False) | ||
|               if pcm_data: | ||
| +                 # Session-level cancel_event applies — if shutdown | ||
| +                 # fires while append() is blocked on backpressure, | ||
| +                 # it raises FoundryLocalException("cancelled") instead | ||
| +                 # of waiting for the queue to drain. | ||
|                   session.append(pcm_data) | ||
| +         except FoundryLocalException: | ||
| +             # Session was cancelled — exit the capture loop cleanly. | ||
| +             break | ||
|           except Exception as e: | ||
|               print(f"\n[ERROR] Microphone capture failed: {e}") | ||
| -             stop_event.set() | ||
| +             shutdown_event.set() | ||
|               break | ||
|
|
||
| capture_thread = threading.Thread(target=capture_mic, daemon=True) | ||
|
|
@@ -148,9 +180,17 @@ def capture_mic(): | |
|
|
||
| # --- Graceful shutdown (mirrors JS SIGINT handler / C++ SignalHandler) --- | ||
|
|
||
|
|
||
|   def shutdown(*_args): | ||
|       print("\n\nStopping...") | ||
| -     stop_event.set() | ||
| +     # Setting shutdown_event: | ||
| +     # - exits the mic capture loop on its next iteration | ||
| +     # - aborts any in-flight session.append() blocked on backpressure | ||
| +     # with FoundryLocalException("cancelled") | ||
| +     # - ends session.get_transcription_stream() iteration cleanly in | ||
| +     # the read thread | ||
| +     # - short-circuits session.stop()'s drain wait below | ||
| +     shutdown_event.set() | ||
|
|
||
| if stream: | ||
| stream.stop_stream() | ||
|
|
@@ -169,6 +209,6 @@ def shutdown(*_args): | |
|
|
||
|   if mic_active: | ||
|       # Block until Ctrl+C | ||
| -     stop_event.wait() | ||
| +     shutdown_event.wait() | ||
|   else: | ||
|       shutdown() | ||
CoreError seems to imply that the error is a generic error from the Core. Is that the case, or is the error specific to real-time audio transcription?
If the latter, I suggest we rename the error type. If the former, we should address this in a bigger PR that throws a common error across all APIs.
You're right, the name overpromises. Currently CoreError is only thrown by LiveAudioTranscriptionSession; chat, audio-file transcription, and the Responses API all still throw plain Error. Can we do this?
Rename to something scope-honest:
LiveAudioStreamError: keeps it where it lives, no false promises.
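For the Python side, a scoped error type could carry the structured fields directly on the exception, so callers would not need a separate parse step. This is a rough sketch under stated assumptions: `SdkError` merely stands in for FoundryLocalException, `from_payload` is a hypothetical helper, and the JSON keys `code`, `message`, and `isTransient` are assumed, since the payload format is not shown in this diff.

```python
import json


class SdkError(Exception):
    """Hypothetical base exception, standing in for FoundryLocalException."""


class LiveAudioStreamError(SdkError):
    """Error scoped to live audio streaming, with structured metadata."""

    def __init__(self, message, code=None, is_transient=False):
        super().__init__(message)
        self.code = code
        self.is_transient = is_transient

    @classmethod
    def from_payload(cls, raw):
        # ASSUMPTION: the native layer reports a JSON object with the keys
        # "code", "message", and "isTransient". Anything else is kept as a
        # plain message with no code.
        try:
            data = json.loads(raw)
        except (TypeError, ValueError):
            return cls(str(raw))
        if not isinstance(data, dict):
            return cls(str(raw))
        return cls(
            str(data.get("message", raw)),
            code=data.get("code"),
            is_transient=bool(data.get("isTransient", False)),
        )
```

Because the subclass still derives from the base exception, existing `except` clauses written against the base type would keep working, while new code can catch the narrower type and read `code` and `is_transient` directly.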