[draft] refactor: migrate ReadableStream to Chan<T> and AsyncIterable across codebase#1201
Conversation
…codebase - Add adapter utilities (fromReadableStream, toReadableStream, mergeAsyncIterables) - Migrate type signatures in voice/io.ts (STTNode, LLMNode, TTSNode) - Migrate voice/generation.ts (LLM/TTS inference, text/audio forwarding) - Migrate stt/stt.ts (SpeechStream async iteration) - Migrate tts/tts.ts (SynthesizeStream async iteration) - Migrate vad.ts (VADStream async iteration) - Migrate llm/realtime.ts (RealtimeSession) - Migrate audio_recognition.ts (DeferredReadableStream/StreamChannel to Chan) - Migrate audio.ts (audioFramesFromFile) - Migrate voice/agent_activity.ts (stream utilities) - Migrate voice/agent.ts (node methods) - Migrate voice/agent_session.ts (say method signature) - Migrate voice/room_io/_input.ts (createStream method) - Migrate voice/recorder_io/recorder_io.ts (StreamChannel to Chan) - Migrate voice/transcription/synchronizer.ts (IdentityTransform to Chan) - Migrate inference/interruption/* (TransformStream to async generators) - Migrate inference/stt.ts (StreamChannel to Chan) - Migrate inference/tts.ts (StreamChannel to Chan) - Migrate utils.ts (resampleStream) - Update test files to use async generators Co-Authored-By: brian.yin <brian.yin@livekit.io>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8057a55e89
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| private isInterruptionEnabled: boolean; | ||
| private isAgentSpeaking: boolean; | ||
| private interruptionStreamChannel?: StreamChannel<InterruptionSentinel | AudioFrame>; | ||
| private interruptionChan?: Chan<InterruptionSentinel | AudioFrame>; |
There was a problem hiding this comment.
Keep interruption task wired to the active channel
This refactor introduces interruptionChan, but createInterruptionTask() and disableInterruptionDetection() still read/write interruptionStreamChannel, so the interruption path is no longer wired to the field initialized in the constructor. As a result, interruption processing can be skipped entirely (the task bails before consuming overlap events), which breaks barge-in detection during agent speech.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
This was already fixed in commit be2627f — all references to interruptionStreamChannel were renamed to interruptionChan across the file (field declaration, createInterruptionTask(), and disableInterruptionDetection()). Verified with grep that zero references to interruptionStreamChannel remain in the codebase.
agents/src/voice/generation.ts
Outdated
| for await (const frame of ttsStream) { | ||
| if (signal.aborted) { | ||
| break; | ||
| } |
There was a problem hiding this comment.
Restore idle timeout when reading TTS output
The loop now does a plain for await over ttsStream and no longer races reads against TTS_READ_IDLE_TIMEOUT_MS. If a provider yields some audio and then stalls without closing, this task can hang indefinitely and never reach normal completion semantics; the IdleTimeoutError catch is effectively dead in this path.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in commit be2627f — both performTTSInference and forwardAudio now wrap the TTS stream with withIdleTimeout(ttsStream, TTS_READ_IDLE_TIMEOUT_MS), restoring the per-read idle timeout. The IdleTimeoutError catch blocks are no longer dead code.
agents/src/voice/generation.ts
Outdated
| for await (const frame of ttsStream) { | ||
| if (signal?.aborted) { | ||
| break; | ||
| } |
There was a problem hiding this comment.
Reintroduce idle timeout in audio forwarding loop
Audio forwarding also switched from timeout-wrapped reads to a bare for await loop, so a stalled TTS iterable can block forever after playback has started. In that case performAudioForwarding() may never resolve and downstream turn cleanup waits indefinitely, despite the existing timeout-oriented error handling.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in commit be2627f — forwardAudio now uses withIdleTimeout(ttsStream, TTS_READ_IDLE_TIMEOUT_MS) to wrap the for-await loop, matching the timeout behavior of the old waitUntilTimeout(reader.read(), ms) pattern.
- Fix Tee<T> bracket indexing to use .get() method across all files - Change _pumpAbort from private to protected in AudioInput for subclass access - Fix interruptionStreamChannel -> interruptionChan property name mismatch - Migrate silero plugin VADStream from inputReader to for-await on inputChan - Add withIdleTimeout() adapter for AsyncIterable idle timeout support - Restore TTS_READ_IDLE_TIMEOUT_MS in performTTSInference and forwardAudio - Export withIdleTimeout from stream/index.ts Co-Authored-By: brian.yin <brian.yin@livekit.io>
| addInputStream(source: AsyncIterable<AudioFrame>): string { | ||
| const id = `input-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; | ||
| const abort = new AbortController(); | ||
| this._pumpAbort = abort; | ||
| (async () => { | ||
| try { | ||
| for await (const frame of source) { | ||
| if (abort.signal.aborted) break; | ||
| try { | ||
| this.inputChan.sendNowait(frame); | ||
| } catch (e) { | ||
| if (e instanceof ChanClosed) break; | ||
| throw e; | ||
| } | ||
| } | ||
| } catch { | ||
| // Source errors are silently consumed | ||
| } | ||
| })(); | ||
| return id; | ||
| } |
There was a problem hiding this comment.
🟡 AudioInput.addInputStream overwrites _pumpAbort without aborting the previous pump
When addInputStream is called multiple times, this._pumpAbort is overwritten by the new AbortController without first aborting the previous one. This leaves the old background pump running indefinitely, resulting in two concurrent pumps writing interleaved data into the same inputChan. Compare with all updateInputStream methods in STT (agents/src/stt/stt.ts:370), TTS (agents/src/tts/tts.ts:379), VAD (agents/src/vad.ts:172), and RealtimeSession (agents/src/llm/realtime.ts:168), which all correctly call this._pumpAbort?.abort() before creating a new pump. Current callers (e.g. ParticipantAudioInputStream) happen to abort externally via closeStream() before calling addInputStream, so this doesn't trigger today, but the method's contract is broken.
| addInputStream(source: AsyncIterable<AudioFrame>): string { | |
| const id = `input-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; | |
| const abort = new AbortController(); | |
| this._pumpAbort = abort; | |
| (async () => { | |
| try { | |
| for await (const frame of source) { | |
| if (abort.signal.aborted) break; | |
| try { | |
| this.inputChan.sendNowait(frame); | |
| } catch (e) { | |
| if (e instanceof ChanClosed) break; | |
| throw e; | |
| } | |
| } | |
| } catch { | |
| // Source errors are silently consumed | |
| } | |
| })(); | |
| return id; | |
| } | |
| addInputStream(source: AsyncIterable<AudioFrame>): string { | |
| const id = `input-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`; | |
| this._pumpAbort?.abort(); | |
| const abort = new AbortController(); | |
| this._pumpAbort = abort; | |
| (async () => { | |
| try { | |
| for await (const frame of source) { | |
| if (abort.signal.aborted) break; | |
| try { | |
| this.inputChan.sendNowait(frame); | |
| } catch (e) { | |
| if (e instanceof ChanClosed) break; | |
| throw e; | |
| } | |
| } | |
| } catch { | |
| // Source errors are silently consumed | |
| } | |
| })(); | |
| return id; | |
| } |
Was this helpful? React with 👍 or 👎 to provide feedback.
There was a problem hiding this comment.
Fixed in commit 6eb8b29 — addInputStream now calls this._pumpAbort?.abort() before creating the new AbortController, matching the pattern used in STT, TTS, VAD, and RealtimeSession's updateInputStream methods.
| updateInputStream(text: AsyncIterable<string>) { | ||
| this._pumpAbort?.abort(); | ||
| const abort = new AbortController(); | ||
| this._pumpAbort = abort; | ||
| (async () => { | ||
| try { | ||
| for await (const value of text) { | ||
| if (abort.signal.aborted) break; | ||
| try { | ||
| this.inputChan.sendNowait(value); | ||
| } catch (e) { | ||
| if (e instanceof ChanClosed) break; | ||
| throw e; | ||
| } | ||
| } | ||
| } catch { | ||
| // Source errors are silently consumed | ||
| } finally { | ||
| this.inputChan.close(); | ||
| } | ||
| })(); |
There was a problem hiding this comment.
🟡 TTS SynthesizeStream.updateInputStream closes shared inputChan, breaking re-entrant calls
In updateInputStream, the pump's finally block calls this.inputChan.close(). When the method is called a second time, the sequence is: (1) this._pumpAbort?.abort() aborts pump1, (2) pump2 starts writing to inputChan, (3) pump1's for-await eventually exits and its finally runs this.inputChan.close(), (4) pump2's subsequent sendNowait calls throw ChanClosed. This is inconsistent with the STT (agents/src/stt/stt.ts:369-387) and VAD (agents/src/vad.ts:171-189) implementations of updateInputStream, which do NOT close the channel in their pump's finally block. Currently, updateInputStream is only called once per stream instance (in agents/src/voice/agent.ts:461), so this doesn't trigger in practice.
Prompt for agents
The TTS SynthesizeStream.updateInputStream method closes this.inputChan in the pump's finally block (line 396). This is needed to signal pumpInput() that input is done, but it makes the method non-reentrant: a second call will have its pump broken by the first pump's finally closing the shared channel.
The fix requires restructuring how pumpInput knows input is complete. Options:
1. Remove the close() from finally and instead have pumpInput check for a separate 'done' signal.
2. Create a new inputChan in updateInputStream (like attachAudioInput does in agent_activity.ts:593-594) so each call gets a fresh channel. But this requires pumpInput to follow the new channel reference.
3. Accept single-call semantics and document it, matching the current usage pattern.
Relevant files: agents/src/tts/tts.ts (updateInputStream at line 378, pumpInput at line 277), agents/src/stt/stt.ts (updateInputStream at line 369 for comparison).
Was this helpful? React with 👍 or 👎 to provide feedback.
There was a problem hiding this comment.
Fixed in commit 6eb8b29 — the finally block now only closes inputChan when the pump was NOT aborted (i.e., the source iterable exhausted naturally). If abort.signal.aborted is true (meaning updateInputStream was called again), the finally block skips the close so the new pump can continue using the shared channel. This makes the method safe for re-entrant calls while still signaling pumpInput when input is truly done.
… fix TTS updateInputStream re-entrancy Co-Authored-By: brian.yin <brian.yin@livekit.io>
Description
Migrates all internal
ReadableStream/TransformStream/WritableStreamusage across the agents-js codebase to useChan<T>(async channel queue) andAsyncIterable<T>, achieving Python parity. Builds on top of #1200 which introduced theChan<T>andtee()primitives.Changes Made
New: adapter utilities (
stream/adapters.ts)fromReadableStream<T>()— wraps aReadableStreamasAsyncIterable<T>(used at the boundary withAudioStreamfrom rtc-node)toReadableStream<T>()— wraps anAsyncIterable<T>back toReadableStream(for any external APIs that still require it)mergeAsyncIterables<T>()— merges N async iterables into one (replacement forMultiInputStream)withIdleTimeout<T>()— wraps anAsyncIterablewith per-.next()idle timeout, throwingIdleTimeoutErroron stall (replaces the oldwaitUntilTimeout(reader.read(), ms)pattern)Type signature changes
STTNode,LLMNode,TTSNodeinvoice/io.tsnow useAsyncIterable<T>instead ofReadableStream<T>SpeechStream,SynthesizeStream,VADStreamiteration now usesAsyncIterableRealtimeSessioniteration changed similarlyStreamChannel → Chan replacements
audio.ts(audioFramesFromFile)inference/stt.ts,inference/tts.tsvoice/audio_recognition.tsvoice/recorder_io/recorder_io.tsvoice/transcription/synchronizer.tsTransformStream → async generator composition
inference/interruption/http_transport.ts— newTransportFntype, returns an async generator instead of aTransformStreaminference/interruption/ws_transport.ts— background pump pattern withChan<OverlappingSpeechEvent>for WebSocket message routinginference/interruption/interruption_stream.ts— three-stagepipeThrough()pipeline replaced with composed async generators:eventEmit(transportFn(audioTransform(inputChan)))AudioInput base class refactor
MultiInputStreamreplaced withChan<AudioFrame>+ background pump +AbortControlleraddInputStream()aborts any previous pump before starting a new one, then pumps from the source iterable into the channelclose()aborts pump and closes channel_pumpAbortchanged fromprivatetoprotectedso subclasses (e.g.ParticipantAudioInputStream) can abort the pumpPlugin migration
VADStreamsubclass migrated fromthis.inputReader.read()(oldReadableStreamreader) tofor await...of this.inputChan(newChanpattern)Test updates
utils.test.ts,generation_tools.test.ts,generation_tts_timeout.test.ts,audio_recognition_span.test.ts— allReadableStreamtest helpers replaced with async generatorsUpdates since last revision
Addressed all review comments from Codex and Devin Review:
interruptionStreamChannel→interruptionChan(Codex P1): Verified fully migrated — zero references tointerruptionStreamChannelremain. The rename covers the field declaration,createInterruptionTask(), anddisableInterruptionDetection().Idle timeout restored in TTS loops (Codex P1 ×2, Devin Review): Both
performTTSInferenceandforwardAudionow usewithIdleTimeout(ttsStream, TTS_READ_IDLE_TIMEOUT_MS), restoring the per-read idle timeout that was accidentally dropped during migration.IdleTimeoutErrorcatch blocks are no longer dead code.AudioInput.addInputStreampump abort (Devin Review): Now callsthis._pumpAbort?.abort()before creating a newAbortController, matching the pattern in STT/TTS/VAD/RealtimeSessionupdateInputStreammethods.TTS SynthesizeStream.updateInputStreamre-entrancy (Devin Review): Thefinallyblock now conditionally closesinputChanonly when!abort.signal.aborted— i.e., only when the source iterable exhausted naturally. If the pump was aborted becauseupdateInputStreamwas called again, the channel stays open for the new pump.Backpressure: Many call sites changed from async
channel.write()to synchronouschan.sendNowait(). This removes backpressure — if a producer is faster than a consumer, items queue unbounded in the channel. Verify this is acceptable for each call site (audio frames, TTS events, etc.).ws_transport.tspump pattern (lines ~366-429): The new background pump +transportErrorvariable + channel close pattern is the most complex change. Error propagation through this indirection needs scrutiny.synchronizer.tssendNowait()calls (lines ~337, 374, 385): These are called withouttry/catchforChanClosed. If the channel closes whilemainTaskis still running, these could throw unhandled.withIdleTimeoutcleanup on stall: When the idle timeout fires,iter.return()is fire-and-forget because the source iterator may be stuck on a never-resolving promise. This means the stalled source generator won't be explicitly cleaned up — it relies on GC. Verify this is acceptable for TTS/audio streams.recorder_io.tscreateInterceptingStream: Error handling simplified to barecatch {}— all source errors are silently swallowed.Tee usage: All
tee()results now use.get(index)instead of bracket indexing (e.g.teed.get(0)notteed[0]). The.get()method throwsRangeErroron out-of-bounds — verify call sites always pass valid indices.TTS
updateInputStreamconditional close: TheinputChan.close()infinallynow checks!abort.signal.aborted. Verify the timing is safe — specifically thatabort.signal.abortedcannot become true between thefor awaitloop exiting normally and thefinallyblock executing.Pre-Review Checklist
pnpm buildpasses across all packages (agents + all plugins)Testing
pnpm lint) — lint errors are pre-existing on the base branchpnpm format:check)pnpm build) — all packages compile cleanlyrestaurant_agent.tsandrealtime_agent.tsverified — not yet testedAdditional Notes
ReadableStreamreturn types from node functionsIdentityTransform,StreamChannel,DeferredReadableStream,MultiInputStream) are still exported but no longer used internallyLink to Devin session: https://livekit.devinenterprise.com/sessions/6f09b4044c3e4950ad2673781e2f0ba9
Requested by: @toubatbrian