diff --git a/agents/src/stream/chan.test.ts b/agents/src/stream/chan.test.ts new file mode 100644 index 000000000..764b8cb65 --- /dev/null +++ b/agents/src/stream/chan.test.ts @@ -0,0 +1,619 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it } from 'vitest'; +import { Chan, ChanClosed, ChanEmpty, ChanFull } from './chan.js'; + +describe('Chan', () => { + // ─── Basic send/recv ─── + + it('should send and receive a single value', async () => { + const ch = new Chan(); + ch.sendNowait('hello'); + const value = await ch.recv(); + expect(value).toBe('hello'); + }); + + it('should send and receive multiple values in order', async () => { + const ch = new Chan(); + ch.sendNowait(1); + ch.sendNowait(2); + ch.sendNowait(3); + + expect(await ch.recv()).toBe(1); + expect(await ch.recv()).toBe(2); + expect(await ch.recv()).toBe(3); + }); + + it('should handle null and undefined values', async () => { + const ch = new Chan(); + ch.sendNowait('test'); + ch.sendNowait(null); + ch.sendNowait(undefined); + ch.sendNowait('another'); + + expect(await ch.recv()).toBe('test'); + expect(await ch.recv()).toBeNull(); + expect(await ch.recv()).toBeUndefined(); + expect(await ch.recv()).toBe('another'); + }); + + it('should preserve object references', async () => { + const obj = { key: 'value' }; + const ch = new Chan(); + ch.sendNowait(obj); + const received = await ch.recv(); + expect(received).toBe(obj); + }); + + // ─── Non-blocking operations ─── + + it('should throw ChanEmpty on recvNowait from empty channel', () => { + const ch = new Chan(); + expect(() => ch.recvNowait()).toThrow(ChanEmpty); + }); + + it('should throw ChanClosed on sendNowait to closed channel', () => { + const ch = new Chan(); + ch.close(); + expect(() => ch.sendNowait('test')).toThrow(ChanClosed); + }); + + it('should throw ChanClosed on recvNowait from closed empty channel', () => { + const ch = new Chan(); + ch.close(); + expect(() => ch.recvNowait()).toThrow(ChanClosed); + }); + + it('should allow recvNowait from closed channel with buffered items', () => { + const ch = new Chan(); + ch.sendNowait('buffered'); + ch.close(); + expect(ch.recvNowait()).toBe('buffered'); + expect(() => ch.recvNowait()).toThrow(ChanClosed); + }); + + // ─── Blocking send/recv ─── + + it('should block recv until a value is sent', async () => { + const ch = new Chan(); + const recvPromise = ch.recv(); + + // Send after a microtask delay + await Promise.resolve(); + ch.sendNowait('delayed'); + + expect(await recvPromise).toBe('delayed'); + }); + + it('should handle concurrent send and recv', async () => { + const ch = new Chan(); + const results: string[] = []; + + const consumer = (async () => { + for await (const value of ch) { + results.push(value); + } + })(); + + const data = ['a', 'b', 'c', 'd', 'e']; + for (const item of data) { + ch.sendNowait(item); + } + ch.close(); + + await consumer; + expect(results).toEqual(data); + }); + + it('should wake blocked recv when value is sent', async () => { + const ch = new Chan(); + let received: number | undefined; + + const recvTask = (async () => { + received = await ch.recv(); + })(); + + // Let the recv() settle into waiting + await new Promise((r) => setTimeout(r, 10)); + expect(received).toBeUndefined(); + + ch.sendNowait(42); + await recvTask; + expect(received).toBe(42); + }); + + // ─── Backpressure (maxsize) ─── + + it('should report full when maxsize is reached', () => { + const ch = new Chan(2); + expect(ch.full()).toBe(false); + ch.sendNowait(1); + expect(ch.full()).toBe(false); + ch.sendNowait(2); + expect(ch.full()).toBe(true); + }); + + it('should throw ChanFull on sendNowait to full channel', () => { + const ch = new Chan(1); + ch.sendNowait(1); + expect(() => ch.sendNowait(2)).toThrow(ChanFull); + }); + + it('should block send on full channel until recv makes space', async () => { + const ch = new Chan(1); + ch.sendNowait(1); + + let sendCompleted = false; + const sendPromise = ch.send(2).then(() => { + sendCompleted = true; + }); + + // Send should be blocked + await new Promise((r) => setTimeout(r, 10)); + expect(sendCompleted).toBe(false); + + // Consuming one item should unblock the send + expect(await ch.recv()).toBe(1); + await sendPromise; + expect(sendCompleted).toBe(true); + expect(await ch.recv()).toBe(2); + }); + + it('should handle unbounded channel (maxsize=0) as never full', () => { + const ch = new Chan(); + for (let i = 0; i < 1000; i++) { + ch.sendNowait(i); + } + expect(ch.full()).toBe(false); + expect(ch.qsize()).toBe(1000); + }); + + it('should treat negative maxsize as unbounded', () => { + const ch = new Chan(-5); + for (let i = 0; i < 100; i++) { + ch.sendNowait(i); + } + expect(ch.full()).toBe(false); + }); + + // ─── Close semantics ─── + + it('should drain buffered items after close', async () => { + const ch = new Chan(); + ch.sendNowait(1); + ch.sendNowait(2); + ch.sendNowait(3); + ch.close(); + + const results: number[] = []; + for await (const value of ch) { + results.push(value); + } + expect(results).toEqual([1, 2, 3]); + }); + + it('should throw ChanClosed on send after close', async () => { + const ch = new Chan(); + ch.close(); + await expect(ch.send('test')).rejects.toThrow(ChanClosed); + }); + + it('should throw ChanClosed on recv from closed empty channel', async () => { + const ch = new Chan(); + ch.close(); + await expect(ch.recv()).rejects.toThrow(ChanClosed); + }); + + it('should handle double close without error', () => { + const ch = new Chan(); + ch.close(); + expect(() => ch.close()).not.toThrow(); + expect(ch.closed).toBe(true); + }); + + it('should wake all blocked receivers on close', async () => { + const ch = new Chan(); + + const recv1 = ch.recv().catch((e) => e); + const recv2 = ch.recv().catch((e) => e); + const recv3 = ch.recv().catch((e) => e); + + await new Promise((r) => setTimeout(r, 10)); + ch.close(); + + const [r1, r2, r3] = await Promise.all([recv1, recv2, recv3]); + expect(r1).toBeInstanceOf(ChanClosed); + expect(r2).toBeInstanceOf(ChanClosed); + expect(r3).toBeInstanceOf(ChanClosed); + }); + + it('should wake blocked senders with ChanClosed on close', async () => { + const ch = new Chan(1); + ch.sendNowait(1); // Fill the channel + + const sendResult = ch.send(2).catch((e) => e); + + await new Promise((r) => setTimeout(r, 10)); + ch.close(); + + const result = await sendResult; + expect(result).toBeInstanceOf(ChanClosed); + }); + + it('should satisfy some blocked receivers from buffer on close', async () => { + const ch = new Chan(); + + // Start 3 receivers waiting + const recv1 = ch.recv().catch((e) => (e instanceof ChanClosed ? 'closed' : 'error')); + const recv2 = ch.recv().catch((e) => (e instanceof ChanClosed ? 'closed' : 'error')); + const recv3 = ch.recv().catch((e) => (e instanceof ChanClosed ? 'closed' : 'error')); + + await new Promise((r) => setTimeout(r, 10)); + + // Send 2 items, then close — 2 receivers get values, 1 gets ChanClosed + ch.sendNowait(10); + ch.sendNowait(20); + ch.close(); + + const [r1, r2, r3] = await Promise.all([recv1, recv2, recv3]); + expect(r1).toBe(10); + expect(r2).toBe(20); + expect(r3).toBe('closed'); + }); + + // ─── Async iteration ─── + + it('should iterate all values then stop on close', async () => { + const ch = new Chan(); + const results: number[] = []; + + const consumer = (async () => { + for await (const value of ch) { + results.push(value); + } + })(); + + ch.sendNowait(1); + ch.sendNowait(2); + ch.sendNowait(3); + ch.close(); + + await consumer; + expect(results).toEqual([1, 2, 3]); + }); + + it('should iterate empty channel that is immediately closed', async () => { + const ch = new Chan(); + ch.close(); + + const results: number[] = []; + for await (const value of ch) { + results.push(value); + } + expect(results).toEqual([]); + }); + + it('should support multiple sequential iterations (separate iterators)', async () => { + const ch = new Chan(); + ch.sendNowait(1); + ch.sendNowait(2); + ch.close(); + + // First iterator gets everything + const results1: number[] = []; + for await (const value of ch) { + results1.push(value); + } + expect(results1).toEqual([1, 2]); + + // Second iterator on closed channel gets nothing + const results2: number[] = []; + for await (const value of ch) { + results2.push(value); + } + expect(results2).toEqual([]); + }); + + it('should handle slow consumer with fast producer', async () => { + const ch = new Chan(); + const results: number[] = []; + + const consumer = (async () => { + for await (const value of ch) { + await new Promise((r) => setTimeout(r, 1)); + results.push(value); + } + })(); + + for (let i = 0; i < 20; i++) { + ch.sendNowait(i); + } + ch.close(); + + await consumer; + expect(results).toEqual(Array.from({ length: 20 }, (_, i) => i)); + }); + + // ─── Abort signal support ─── + + it('should stop iteration when AbortSignal fires', async () => { + const ch = new Chan(); + const ac = new AbortController(); + const results: number[] = []; + + const consumer = (async () => { + for await (const value of ch.iter(ac.signal)) { + results.push(value); + if (results.length === 3) { + ac.abort(); + } + } + })(); + + for (let i = 0; i < 10; i++) { + ch.sendNowait(i); + } + + await consumer; + expect(results).toEqual([0, 1, 2]); + + // Channel is not closed — abort only stops iteration + expect(ch.closed).toBe(false); + ch.close(); + }); + + it('should handle pre-aborted signal', async () => { + const ch = new Chan(); + ch.sendNowait(1); + const ac = new AbortController(); + ac.abort(); + + const results: number[] = []; + for await (const value of ch.iter(ac.signal)) { + results.push(value); + } + expect(results).toEqual([]); + ch.close(); + }); + + it('should stop waiting recv when abort fires', async () => { + const ch = new Chan(); + const ac = new AbortController(); + const results: number[] = []; + + const consumer = (async () => { + for await (const value of ch.iter(ac.signal)) { + results.push(value); + } + })(); + + ch.sendNowait(1); + await new Promise((r) => setTimeout(r, 10)); + // Consumer should be waiting for next value + ac.abort(); + await consumer; + + expect(results).toEqual([1]); + expect(ch.closed).toBe(false); + ch.close(); + }); + + it('should clean up abort listener when channel closes normally', async () => { + const ch = new Chan(); + const ac = new AbortController(); + + ch.sendNowait(1); + ch.sendNowait(2); + ch.close(); + + const results: number[] = []; + for await (const value of ch.iter(ac.signal)) { + results.push(value); + } + expect(results).toEqual([1, 2]); + // No dangling listeners — abort should be safe to call now + ac.abort(); + }); + + // ─── Resource leak prevention ─── + + it('should not leak waiters after close', async () => { + const ch = new Chan(); + + // Start multiple blocked receivers + const promises = Array.from({ length: 5 }, () => ch.recv().catch(() => {})); + + await new Promise((r) => setTimeout(r, 10)); + ch.close(); + await Promise.all(promises); + + // Internal waiter arrays should be empty + expect((ch as any)._gets.length).toBe(0); + expect((ch as any)._puts.length).toBe(0); + }); + + it('should not leak waiters when backpressure resolves', async () => { + const ch = new Chan(1); + ch.sendNowait(1); + + const sendPromise = ch.send(2); + await new Promise((r) => setTimeout(r, 10)); + + // Consume to unblock + await ch.recv(); + await sendPromise; + + expect((ch as any)._puts.length).toBe(0); + ch.close(); + }); + + it('should not leak waiters when iteration completes', async () => { + const ch = new Chan(); + ch.sendNowait(1); + ch.sendNowait(2); + ch.close(); + + for await (const _ of ch) { + // consume + } + + expect((ch as any)._gets.length).toBe(0); + expect((ch as any)._puts.length).toBe(0); + }); + + it('should not leak waiters when abort fires during recv', async () => { + const ch = new Chan(); + const ac = new AbortController(); + + const consumer = (async () => { + const results: number[] = []; + for await (const value of ch.iter(ac.signal)) { + results.push(value); + } + return results; + })(); + + await new Promise((r) => setTimeout(r, 10)); + ac.abort(); + await consumer; + + // The abort listener should have cleaned up the waiter + expect((ch as any)._gets.length).toBe(0); + ch.close(); + }); + + it('should handle many concurrent readers and writers without leaks', async () => { + const ch = new Chan(5); + const received: number[] = []; + + // 10 concurrent writers + const writers = Array.from({ length: 10 }, (_, i) => + ch.send(i).catch(() => { + /* ChanClosed */ + }), + ); + + // 10 concurrent readers + const readers = Array.from({ length: 10 }, () => + ch + .recv() + .then((v) => received.push(v)) + .catch(() => { + /* ChanClosed */ + }), + ); + + await Promise.all([...writers, ...readers]); + ch.close(); + + expect((ch as any)._gets.length).toBe(0); + expect((ch as any)._puts.length).toBe(0); + }); + + // ─── qsize / empty / full / closed properties ─── + + it('should track qsize correctly', () => { + const ch = new Chan(); + expect(ch.qsize()).toBe(0); + expect(ch.empty()).toBe(true); + + ch.sendNowait(1); + expect(ch.qsize()).toBe(1); + expect(ch.empty()).toBe(false); + + ch.sendNowait(2); + expect(ch.qsize()).toBe(2); + + ch.recvNowait(); + expect(ch.qsize()).toBe(1); + + ch.recvNowait(); + expect(ch.qsize()).toBe(0); + expect(ch.empty()).toBe(true); + }); + + it('should report closed correctly', () => { + const ch = new Chan(); + expect(ch.closed).toBe(false); + ch.close(); + expect(ch.closed).toBe(true); + }); + + // ─── Edge cases ─── + + it('should handle rapid open/send/close cycles', async () => { + for (let i = 0; i < 100; i++) { + const ch = new Chan(); + ch.sendNowait(i); + ch.close(); + const results: number[] = []; + for await (const v of ch) { + results.push(v); + } + expect(results).toEqual([i]); + } + }); + + it('should handle large number of items', async () => { + const ch = new Chan(); + const count = 10_000; + + for (let i = 0; i < count; i++) { + ch.sendNowait(i); + } + ch.close(); + + const results: number[] = []; + for await (const value of ch) { + results.push(value); + } + expect(results.length).toBe(count); + expect(results[0]).toBe(0); + expect(results[count - 1]).toBe(count - 1); + }); + + it('should work with async send and recv interleaved', async () => { + const ch = new Chan(); + const results: number[] = []; + + const producer = (async () => { + for (let i = 0; i < 5; i++) { + await ch.send(i); + await new Promise((r) => setTimeout(r, 1)); + } + ch.close(); + })(); + + const consumer = (async () => { + for await (const value of ch) { + results.push(value); + } + })(); + + await Promise.all([producer, consumer]); + expect(results).toEqual([0, 1, 2, 3, 4]); + }); + + it('should handle backpressure with concurrent producer/consumer', async () => { + const ch = new Chan(3); + const results: number[] = []; + + const producer = (async () => { + for (let i = 0; i < 10; i++) { + await ch.send(i); + } + ch.close(); + })(); + + const consumer = (async () => { + for await (const value of ch) { + await new Promise((r) => setTimeout(r, 2)); + results.push(value); + } + })(); + + await Promise.all([producer, consumer]); + expect(results).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + }); +}); diff --git a/agents/src/stream/chan.ts b/agents/src/stream/chan.ts new file mode 100644 index 000000000..a5456bc88 --- /dev/null +++ b/agents/src/stream/chan.ts @@ -0,0 +1,336 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +// Ref: python livekit-agents/livekit/agents/utils/aio/channel.py - 1-179 lines +// Based on asyncio.Queue, see https://github.com/python/cpython/blob/main/Lib/asyncio/queues.py + +/** + * Exception thrown when an operation is attempted on a closed channel. + */ +export class ChanClosed extends Error { + constructor() { + super('channel closed'); + this.name = 'ChanClosed'; + } +} + +/** + * Exception thrown when a non-blocking send is attempted on a full channel. + */ +export class ChanFull extends Error { + constructor() { + super('channel full'); + this.name = 'ChanFull'; + } +} + +/** + * Exception thrown when a non-blocking receive is attempted on an empty channel. + */ +export class ChanEmpty extends Error { + constructor() { + super('channel empty'); + this.name = 'ChanEmpty'; + } +} + +interface Waiter { + resolve: (value: T) => void; + reject: (err: Error) => void; + settled: boolean; +} + +/** + * An async channel (queue) modeled after Python's `aio.Chan[T]`. + * + * Supports: + * - Blocking and non-blocking send/recv + * - Backpressure via maxsize + * - Clean close semantics (wakes all waiters, drains remaining items) + * - Async iteration via `for await...of` (terminates on close) + * - Optional AbortSignal integration for iteration + * + * @example + * ```ts + * const ch = new Chan(); + * ch.sendNowait('hello'); + * ch.sendNowait('world'); + * ch.close(); + * + * for await (const msg of ch) { + * console.log(msg); // 'hello', 'world' + * } + * ``` + */ +export class Chan implements AsyncIterable { + private _closed = false; + private _queue: T[] = []; + private _gets: Waiter[] = []; + private _puts: Waiter[] = []; + private readonly _maxsize: number; + + constructor(maxsize: number = 0) { + this._maxsize = Math.max(maxsize, 0); + } + + private _wakeupNext(waiters: Waiter[]): void { + while (waiters.length > 0) { + const waiter = waiters.shift()!; + if (!waiter.settled) { + waiter.settled = true; + waiter.resolve(undefined); + return; + } + } + } + + /** + * Send a value into the channel, blocking if the channel is full. + * Throws {@link ChanClosed} if the channel is closed. + */ + async send(value: T): Promise { + while (this.full() && !this._closed) { + const waiter = this._createWaiter(); + this._puts.push(waiter); + try { + await waiter.promise; + } catch (e) { + if (e instanceof ChanClosed) throw e; + this._removeWaiter(waiter, this._puts); + if (!this.full() && !waiter.settled) { + this._wakeupNext(this._puts); + } + throw e; + } + } + this.sendNowait(value); + } + + /** + * Send a value into the channel without blocking. + * Throws {@link ChanClosed} if the channel is closed. + * Throws {@link ChanFull} if the channel buffer is full. + */ + sendNowait(value: T): void { + if (this._closed) { + throw new ChanClosed(); + } + if (this.full()) { + throw new ChanFull(); + } + this._queue.push(value); + this._wakeupNext(this._gets); + } + + /** + * Receive a value from the channel, blocking if the channel is empty. + * Throws {@link ChanClosed} if the channel is closed and empty. + */ + async recv(): Promise { + while (this.empty() && !this._closed) { + const waiter = this._createWaiter(); + this._gets.push(waiter); + try { + await waiter.promise; + } catch (e) { + if (e instanceof ChanClosed) throw e; + this._removeWaiter(waiter, this._gets); + if (!this.empty() && !waiter.settled) { + this._wakeupNext(this._gets); + } + throw e; + } + } + return this.recvNowait(); + } + + /** + * Receive a value from the channel without blocking. + * Throws {@link ChanClosed} if the channel is closed and empty. + * Throws {@link ChanEmpty} if the channel is empty but not closed. + */ + recvNowait(): T { + if (this.empty()) { + if (this._closed) { + throw new ChanClosed(); + } + throw new ChanEmpty(); + } + const item = this._queue.shift()!; + this._wakeupNext(this._puts); + return item; + } + + /** + * Close the channel. All blocked senders receive {@link ChanClosed}. + * Blocked receivers that can't be satisfied from the remaining buffer also receive {@link ChanClosed}. + * Remaining buffered items can still be drained via recv/recvNowait/iteration. + */ + close(): void { + if (this._closed) return; + this._closed = true; + + // Wake all putters with ChanClosed + for (const putter of this._puts) { + if (!putter.settled) { + putter.settled = true; + putter.reject(new ChanClosed()); + } + } + this._puts.length = 0; + + // For getters: wake those that can be satisfied from the buffer, + // reject the rest with ChanClosed + while (this._gets.length > this.qsize()) { + const getter = this._gets.pop()!; + if (!getter.settled) { + getter.settled = true; + getter.reject(new ChanClosed()); + } + } + + // Wake remaining getters (they'll read from the buffer) + while (this._gets.length > 0) { + this._wakeupNext(this._gets); + } + } + + /** Whether the channel has been closed. */ + get closed(): boolean { + return this._closed; + } + + /** The number of items currently buffered in the channel. */ + qsize(): number { + return this._queue.length; + } + + /** + * Whether the channel buffer is full. + * An unbounded channel (maxsize=0) is never full. + */ + full(): boolean { + if (this._maxsize <= 0) return false; + return this._queue.length >= this._maxsize; + } + + /** Whether the channel buffer is empty. */ + empty(): boolean { + return this._queue.length === 0; + } + + /** + * Iterate over the channel's values. The iterator terminates when the + * channel is closed and all buffered items have been consumed. + * + * @param signal - Optional AbortSignal to stop iteration early. + */ + async *iter(signal?: AbortSignal): AsyncGenerator { + while (true) { + if (signal?.aborted) return; + try { + // If an AbortSignal is provided, race recv() against the signal + if (signal) { + const value = await this._recvWithSignal(signal); + yield value; + } else { + yield await this.recv(); + } + } catch (e) { + if (e instanceof ChanClosed) return; + // Treat abort errors as a clean stop (not an exception to propagate) + if (signal?.aborted) return; + throw e; + } + } + } + + [Symbol.asyncIterator](): AsyncIterator { + return this.iter(); + } + + /** + * Race a recv() against an AbortSignal. If the signal fires first, + * return without yielding (the caller should check and exit). + */ + private _recvWithSignal(signal: AbortSignal): Promise { + return new Promise((resolve, reject) => { + // Check if already aborted + if (signal.aborted) { + reject(signal.reason ?? new Error('aborted')); + return; + } + + // Try non-blocking recv first + if (!this.empty()) { + resolve(this.recvNowait()); + return; + } + + if (this._closed) { + reject(new ChanClosed()); + return; + } + + let settled = false; + + const onAbort = () => { + if (settled) return; + settled = true; + // Remove the waiter we added + const idx = this._gets.findIndex((w) => w.resolve === onWake); + if (idx !== -1) { + this._gets[idx]!.settled = true; + this._gets.splice(idx, 1); + } + reject(signal.reason ?? new Error('aborted')); + }; + + const onWake = () => { + if (settled) return; + settled = true; + signal.removeEventListener('abort', onAbort); + // Now try to read from the buffer + try { + resolve(this.recvNowait()); + } catch (e) { + reject(e); + } + }; + + const onReject = (err: Error) => { + if (settled) return; + settled = true; + signal.removeEventListener('abort', onAbort); + reject(err); + }; + + signal.addEventListener('abort', onAbort, { once: true }); + + const waiter: Waiter = { + resolve: onWake, + reject: onReject, + settled: false, + }; + this._gets.push(waiter); + }); + } + + private _createWaiter(): Waiter & { promise: Promise } { + let resolve: (value: undefined) => void; + let reject: (err: Error) => void; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { resolve: resolve!, reject: reject!, settled: false, promise }; + } + + private _removeWaiter(waiter: Waiter, waiters: Waiter[]): void { + const idx = waiters.indexOf(waiter); + if (idx !== -1) { + waiters.splice(idx, 1); + } + } +} diff --git a/agents/src/stream/index.ts b/agents/src/stream/index.ts index 26b3f97b3..076764fb3 100644 --- a/agents/src/stream/index.ts +++ b/agents/src/stream/index.ts @@ -1,8 +1,10 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +export { Chan, ChanClosed, ChanEmpty, ChanFull } from './chan.js'; export { DeferredReadableStream } from './deferred_stream.js'; export { IdentityTransform } from './identity_transform.js'; export { mergeReadableStreams } from './merge_readable_streams.js'; export { MultiInputStream } from './multi_input_stream.js'; export { createStreamChannel, type StreamChannel } from './stream_channel.js'; +export { Tee, tee } from './tee.js'; diff --git a/agents/src/stream/tee.test.ts b/agents/src/stream/tee.test.ts new file mode 100644 index 000000000..43666f82e --- /dev/null +++ b/agents/src/stream/tee.test.ts @@ -0,0 +1,379 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it } from 'vitest'; +import { Chan } from './chan.js'; +import { Tee, tee } from './tee.js'; + +/** Helper: create an async iterable from an array */ +async function* fromArray(items: T[]): AsyncGenerator { + for (const item of items) { + yield item; + } +} + +/** Helper: collect all values from an async iterable */ +async function collect(iter: AsyncIterable): Promise { + const results: T[] = []; + for await (const item of iter) { + results.push(item); + } + return results; +} + +describe('tee', () => { + // ─── Basic functionality ─── + + it('should tee an async iterable into 2 copies by default', async () => { + const source = fromArray([1, 2, 3]); + const [a, b] = tee(source); + + const resultsA = await collect(a); + const resultsB = await collect(b); + + expect(resultsA).toEqual([1, 2, 3]); + expect(resultsB).toEqual([1, 2, 3]); + }); + + it('should tee into N copies', async () => { + const source = fromArray(['x', 'y', 'z']); + const t = tee(source, 4); + + expect(t.length).toBe(4); + + const results = await Promise.all(t.toArray().map(collect)); + for (const r of results) { + expect(r).toEqual(['x', 'y', 'z']); + } + }); + + it('should tee into 1 copy', async () => { + const source = fromArray([10, 20]); + const t = tee(source, 1); + + expect(t.length).toBe(1); + expect(await collect(t.get(0))).toEqual([10, 20]); + }); + + it('should handle empty source', async () => { + const source = fromArray([]); + const [a, b] = tee(source); + + expect(await collect(a)).toEqual([]); + expect(await collect(b)).toEqual([]); + }); + + // ─── Ordering and interleaving ─── + + it('should yield values in order when consumers read at different rates', async () => { + const source = fromArray([1, 2, 3, 4, 5]); + const [a, b] = tee(source); + + // Read from A first, then B + const resultsA = await collect(a); + const resultsB = await collect(b); + + expect(resultsA).toEqual([1, 2, 3, 4, 5]); + expect(resultsB).toEqual([1, 2, 3, 4, 5]); + }); + + it('should support interleaved reads between peers', async () => { + const source = fromArray([1, 2, 3]); + const [a, b] = tee(source); + + const iterA = a[Symbol.asyncIterator](); + const iterB = b[Symbol.asyncIterator](); + + // Interleave reads + expect((await iterA.next()).value).toBe(1); + expect((await iterB.next()).value).toBe(1); + expect((await iterA.next()).value).toBe(2); + expect((await iterA.next()).value).toBe(3); + expect((await iterB.next()).value).toBe(2); + expect((await iterB.next()).value).toBe(3); + + expect((await iterA.next()).done).toBe(true); + expect((await iterB.next()).done).toBe(true); + }); + + // ─── Chan as source ─── + + it('should work with Chan as source', async () => { + const ch = new Chan(); + const [a, b] = tee(ch); + + const resultsA: number[] = []; + const resultsB: number[] = []; + + const consumerA = (async () => { + for await (const v of a) resultsA.push(v); + })(); + const consumerB = (async () => { + for await (const v of b) resultsB.push(v); + })(); + + ch.sendNowait(10); + ch.sendNowait(20); + ch.sendNowait(30); + ch.close(); + + await Promise.all([consumerA, consumerB]); + + expect(resultsA).toEqual([10, 20, 30]); + expect(resultsB).toEqual([10, 20, 30]); + }); + + it('should work with Chan and concurrent producer', async () => { + const ch = new Chan(); + const [a, b] = tee(ch); + + const resultsA: number[] = []; + const resultsB: number[] = []; + + const consumerA = (async () => { + for await (const v of a) resultsA.push(v); + })(); + const consumerB = (async () => { + for await (const v of b) resultsB.push(v); + })(); + + const producer = (async () => { + for (let i = 0; i < 10; i++) { + await ch.send(i); + await new Promise((r) => setTimeout(r, 1)); + } + ch.close(); + })(); + + await Promise.all([producer, consumerA, consumerB]); + + expect(resultsA).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + expect(resultsB).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); + }); + + // ─── Error propagation ─── + + it('should propagate errors from source to all peers', async () => { + const error = new Error('upstream failure'); + async function* failingSource(): AsyncGenerator { + yield 1; + yield 2; + throw error; + } + + const [a, b] = tee(failingSource()); + + const iterA = a[Symbol.asyncIterator](); + const iterB = b[Symbol.asyncIterator](); + + // Both peers get the first two values + expect((await iterA.next()).value).toBe(1); + expect((await iterB.next()).value).toBe(1); + expect((await iterA.next()).value).toBe(2); + expect((await iterB.next()).value).toBe(2); + + // Both peers should see the error + await expect(iterA.next()).rejects.toThrow('upstream failure'); + await expect(iterB.next()).rejects.toThrow('upstream failure'); + }); + + it('should propagate errors to peers that have not yet reached the error', async () => { + const error = new Error('boom'); + async function* failingSource(): AsyncGenerator { + yield 1; + throw error; + } + + const [a, b] = tee(failingSource()); + + // A reads ahead and hits the error + const iterA = a[Symbol.asyncIterator](); + expect((await iterA.next()).value).toBe(1); + await expect(iterA.next()).rejects.toThrow('boom'); + + // B should also see the error after consuming buffered items + const iterB = b[Symbol.asyncIterator](); + expect((await iterB.next()).value).toBe(1); + await expect(iterB.next()).rejects.toThrow('boom'); + }); + + // ─── Partial consumption ─── + + it('should handle one peer consuming all while another consumes partially', async () => { + const source = fromArray([1, 2, 3, 4, 5]); + const [a, b] = tee(source); + + // A reads everything + const resultsA = await collect(a); + expect(resultsA).toEqual([1, 2, 3, 4, 5]); + + // B only reads 2 items then stops + const iterB = b[Symbol.asyncIterator](); + expect((await iterB.next()).value).toBe(1); + expect((await iterB.next()).value).toBe(2); + // Abandon B — should not hang or leak + await iterB.return!(undefined); + }); + + it('should close upstream when last peer is closed after reading', async () => { + let upstreamClosed = false; + async function* tracked(): AsyncGenerator { + try { + yield 1; + yield 2; + yield 3; + } finally { + upstreamClosed = true; + } + } + + const t = tee(tracked(), 2); + const [a, b] = t.toArray(); + + // Advance at least one peer so the generator is started + await a.next(); + + // Close both peers + await a.return(undefined); + expect(upstreamClosed).toBe(false); // Still one peer left + await b.return(undefined); + expect(upstreamClosed).toBe(true); // Last peer closed upstream + }); + + it('should close upstream without hanging when no peer ever read', async () => { + let upstreamClosed = false; + async function* tracked(): AsyncGenerator { + try { + yield 1; + } finally { + upstreamClosed = true; + } + } + + const t = tee(tracked(), 2); + const [a, b] = t.toArray(); + + // Close both peers without ever reading — should not hang. + // Python also doesn't run finally for unstarted generators. + await a.return(undefined); + await b.return(undefined); + expect(upstreamClosed).toBe(false); + }); + + // ─── aclose ─── + + it('should close all children and upstream via aclose after reading', async () => { + let upstreamClosed = false; + async function* tracked(): AsyncGenerator { + try { + yield 1; + yield 2; + } finally { + upstreamClosed = true; + } + } + + const t = new Tee(tracked(), 3); + // Start the generator by reading from one child + await t.get(0).next(); + await t.aclose(); + expect(upstreamClosed).toBe(true); + }); + + it('should close via aclose without hanging when no child ever read', async () => { + let upstreamClosed = false; + async function* tracked(): AsyncGenerator { + try { + yield 1; + } finally { + upstreamClosed = true; + } + } + + const t = new Tee(tracked(), 3); + // No child reads — aclose should not hang + await t.aclose(); + // Python doesn't run finally for unstarted generators either + expect(upstreamClosed).toBe(false); + }); + + // ─── Indexing and iteration ─── + + it('should support get() for index access', () => { + const source = fromArray([1]); + const t = tee(source, 3); + + expect(t.get(0)).toBeDefined(); + expect(t.get(1)).toBeDefined(); + expect(t.get(2)).toBeDefined(); + expect(() => t.get(3)).toThrow(RangeError); + expect(() => t.get(-1)).toThrow(RangeError); + }); + + it('should support Symbol.iterator for destructuring', () => { + const source = fromArray([1]); + const t = tee(source, 3); + + const children = [...t]; + expect(children.length).toBe(3); + }); + + // ─── Resource leak prevention ─── + + it('should not leak buffers after all peers complete', async () => { + const source = fromArray([1, 2, 3]); + const t = tee(source, 3); + const children = t.toArray(); + + await Promise.all(children.map(collect)); + + // Internal buffers should be cleaned up (peers array emptied) + expect((t as any)._buffers.length).toBe(0); + }); + + it('should handle large fan-out without issues', async () => { + const source = fromArray([1, 2, 3]); + const t = tee(source, 20); + + const results = await Promise.all(t.toArray().map(collect)); + for (const r of results) { + expect(r).toEqual([1, 2, 3]); + } + }); + + it('should not hang when closing unstarted tee over a blocking source', async () => { + // P1 regression: if upstream blocks before its first yield, + // closeIterator must not hang forever on iterator.next(). + async function* blockingSource(): AsyncGenerator { + await new Promise(() => { + // never resolves — simulates an indefinitely blocking source + }); + yield 1; + } + + const t = new Tee(blockingSource(), 2); + + // Close without ever reading — this should return quickly, not hang + const closePromise = t.aclose(); + const timeout = new Promise((r) => setTimeout(() => r('timeout'), 1000)); + const result = await Promise.race([closePromise.then(() => 'closed'), timeout]); + + expect(result).toBe('closed'); + }); + + it('should handle tee of tee', async () => { + const source = fromArray([1, 2, 3]); + const [a, b] = tee(source); + + // Tee one of the children again + const [a1, a2] = tee(a); + + const resultsA1 = await collect(a1); + const resultsA2 = await collect(a2); + const resultsB = await collect(b); + + expect(resultsA1).toEqual([1, 2, 3]); + expect(resultsA2).toEqual([1, 2, 3]); + expect(resultsB).toEqual([1, 2, 3]); + }); +}); diff --git a/agents/src/stream/tee.ts b/agents/src/stream/tee.ts new file mode 100644 index 000000000..115a9a6b7 --- /dev/null +++ b/agents/src/stream/tee.ts @@ -0,0 +1,274 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +// Ref: python livekit-agents/livekit/agents/utils/aio/itertools.py - 1-129 lines +// Based on https://github.com/maxfischer2781/asyncstdlib/blob/master/asyncstdlib/itertools.py + +interface ACloseable { + aclose(): Promise; +} + +function isACloseable(obj: unknown): obj is ACloseable { + return ( + typeof obj === 'object' && obj !== null && 'aclose' in obj && typeof obj.aclose === 'function' + ); +} + +async function closeIterator(iterator: AsyncIterator): Promise { + if (isACloseable(iterator)) { + await iterator.aclose(); + } else if ( + typeof iterator === 'object' && + iterator !== null && + 'return' in iterator && + typeof iterator.return === 'function' + ) { + // Note: for unstarted async generators, return() completes + // immediately without running finally blocks. This matches Python's + // behavior (aclose() on an unstarted generator is a no-op) and is + // safe because no resources were acquired inside the generator body. + await iterator.return(undefined); + } +} + +/** + * A simple async mutex lock. + * Uses a queue of resolve callbacks to ensure FIFO ordering. + */ +class Lock { + private _locked = false; + private _waiters: Array<() => void> = []; + + async acquire(): Promise<() => void> { + if (!this._locked) { + this._locked = true; + return this._release.bind(this); + } + + return new Promise<() => void>((resolve) => { + this._waiters.push(() => { + resolve(this._release.bind(this)); + }); + }); + } + + private _release(): void { + if (this._waiters.length > 0) { + const next = this._waiters.shift()!; + next(); + } else { + this._locked = false; + } + } +} + +/** + * A tee peer iterator implemented as a class rather than an async generator. + * This ensures proper cleanup even when the iterator is closed before being + * started (JS async generators skip try/finally if never advanced). + * + * Each peer maintains its own buffer and advances the shared upstream iterator + * under a lock when its buffer is empty. + * + * Error semantics: When the upstream iterator throws, the first peer to encounter + * the error stores it in the shared `exception` array. All other peers re-raise the + * same exception, ensuring every consumer sees the upstream failure. + */ +class TeePeerIterator implements AsyncIterableIterator { + private _buffer: T[]; + private _peers: T[][]; + private _iterator: AsyncIterator; + private _lock: Lock; + private _exception: [Error | null]; + private _done = false; + + constructor( + iterator: AsyncIterator, + buffer: T[], + peers: T[][], + lock: Lock, + exception: [Error | null], + ) { + this._iterator = iterator; + this._buffer = buffer; + this._peers = peers; + this._lock = lock; + this._exception = exception; + } + + async next(): Promise> { + if (this._done) { + return { value: undefined as unknown as T, done: true }; + } + + // If buffer has items, yield from buffer + if (this._buffer.length > 0) { + return { value: this._buffer.shift()!, done: false }; + } + + // Need to advance the upstream iterator under lock + const release = await this._lock.acquire(); + try { + // Re-check after acquiring lock — another peer may have filled our buffer + if (this._buffer.length > 0) { + return { value: this._buffer.shift()!, done: false }; + } + + // A peer already hit an upstream error — re-raise for this peer + if (this._exception[0] !== null) { + this._done = true; + this._removeSelf(); + throw this._exception[0]; + } + + let result: IteratorResult; + try { + result = await this._iterator.next(); + } catch (e) { + this._exception[0] = e instanceof Error ? e : new Error(String(e)); + this._done = true; + this._removeSelf(); + throw this._exception[0]; + } + + if (result.done) { + this._done = true; + this._removeSelf(); + return { value: undefined as unknown as T, done: true }; + } + + // Fan out to all peer buffers + for (const peerBuffer of this._peers) { + peerBuffer.push(result.value); + } + } finally { + release(); + } + + return { value: this._buffer.shift()!, done: false }; + } + + async return(): Promise> { + if (!this._done) { + this._done = true; + await this._removeSelfAsync(); + } + return { value: undefined as unknown as T, done: true }; + } + + async throw(e: unknown): Promise> { + this._done = true; + await this._removeSelfAsync(); + throw e; + } + + [Symbol.asyncIterator](): AsyncIterableIterator { + return this; + } + + private _removeSelf(): void { + const idx = this._peers.indexOf(this._buffer); + if (idx !== -1) { + this._peers.splice(idx, 1); + } + } + + private async _removeSelfAsync(): Promise { + this._removeSelf(); + + // If we're the last peer, close the upstream iterator + if (this._peers.length === 0) { + await closeIterator(this._iterator); + } + } +} + +/** + * Split a single `AsyncIterable` into `n` independent async iterators. + * + * Each child iterator yields every item from the source. Items are buffered + * per-peer, and the source is advanced lazily (only when a peer's buffer is + * empty). When the last peer is closed or garbage-collected, the upstream + * iterator is closed automatically. + * + * This is the JS equivalent of Python's `aio.itertools.tee(iterable, n)`. + * + * @example + * ```ts + * const source = someAsyncIterable(); + * const [a, b] = tee(source, 2); + * + * // Both a and b yield every item from source + * for await (const item of a) { ... } + * for await (const item of b) { ... } + * ``` + */ +export class Tee { + private _iterator: AsyncIterator; + private _buffers: T[][]; + private _children: TeePeerIterator[]; + + constructor(iterable: AsyncIterable, n: number = 2) { + this._iterator = iterable[Symbol.asyncIterator](); + this._buffers = Array.from({ length: n }, () => []); + + const lock = new Lock(); + const exception: [Error | null] = [null]; + + this._children = this._buffers.map( + (buffer) => new TeePeerIterator(this._iterator, buffer, this._buffers, lock, exception), + ); + } + + /** The number of peer iterators. */ + get length(): number { + return this._children.length; + } + + /** Access a specific peer by index. */ + get(index: number): TeePeerIterator { + const child = this._children[index]; + if (!child) { + throw new RangeError(`tee index ${index} out of range [0, ${this._children.length})`); + } + return child; + } + + /** Destructure into an array of async iterators. */ + toArray(): TeePeerIterator[] { + return [...this._children]; + } + + /** Iterate over the peer iterators. */ + [Symbol.iterator](): Iterator> { + return this._children[Symbol.iterator](); + } + + /** Close all peer iterators and the upstream iterator. */ + async aclose(): Promise { + for (const child of this._children) { + try { + await child.return(); + } catch { + // Ignore errors during cleanup + } + } + + // Ensure upstream is closed even if peer cleanup didn't trigger it + try { + await closeIterator(this._iterator); + } catch { + // Ignore errors during cleanup + } + } +} + +/** + * Convenience function to tee an async iterable into `n` independent iterators. + * + * @returns A {@link Tee} instance that can be destructured or indexed. + */ +export function tee(iterable: AsyncIterable, n: number = 2): Tee { + return new Tee(iterable, n); +}