[draft] feat: implement Chan<T> and tee() primitives with Python parity#1200
[draft] feat: implement Chan<T> and tee() primitives with Python parity#1200toubatbrian wants to merge 2 commits intomainfrom
Conversation
Port Python's aio.Chan[T] and aio.itertools.tee to JS as foundation for replacing ReadableStream with async channel queues + async generators. Chan<T>: - Blocking/non-blocking send/recv with backpressure (maxsize) - Clean close semantics (drain buffer, wake all waiters) - Async iteration via for-await-of with auto-stop on close - Optional AbortSignal integration for iteration control - Resource leak prevention (waiter cleanup) tee(): - Split one AsyncIterable<T> into N independent iterators - Shared buffer with lock-based coordination - Error propagation across all peers - Upstream cleanup when last peer closes Includes 60 comprehensive unit tests covering: - Basic operations, backpressure, close semantics - AbortSignal support, resource leak prevention - Error propagation, partial consumption, nested tee Co-Authored-By: brian.yin <brian.yin@livekit.io>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 26f2744975
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
agents/src/stream/tee.ts
Outdated
| if (!started) { | ||
| try { | ||
| await iterator.next(); |
There was a problem hiding this comment.
Do not block close on iterator.next() for unstarted sources
When started is false, closeIterator waits on iterator.next() before calling return(). If the upstream generator blocks before its first yield (for example, it awaits external input), then closing the last tee peer (or calling Tee.aclose()) can hang forever on that next() call, which can stall shutdown and cancellation flows.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in c985a06. Removed the next() workaround entirely — closeIterator now just calls return() directly. For unstarted generators, return() completes immediately without running finally blocks. This matches Python's behavior (aclose() on an unstarted generator is a no-op) and is safe because no resources were acquired inside the generator body if it was never started. Added regression test should not hang when closing unstarted tee over a blocking source.
agents/src/stream/tee.ts
Outdated
|
|
||
| // Ensure upstream is closed even if peer cleanup didn't trigger it | ||
| try { | ||
| await closeIterator(this._iterator, true); |
There was a problem hiding this comment.
Pass the real started state in Tee.aclose fallback
Tee.aclose() always invokes closeIterator(this._iterator, true) in its fallback path, which skips the unstarted-generator workaround. In cases like tee(source, 0) where no child ever advances the iterator, this prevents the generator finally block from running during close, so upstream cleanup code may never execute.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Fixed in c985a06. Removed the _started tracking entirely — closeIterator no longer takes a started parameter. Tee.aclose() and _removeSelfAsync() now just call closeIterator(iterator) without any started state. The return() call handles both started and unstarted generators correctly without the workaround.
Address Codex review comments: - P1: closeIterator no longer calls next() on unstarted generators, which could block forever if the generator awaits external input before its first yield. - P2: Tee.aclose() no longer hardcodes started=true; the started tracking is removed entirely since closeIterator no longer needs it. This matches Python's behavior: aclose() on an unstarted generator is a no-op (finally blocks don't run). This is safe because no resources were acquired inside the generator body if it was never started. Co-Authored-By: brian.yin <brian.yin@livekit.io>
Description
Introduces two new async primitives —
Chan<T>andtee()— ported from Python'saio.Chan[T]andaio.itertools.tee. These are the foundation for replacingReadableStreamusage across the codebase with async channel queues and async generators (per the JS-ification design doc).This PR is additive only — no existing code is modified (aside from re-exports in
stream/index.ts). The broader migration across the codebase will follow in subsequent PRs.Changes Made
agents/src/stream/chan.ts—Chan<T>class: blocking/non-blocking send/recv, backpressure viamaxsize, close semantics with buffer draining, async iteration (for await...of), optionalAbortSignalsupport viaiter(signal?). Ref: Pythonlivekit-agents/livekit/agents/utils/aio/channel.py.agents/src/stream/tee.ts—Tee<T>class andtee()function: splits oneAsyncIterable<T>into N independent iterators with shared buffer, lock-based upstream coordination, error propagation, and upstream cleanup when the last peer closes. Ref: Pythonlivekit-agents/livekit/agents/utils/aio/itertools.py.agents/src/stream/chan.test.ts— 42 tests forChan<T>.agents/src/stream/tee.test.ts— 18 tests fortee().agents/src/stream/index.ts— Re-exports the new primitives.Pre-Review Checklist
pnpm format:checkpasses;pnpm linthas only pre-existing warnings unrelated to this PRTesting
Items for Reviewer Attention
Chan.iter()abort error handling (chan.ts:240-243): Whensignal?.abortedis true, any caught error is silently swallowed (not justAbortError). This matches the intent (stop iteration on abort), but could mask real errors that coincidentally occur at the same tick as an abort.closeIteratorworkaround for unstarted generators (tee.ts:29-34): JS async generators skiptry/finallyifreturn()is called beforenext(). The workaround callsnext()once to "start" the generator before callingreturn(). This is correct but could cause unexpected side effects if the upstream source does significant work on its first iteration.Lockin tee.ts: Simple mutex with no timeout or deadlock detection. Sufficient for the tee use case (short critical sections), but worth noting if this class is ever reused.Chan.close()getter ordering (chan.ts:185-196): When closing with more blocked receivers than buffered items, excess receivers are popped from the end (LIFO) and rejected, while the remaining receivers are woken FIFO. Verify this matches Python's behavior.Additional Notes
This is Phase 1 of the ReadableStream→Chan migration. Next steps:
fromReadableStream,toReadableStream) for interop with external APIsvoice/io.ts,voice/agent.ts,utils.tsLink to Devin session: https://livekit.devinenterprise.com/sessions/6f09b4044c3e4950ad2673781e2f0ba9
Requested by: @toubatbrian