diff --git a/.changeset/long-bags-argue.md b/.changeset/long-bags-argue.md new file mode 100644 index 000000000..dba4e1891 --- /dev/null +++ b/.changeset/long-bags-argue.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +Report actual HTTP status codes in connectWs instead of generic connection error diff --git a/agents/src/inference/utils.test.ts b/agents/src/inference/utils.test.ts new file mode 100644 index 000000000..e5cdaae15 --- /dev/null +++ b/agents/src/inference/utils.test.ts @@ -0,0 +1,101 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import http from 'node:http'; +import { type AddressInfo } from 'node:net'; +import { afterAll, beforeAll, describe, expect, it } from 'vitest'; +import { APIConnectionError, APIStatusError } from '../_exceptions.js'; +import { connectWs } from './utils.js'; + +/** + * Spins up a throwaway HTTP server that responds to WebSocket upgrade requests + * with a configurable status code instead of completing the handshake. + */ +function createRejectServer(statusCode: number): Promise { + return new Promise((resolve) => { + const server = http.createServer((req, res) => { + res.writeHead(statusCode); + res.end(); + }); + // Also handle upgrade requests to ensure ws gets the rejection + server.on('upgrade', (req, socket) => { + socket.write( + `HTTP/1.1 ${statusCode} ${http.STATUS_CODES[statusCode] || 'Unknown'}\r\n\r\n`, + ); + socket.destroy(); + }); + server.listen(0, '127.0.0.1', () => resolve(server)); + }); +} + +function serverUrl(server: http.Server): string { + const addr = server.address() as AddressInfo; + return `ws://127.0.0.1:${addr.port}`; +} + +describe('connectWs', () => { + const servers: http.Server[] = []; + + afterAll(() => { + for (const s of servers) { + s.close(); + } + }); + + it('rejects with APIStatusError(429) for rate-limited responses', async () => { + const server = await createRejectServer(429); + servers.push(server); + + const err = await connectWs(serverUrl(server), {}, 5000).catch((e) => e); + + expect(err).toBeInstanceOf(APIStatusError); + expect(err.statusCode).toBe(429); + expect(err.message).toBe('LiveKit gateway quota exceeded'); + expect(err.retryable).toBe(true); + }); + + it('rejects with APIStatusError for 401 Unauthorized', async () => { + const server = await createRejectServer(401); + servers.push(server); + + const err = await connectWs(serverUrl(server), {}, 5000).catch((e) => e); + + expect(err).toBeInstanceOf(APIStatusError); + expect(err.statusCode).toBe(401); + expect(err.message).toMatch(/Unexpected server response: 401/); + expect(err.retryable).toBe(false); + }); + + it('rejects with APIStatusError for 500 Internal Server Error', async () => { + const server = await createRejectServer(500); + servers.push(server); + + const err = await connectWs(serverUrl(server), {}, 5000).catch((e) => e); + + expect(err).toBeInstanceOf(APIStatusError); + expect(err.statusCode).toBe(500); + expect(err.message).toMatch(/Unexpected server response: 500/); + expect(err.retryable).toBe(true); + }); + + it('rejects with APIConnectionError preserving original message for network errors', async () => { + // Connect to a port where nothing is listening + const err = await connectWs('ws://127.0.0.1:1', {}, 5000).catch((e) => e); + + expect(err).toBeInstanceOf(APIConnectionError); + expect(err.message).toMatch(/ECONNREFUSED/); + expect(err.retryable).toBe(true); + }); + + it('rejects with APIConnectionError on timeout', async () => { + // Create a server that never responds to the upgrade + const server = http.createServer(); + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)); + servers.push(server); + + const err = await connectWs(serverUrl(server), {}, 100).catch((e) => e); + + expect(err).toBeInstanceOf(APIConnectionError); + expect(err.message).toMatch(/Timeout/); + }); +}); diff --git a/agents/src/inference/utils.ts b/agents/src/inference/utils.ts index b3b772ef6..3f8cebdbf 100644 --- a/agents/src/inference/utils.ts +++ b/agents/src/inference/utils.ts @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 import { AccessToken } from 'livekit-server-sdk'; import { WebSocket } from 'ws'; -import { APIConnectionError, APIStatusError } from '../index.js'; +import { APIConnectionError, APIStatusError } from '../_exceptions.js'; export type AnyString = string & NonNullable; @@ -35,20 +35,32 @@ export async function connectWs( resolve(socket); }; - const onError = (err: unknown) => { + const onUnexpectedResponse = (_req: unknown, res: { statusCode: number }) => { clearTimeout(timeout); - if (err && typeof err === 'object' && 'code' in err && (err as any).code === 429) { + socket.close(); + if (res.statusCode === 429) { reject( new APIStatusError({ message: 'LiveKit gateway quota exceeded', - options: { statusCode: 429 }, + options: { statusCode: 429, retryable: true }, }), ); } else { - reject(new APIConnectionError({ message: 'Error connecting to LiveKit WebSocket' })); + reject( + new APIStatusError({ + message: `Unexpected server response: ${res.statusCode}`, + options: { statusCode: res.statusCode }, + }), + ); } }; + const onError = (err: unknown) => { + clearTimeout(timeout); + const message = err instanceof Error ? err.message : 'Error connecting to LiveKit WebSocket'; + reject(new APIConnectionError({ message })); + }; + const onClose = (code: number) => { clearTimeout(timeout); if (code !== 1000) { @@ -59,7 +71,9 @@ export async function connectWs( ); } }; + socket.once('open', onOpen); + socket.once('unexpected-response', onUnexpectedResponse); socket.once('error', onError); socket.once('close', onClose); });