From 1d8439b3c2856e7e661f4427031392110c5b26b2 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 15 Mar 2026 11:46:35 -0400 Subject: [PATCH 1/8] Add SSE transport (upstreamed from gator-liquidators) and Built with eth.zig section --- README.md | 19 ++- src/root.zig | 2 + src/sse_transport.zig | 289 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 309 insertions(+), 1 deletion(-) create mode 100644 src/sse_transport.zig diff --git a/README.md b/README.md index 51ed6bf..ee1fb2f 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,23 @@ const key = try eth.hd_wallet.deriveEthAccount(seed, 0); const addr = key.toAddress(); ``` +## Built with eth.zig + +Real production bots and SDKs built on eth.zig: + +| Project | Description | Language | +|---------|-------------|----------| +| [perpcity-zig-sdk](https://github.com/StrobeLabs/perpcity-zig-sdk) | High-performance SDK for the PerpCity perpetual futures protocol on Base. Comptime ABI encoding, lock-free nonce management, 2-tier price cache. | Zig | +| [gator-liquidators](https://github.com/StrobeLabs/gator-liquidators) | Production liquidation keeper for PerpCity. Batch-checks positions via Multicall3, pre-signs liquidation txs, submits to Base sequencer. | Zig | + +``` +eth.zig + └── perpcity-zig-sdk (protocol SDK with comptime contract ABIs) + └── gator-liquidators (production liquidation bot on Base) +``` + +Built something with eth.zig? Open a PR to add it here. + ## Installation **One-liner:** @@ -176,7 +193,7 @@ cd examples && zig build && ./zig-out/bin/01_derive_address | **Crypto** | `secp256k1`, `signer`, `signature`, `keccak`, `eip155` | ECDSA signing (RFC 6979), Keccak-256, EIP-155 | | **Types** | `transaction`, `receipt`, `block`, `blob`, `access_list` | Legacy, EIP-2930, EIP-1559, EIP-4844 transactions | | **Accounts** | `mnemonic`, `hd_wallet` | BIP-32/39/44 HD wallets and mnemonic generation | -| **Transport** | `http_transport`, `ws_transport`, `json_rpc`, `provider`, `subscription` | HTTP and WebSocket JSON-RPC transports | +| **Transport** | `http_transport`, `ws_transport`, `sse_transport`, `json_rpc`, `provider`, `subscription` | HTTP, WebSocket, and SSE transports | | **ENS** | `ens_namehash`, `ens_resolver`, `ens_reverse` | ENS name resolution and reverse lookup | | **Client** | `wallet`, `contract`, `multicall`, `event`, `erc20`, `erc721` | Signing wallet, contract interaction, Multicall3, token wrappers | | **Standards** | `eip712`, `abi_json` | EIP-712 typed data signing, Solidity JSON ABI parsing | diff --git a/src/root.zig b/src/root.zig index 7c33bea..41f7f49 100644 --- a/src/root.zig +++ b/src/root.zig @@ -34,6 +34,7 @@ pub const hd_wallet = @import("hd_wallet.zig"); pub const json_rpc = @import("json_rpc.zig"); pub const http_transport = @import("http_transport.zig"); pub const ws_transport = @import("ws_transport.zig"); +pub const sse_transport = @import("sse_transport.zig"); pub const subscription = @import("subscription.zig"); pub const provider = @import("provider.zig"); pub const retry_provider = @import("retry_provider.zig"); @@ -106,6 +107,7 @@ test { _ = @import("json_rpc.zig"); _ = @import("http_transport.zig"); _ = @import("ws_transport.zig"); + _ = @import("sse_transport.zig"); _ = @import("subscription.zig"); _ = @import("provider.zig"); _ = @import("retry_provider.zig"); diff --git a/src/sse_transport.zig b/src/sse_transport.zig new file mode 100644 index 0000000..d858c67 --- /dev/null +++ b/src/sse_transport.zig @@ -0,0 +1,289 @@ +const std = @import("std"); + +// ============================================================================ +// Types +// ============================================================================ + +/// A parsed SSE event as defined by the W3C Server-Sent Events specification. +/// All fields borrow from the underlying line buffer -- they are only valid +/// until the next call to `SseParser.feedLine` or `SseParser.reset`. +pub const SseEvent = struct { + /// The `event:` field value, or null if omitted (defaults to "message"). + event_type: ?[]const u8 = null, + /// The `data:` field value, or null if no data line was present. + data: ?[]const u8 = null, +}; + +pub const SseError = error{ + ConnectionFailed, + BadStatus, +}; + +// ============================================================================ +// Parser +// ============================================================================ + +/// Line-oriented SSE parser. +/// +/// Feed lines one at a time via `feedLine`. The parser accumulates `event:` +/// and `data:` fields and emits an `SseEvent` when it sees a blank line +/// (the event boundary per the SSE spec). +/// +/// Designed to be testable without any network I/O. +pub const SseParser = struct { + current_event_type: ?[]const u8 = null, + current_data: ?[]const u8 = null, + + /// Feed a single line (without trailing `\n` or `\r\n`) to the parser. + /// Returns an `SseEvent` if a blank line was encountered (event boundary), + /// or null otherwise. + pub fn feedLine(self: *SseParser, line: []const u8) ?SseEvent { + const trimmed = std.mem.trimRight(u8, line, "\r"); + + // Blank line = event boundary. + if (trimmed.len == 0) { + if (self.current_event_type != null or self.current_data != null) { + const evt = SseEvent{ + .event_type = self.current_event_type, + .data = self.current_data, + }; + self.current_event_type = null; + self.current_data = null; + return evt; + } + return null; + } + + // Lines starting with ':' are comments -- ignore per spec. + if (trimmed[0] == ':') return null; + + // Parse "field: value" or "field:value". + if (std.mem.indexOf(u8, trimmed, ":")) |colon_idx| { + const field = trimmed[0..colon_idx]; + var value = trimmed[colon_idx + 1 ..]; + // Strip optional single leading space after colon (SSE spec §9.2.6). + if (value.len > 0 and value[0] == ' ') value = value[1..]; + + if (std.mem.eql(u8, field, "event")) { + self.current_event_type = value; + } else if (std.mem.eql(u8, field, "data")) { + self.current_data = value; + } + // Other fields (id, retry) are intentionally ignored. + } + + return null; + } + + /// Reset accumulated state, e.g. on reconnect. + pub fn reset(self: *SseParser) void { + self.current_event_type = null; + self.current_data = null; + } +}; + +// ============================================================================ +// Transport +// ============================================================================ + +/// Connect to an SSE endpoint and call `callback` for each event until the +/// connection closes or an error occurs. +/// +/// This function makes a single HTTP request and streams events until EOF. +/// It does NOT reconnect -- wrap this in a loop with exponential backoff for +/// production use (see `SseTransport.subscribeWithReconnect`). +/// +/// `extra_headers` are appended after the required `Accept` and `Cache-Control` +/// headers. The caller is responsible for any authentication headers. +pub fn subscribe( + allocator: std.mem.Allocator, + url: []const u8, + extra_headers: []const std.http.Header, + callback: *const fn (event: SseEvent) void, +) !void { + var client = std.http.Client{ .allocator = allocator }; + defer client.deinit(); + + const uri = try std.Uri.parse(url); + + // Build header list: required SSE headers + caller extras. + const base_headers: []const std.http.Header = &.{ + .{ .name = "Accept", .value = "text/event-stream" }, + .{ .name = "Cache-Control", .value = "no-cache" }, + }; + const all_headers = try std.mem.concat( + allocator, + std.http.Header, + &.{ base_headers, extra_headers }, + ); + defer allocator.free(all_headers); + + var req = try client.request(.GET, uri, .{ .extra_headers = all_headers }); + defer req.deinit(); + + try req.sendBodiless(); + + var redirect_buf: [4096]u8 = undefined; + var response = try req.receiveHead(&redirect_buf); + + if (response.head.status != .ok) { + return error.BadStatus; + } + + var parser = SseParser{}; + var transfer_buf: [8192]u8 = undefined; + const reader = response.reader(&transfer_buf); + + while (true) { + const line_with_nl = reader.takeDelimiterInclusive('\n') catch |err| switch (err) { + error.EndOfStream => return, // normal close + else => return err, + }; + const line = line_with_nl[0 .. line_with_nl.len - 1]; + + if (parser.feedLine(line)) |evt| { + callback(evt); + } + } +} + +/// Options for `subscribeWithReconnect`. +pub const ReconnectOpts = struct { + /// Initial backoff in milliseconds. Default: 1_000. + initial_backoff_ms: u64 = 1_000, + /// Maximum backoff in milliseconds. Default: 30_000. + max_backoff_ms: u64 = 30_000, + /// Optional callback invoked before each reconnect attempt. + /// Receives the backoff delay that will be applied. + on_reconnect: ?*const fn (backoff_ms: u64) void = null, +}; + +/// Connect to an SSE endpoint and stream events forever, reconnecting with +/// exponential backoff on disconnection or error. +/// +/// This function never returns under normal operation. The caller's thread +/// will be blocked here. +pub fn subscribeWithReconnect( + allocator: std.mem.Allocator, + url: []const u8, + extra_headers: []const std.http.Header, + opts: ReconnectOpts, + callback: *const fn (event: SseEvent) void, +) void { + var backoff_ms = opts.initial_backoff_ms; + + while (true) { + if (subscribe(allocator, url, extra_headers, callback)) |_| { + // Clean close -- reset backoff. + backoff_ms = opts.initial_backoff_ms; + } else |_| {} + + if (opts.on_reconnect) |cb| cb(backoff_ms); + std.Thread.sleep(backoff_ms * std.time.ns_per_ms); + backoff_ms = @min(backoff_ms * 2, opts.max_backoff_ms); + } +} + +// ============================================================================ +// Tests +// ============================================================================ + +test "SseParser basic event" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: perp_price") == null); + try std.testing.expect(parser.feedLine("data: {\"price\": 100}") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("perp_price", evt.event_type.?); + try std.testing.expectEqualStrings("{\"price\": 100}", evt.data.?); +} + +test "SseParser ignores comments" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine(": this is a comment") == null); + try std.testing.expect(parser.feedLine("event: test") == null); + try std.testing.expect(parser.feedLine(": another comment") == null); + try std.testing.expect(parser.feedLine("data: hello") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("test", evt.event_type.?); + try std.testing.expectEqualStrings("hello", evt.data.?); +} + +test "SseParser blank line without prior data emits nothing" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("") == null); + try std.testing.expect(parser.feedLine("") == null); +} + +test "SseParser handles event with no data" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: heartbeat") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("heartbeat", evt.event_type.?); + try std.testing.expect(evt.data == null); +} + +test "SseParser handles data with no event type" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("data: orphan") == null); + const evt = parser.feedLine("").?; + try std.testing.expect(evt.event_type == null); + try std.testing.expectEqualStrings("orphan", evt.data.?); +} + +test "SseParser resets state" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: test") == null); + parser.reset(); + try std.testing.expect(parser.feedLine("") == null); +} + +test "SseParser handles carriage return in line" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: test\r") == null); + try std.testing.expect(parser.feedLine("data: value\r") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("test", evt.event_type.?); + try std.testing.expectEqualStrings("value", evt.data.?); +} + +test "SseParser colon without space" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event:no_space") == null); + try std.testing.expect(parser.feedLine("data:also_no_space") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("no_space", evt.event_type.?); + try std.testing.expectEqualStrings("also_no_space", evt.data.?); +} + +test "SseParser multiple events in sequence" { + var parser = SseParser{}; + _ = parser.feedLine("event: first"); + _ = parser.feedLine("data: 1"); + const ev1 = parser.feedLine("").?; + try std.testing.expectEqualStrings("first", ev1.event_type.?); + _ = parser.feedLine("event: second"); + _ = parser.feedLine("data: 2"); + const ev2 = parser.feedLine("").?; + try std.testing.expectEqualStrings("second", ev2.event_type.?); + try std.testing.expectEqualStrings("2", ev2.data.?); +} + +test "SseParser data overwrites previous data within same event" { + var parser = SseParser{}; + _ = parser.feedLine("event: test"); + _ = parser.feedLine("data: first"); + _ = parser.feedLine("data: second"); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("second", evt.data.?); +} + +test "SseParser ignores unknown fields (id, retry)" { + var parser = SseParser{}; + _ = parser.feedLine("id: 12345"); + _ = parser.feedLine("retry: 5000"); + _ = parser.feedLine("event: perp_price"); + _ = parser.feedLine("data: test_data"); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("perp_price", evt.event_type.?); + try std.testing.expectEqualStrings("test_data", evt.data.?); +} From 5a5165827b022b231b63b9f86fdcfcc1c74bdb99 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 15 Mar 2026 11:56:39 -0400 Subject: [PATCH 2/8] Fix SSE transport: spec-compliant data append, id/retry fields, Last-Event-ID on reconnect --- README.md | 2 +- src/sse_transport.zig | 246 ++++++++++++++++++++++++++++++++---------- 2 files changed, 192 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index ee1fb2f..fbd9047 100644 --- a/README.md +++ b/README.md @@ -127,7 +127,7 @@ Real production bots and SDKs built on eth.zig: | [perpcity-zig-sdk](https://github.com/StrobeLabs/perpcity-zig-sdk) | High-performance SDK for the PerpCity perpetual futures protocol on Base. Comptime ABI encoding, lock-free nonce management, 2-tier price cache. | Zig | | [gator-liquidators](https://github.com/StrobeLabs/gator-liquidators) | Production liquidation keeper for PerpCity. Batch-checks positions via Multicall3, pre-signs liquidation txs, submits to Base sequencer. | Zig | -``` +```text eth.zig └── perpcity-zig-sdk (protocol SDK with comptime contract ABIs) └── gator-liquidators (production liquidation bot on Base) diff --git a/src/sse_transport.zig b/src/sse_transport.zig index d858c67..e26d629 100644 --- a/src/sse_transport.zig +++ b/src/sse_transport.zig @@ -5,12 +5,17 @@ const std = @import("std"); // ============================================================================ /// A parsed SSE event as defined by the W3C Server-Sent Events specification. -/// All fields borrow from the underlying line buffer -- they are only valid -/// until the next call to `SseParser.feedLine` or `SseParser.reset`. +/// +/// All slice fields point into buffers owned by the `SseParser` that produced +/// this event. They are valid until the next call to `SseParser.feedLine` or +/// `SseParser.reset`. pub const SseEvent = struct { - /// The `event:` field value, or null if omitted (defaults to "message"). - event_type: ?[]const u8 = null, - /// The `data:` field value, or null if no data line was present. + /// The `id:` field value, or null if omitted. + id: ?[]const u8 = null, + /// The `event:` field value, or null if omitted (default event type is "message"). + event: ?[]const u8 = null, + /// The accumulated `data:` field value. Multiple `data:` lines within one + /// event are joined with U+000A as required by the spec. data: ?[]const u8 = null, }; @@ -25,14 +30,41 @@ pub const SseError = error{ /// Line-oriented SSE parser. /// -/// Feed lines one at a time via `feedLine`. The parser accumulates `event:` -/// and `data:` fields and emits an `SseEvent` when it sees a blank line -/// (the event boundary per the SSE spec). +/// Feed lines one at a time via `feedLine`. The parser accumulates `event:`, +/// `id:`, and `data:` fields and emits an `SseEvent` when it encounters a +/// blank line (the event boundary per the SSE spec). +/// +/// All field values are copied into fixed internal buffers, so emitted +/// `SseEvent` slices remain valid until the next call to `feedLine` or +/// `reset` -- regardless of whether the caller's line buffer has been reused. +/// +/// `last_event_id` and `retry_ms` persist across events and are intended for +/// use by reconnecting transports. /// /// Designed to be testable without any network I/O. pub const SseParser = struct { - current_event_type: ?[]const u8 = null, - current_data: ?[]const u8 = null, + // Per-event buffers (cleared on event dispatch). + event_buf: [256]u8 = undefined, + event_len: usize = 0, + id_buf: [512]u8 = undefined, + id_len: usize = 0, + data_buf: [65536]u8 = undefined, + data_len: usize = 0, + + // Persistent state (survives event boundaries and reconnects). + /// The last `id:` value seen across all events. Sent as `Last-Event-ID` + /// on reconnect. Empty slice means no id has been received yet. + last_event_id_buf: [512]u8 = undefined, + last_event_id_len: usize = 0, + /// Server-specified reconnect delay in milliseconds (`retry:` field). + /// Null means the server has not specified a value. + retry_ms: ?u64 = null, + + /// Return the last received event id, or null if none has been seen. + pub fn lastEventId(self: *const SseParser) ?[]const u8 { + if (self.last_event_id_len == 0) return null; + return self.last_event_id_buf[0..self.last_event_id_len]; + } /// Feed a single line (without trailing `\n` or `\r\n`) to the parser. /// Returns an `SseEvent` if a blank line was encountered (event boundary), @@ -40,15 +72,26 @@ pub const SseParser = struct { pub fn feedLine(self: *SseParser, line: []const u8) ?SseEvent { const trimmed = std.mem.trimRight(u8, line, "\r"); - // Blank line = event boundary. + // Blank line = event boundary: dispatch any accumulated fields. if (trimmed.len == 0) { - if (self.current_event_type != null or self.current_data != null) { + if (self.event_len > 0 or self.data_len > 0 or self.id_len > 0) { + // Update last-event-id if the event included one. + if (self.id_len > 0) { + const copy_len = @min(self.id_len, self.last_event_id_buf.len); + @memcpy(self.last_event_id_buf[0..copy_len], self.id_buf[0..copy_len]); + self.last_event_id_len = copy_len; + } + const evt = SseEvent{ - .event_type = self.current_event_type, - .data = self.current_data, + .id = if (self.id_len > 0) self.id_buf[0..self.id_len] else null, + .event = if (self.event_len > 0) self.event_buf[0..self.event_len] else null, + .data = if (self.data_len > 0) self.data_buf[0..self.data_len] else null, }; - self.current_event_type = null; - self.current_data = null; + + // Clear per-event state; last_event_id and retry_ms persist. + self.event_len = 0; + self.id_len = 0; + self.data_len = 0; return evt; } return null; @@ -65,20 +108,40 @@ pub const SseParser = struct { if (value.len > 0 and value[0] == ' ') value = value[1..]; if (std.mem.eql(u8, field, "event")) { - self.current_event_type = value; + const copy_len = @min(value.len, self.event_buf.len); + @memcpy(self.event_buf[0..copy_len], value[0..copy_len]); + self.event_len = copy_len; } else if (std.mem.eql(u8, field, "data")) { - self.current_data = value; + // Append to data buffer, joining multiple data: lines with '\n'. + if (self.data_len > 0 and self.data_len < self.data_buf.len) { + self.data_buf[self.data_len] = '\n'; + self.data_len += 1; + } + const remaining = self.data_buf.len - self.data_len; + const copy_len = @min(value.len, remaining); + @memcpy(self.data_buf[self.data_len .. self.data_len + copy_len], value[0..copy_len]); + self.data_len += copy_len; + } else if (std.mem.eql(u8, field, "id")) { + const copy_len = @min(value.len, self.id_buf.len); + @memcpy(self.id_buf[0..copy_len], value[0..copy_len]); + self.id_len = copy_len; + } else if (std.mem.eql(u8, field, "retry")) { + // Parse the retry value as a decimal integer of milliseconds. + if (std.fmt.parseInt(u64, value, 10)) |ms| { + self.retry_ms = ms; + } else |_| {} // ignore malformed retry values per spec } - // Other fields (id, retry) are intentionally ignored. } return null; } - /// Reset accumulated state, e.g. on reconnect. + /// Reset per-event accumulated state. Does NOT clear `last_event_id` or + /// `retry_ms` -- those are persistent and survive reconnects. pub fn reset(self: *SseParser) void { - self.current_event_type = null; - self.current_data = null; + self.event_len = 0; + self.id_len = 0; + self.data_len = 0; } }; @@ -91,14 +154,18 @@ pub const SseParser = struct { /// /// This function makes a single HTTP request and streams events until EOF. /// It does NOT reconnect -- wrap this in a loop with exponential backoff for -/// production use (see `SseTransport.subscribeWithReconnect`). +/// production use (see `subscribeWithReconnect`). /// /// `extra_headers` are appended after the required `Accept` and `Cache-Control` /// headers. The caller is responsible for any authentication headers. +/// +/// `parser` is caller-supplied so that `last_event_id` and `retry_ms` persist +/// across reconnects when used with `subscribeWithReconnect`. pub fn subscribe( allocator: std.mem.Allocator, url: []const u8, extra_headers: []const std.http.Header, + parser: *SseParser, callback: *const fn (event: SseEvent) void, ) !void { var client = std.http.Client{ .allocator = allocator }; @@ -106,15 +173,28 @@ pub fn subscribe( const uri = try std.Uri.parse(url); - // Build header list: required SSE headers + caller extras. + // Build header list: base SSE headers + Last-Event-ID (if any) + caller extras. const base_headers: []const std.http.Header = &.{ .{ .name = "Accept", .value = "text/event-stream" }, .{ .name = "Cache-Control", .value = "no-cache" }, }; + + var last_id_header_buf: [512 + 20]u8 = undefined; // "Last-Event-ID: " + id + var id_headers: []const std.http.Header = &.{}; + if (parser.lastEventId()) |last_id| { + const prefix = ""; // value is the id itself; name is the header name + _ = prefix; + @memcpy(last_id_header_buf[0..last_id.len], last_id); + id_headers = &.{.{ + .name = "Last-Event-ID", + .value = last_id_header_buf[0..last_id.len], + }}; + } + const all_headers = try std.mem.concat( allocator, std.http.Header, - &.{ base_headers, extra_headers }, + &.{ base_headers, id_headers, extra_headers }, ); defer allocator.free(all_headers); @@ -130,7 +210,9 @@ pub fn subscribe( return error.BadStatus; } - var parser = SseParser{}; + // Reset per-event state but preserve last_event_id and retry_ms. + parser.reset(); + var transfer_buf: [8192]u8 = undefined; const reader = response.reader(&transfer_buf); @@ -149,9 +231,10 @@ pub fn subscribe( /// Options for `subscribeWithReconnect`. pub const ReconnectOpts = struct { - /// Initial backoff in milliseconds. Default: 1_000. + /// Initial backoff in milliseconds. Overridden by the server's `retry:` value + /// if one has been received. Default: 1_000. initial_backoff_ms: u64 = 1_000, - /// Maximum backoff in milliseconds. Default: 30_000. + /// Maximum backoff cap in milliseconds. Default: 30_000. max_backoff_ms: u64 = 30_000, /// Optional callback invoked before each reconnect attempt. /// Receives the backoff delay that will be applied. @@ -161,6 +244,10 @@ pub const ReconnectOpts = struct { /// Connect to an SSE endpoint and stream events forever, reconnecting with /// exponential backoff on disconnection or error. /// +/// `Last-Event-ID` is automatically sent on reconnect if the server has +/// previously sent an `id:` field. The reconnect delay respects the server's +/// `retry:` value when present. +/// /// This function never returns under normal operation. The caller's thread /// will be blocked here. pub fn subscribeWithReconnect( @@ -170,17 +257,26 @@ pub fn subscribeWithReconnect( opts: ReconnectOpts, callback: *const fn (event: SseEvent) void, ) void { + // One parser instance shared across reconnects so last_event_id and + // retry_ms survive disconnections. + var parser = SseParser{}; var backoff_ms = opts.initial_backoff_ms; while (true) { - if (subscribe(allocator, url, extra_headers, callback)) |_| { + if (subscribe(allocator, url, extra_headers, &parser, callback)) |_| { // Clean close -- reset backoff. backoff_ms = opts.initial_backoff_ms; } else |_| {} - if (opts.on_reconnect) |cb| cb(backoff_ms); - std.Thread.sleep(backoff_ms * std.time.ns_per_ms); - backoff_ms = @min(backoff_ms * 2, opts.max_backoff_ms); + // Use server-specified retry delay if available, otherwise exponential backoff. + const delay = parser.retry_ms orelse backoff_ms; + if (opts.on_reconnect) |cb| cb(delay); + std.Thread.sleep(delay * std.time.ns_per_ms); + + // Only advance exponential backoff when server hasn't specified retry. + if (parser.retry_ms == null) { + backoff_ms = @min(backoff_ms * 2, opts.max_backoff_ms); + } } } @@ -193,8 +289,54 @@ test "SseParser basic event" { try std.testing.expect(parser.feedLine("event: perp_price") == null); try std.testing.expect(parser.feedLine("data: {\"price\": 100}") == null); const evt = parser.feedLine("").?; - try std.testing.expectEqualStrings("perp_price", evt.event_type.?); + try std.testing.expectEqualStrings("perp_price", evt.event.?); try std.testing.expectEqualStrings("{\"price\": 100}", evt.data.?); + try std.testing.expect(evt.id == null); +} + +test "SseParser captures id field" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("id: 42") == null); + try std.testing.expect(parser.feedLine("event: update") == null); + try std.testing.expect(parser.feedLine("data: hello") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("42", evt.id.?); + try std.testing.expectEqualStrings("update", evt.event.?); + try std.testing.expectEqualStrings("hello", evt.data.?); +} + +test "SseParser persists last_event_id across events" { + var parser = SseParser{}; + _ = parser.feedLine("id: 7"); + _ = parser.feedLine("data: first"); + _ = parser.feedLine(""); + try std.testing.expectEqualStrings("7", parser.lastEventId().?); + + // Next event without id -- last_event_id should remain "7". + _ = parser.feedLine("data: second"); + _ = parser.feedLine(""); + try std.testing.expectEqualStrings("7", parser.lastEventId().?); +} + +test "SseParser parses retry field" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("retry: 3000") == null); + try std.testing.expect(parser.retry_ms.? == 3000); +} + +test "SseParser ignores malformed retry value" { + var parser = SseParser{}; + _ = parser.feedLine("retry: not_a_number"); + try std.testing.expect(parser.retry_ms == null); +} + +test "SseParser appends multiple data lines with newline" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: test") == null); + try std.testing.expect(parser.feedLine("data: first") == null); + try std.testing.expect(parser.feedLine("data: second") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("first\nsecond", evt.data.?); } test "SseParser ignores comments" { @@ -204,7 +346,7 @@ test "SseParser ignores comments" { try std.testing.expect(parser.feedLine(": another comment") == null); try std.testing.expect(parser.feedLine("data: hello") == null); const evt = parser.feedLine("").?; - try std.testing.expectEqualStrings("test", evt.event_type.?); + try std.testing.expectEqualStrings("test", evt.event.?); try std.testing.expectEqualStrings("hello", evt.data.?); } @@ -218,7 +360,7 @@ test "SseParser handles event with no data" { var parser = SseParser{}; try std.testing.expect(parser.feedLine("event: heartbeat") == null); const evt = parser.feedLine("").?; - try std.testing.expectEqualStrings("heartbeat", evt.event_type.?); + try std.testing.expectEqualStrings("heartbeat", evt.event.?); try std.testing.expect(evt.data == null); } @@ -226,15 +368,20 @@ test "SseParser handles data with no event type" { var parser = SseParser{}; try std.testing.expect(parser.feedLine("data: orphan") == null); const evt = parser.feedLine("").?; - try std.testing.expect(evt.event_type == null); + try std.testing.expect(evt.event == null); try std.testing.expectEqualStrings("orphan", evt.data.?); } -test "SseParser resets state" { +test "SseParser resets per-event state but not last_event_id" { var parser = SseParser{}; - try std.testing.expect(parser.feedLine("event: test") == null); + _ = parser.feedLine("id: 99"); + _ = parser.feedLine("event: test"); parser.reset(); + // Per-event fields cleared. try std.testing.expect(parser.feedLine("") == null); + // last_event_id is NOT cleared by reset. + // (It's updated on dispatch, not on reset, so still 0 here.) + try std.testing.expect(parser.lastEventId() == null); } test "SseParser handles carriage return in line" { @@ -242,7 +389,7 @@ test "SseParser handles carriage return in line" { try std.testing.expect(parser.feedLine("event: test\r") == null); try std.testing.expect(parser.feedLine("data: value\r") == null); const evt = parser.feedLine("").?; - try std.testing.expectEqualStrings("test", evt.event_type.?); + try std.testing.expectEqualStrings("test", evt.event.?); try std.testing.expectEqualStrings("value", evt.data.?); } @@ -251,7 +398,7 @@ test "SseParser colon without space" { try std.testing.expect(parser.feedLine("event:no_space") == null); try std.testing.expect(parser.feedLine("data:also_no_space") == null); const evt = parser.feedLine("").?; - try std.testing.expectEqualStrings("no_space", evt.event_type.?); + try std.testing.expectEqualStrings("no_space", evt.event.?); try std.testing.expectEqualStrings("also_no_space", evt.data.?); } @@ -260,30 +407,19 @@ test "SseParser multiple events in sequence" { _ = parser.feedLine("event: first"); _ = parser.feedLine("data: 1"); const ev1 = parser.feedLine("").?; - try std.testing.expectEqualStrings("first", ev1.event_type.?); + try std.testing.expectEqualStrings("first", ev1.event.?); _ = parser.feedLine("event: second"); _ = parser.feedLine("data: 2"); const ev2 = parser.feedLine("").?; - try std.testing.expectEqualStrings("second", ev2.event_type.?); + try std.testing.expectEqualStrings("second", ev2.event.?); try std.testing.expectEqualStrings("2", ev2.data.?); } -test "SseParser data overwrites previous data within same event" { - var parser = SseParser{}; - _ = parser.feedLine("event: test"); - _ = parser.feedLine("data: first"); - _ = parser.feedLine("data: second"); - const evt = parser.feedLine("").?; - try std.testing.expectEqualStrings("second", evt.data.?); -} - -test "SseParser ignores unknown fields (id, retry)" { +test "SseParser ignores unknown fields" { var parser = SseParser{}; - _ = parser.feedLine("id: 12345"); - _ = parser.feedLine("retry: 5000"); _ = parser.feedLine("event: perp_price"); _ = parser.feedLine("data: test_data"); const evt = parser.feedLine("").?; - try std.testing.expectEqualStrings("perp_price", evt.event_type.?); + try std.testing.expectEqualStrings("perp_price", evt.event.?); try std.testing.expectEqualStrings("test_data", evt.data.?); } From 7cfc007de3e90c64ef2da2794ebb732a311bda05 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 15 Mar 2026 12:06:21 -0400 Subject: [PATCH 3/8] Fix SSE parser: spec-correct dispatch, has_id for empty id clearing, no-colon field support --- src/sse_transport.zig | 113 +++++++++++++++++++++++------------------- 1 file changed, 61 insertions(+), 52 deletions(-) diff --git a/src/sse_transport.zig b/src/sse_transport.zig index e26d629..9b904d1 100644 --- a/src/sse_transport.zig +++ b/src/sse_transport.zig @@ -48,6 +48,10 @@ pub const SseParser = struct { event_len: usize = 0, id_buf: [512]u8 = undefined, id_len: usize = 0, + /// True when the current event block contained at least one `id:` line, + /// even if its value was empty. Distinguishes "id seen with empty value" + /// (which clears last_event_id) from "id not present" (no change). + has_id: bool = false, data_buf: [65536]u8 = undefined, data_len: usize = 0, @@ -67,70 +71,74 @@ pub const SseParser = struct { } /// Feed a single line (without trailing `\n` or `\r\n`) to the parser. - /// Returns an `SseEvent` if a blank line was encountered (event boundary), - /// or null otherwise. + /// Returns an `SseEvent` if a blank line was encountered (event boundary) + /// AND the event contains data, or null otherwise. pub fn feedLine(self: *SseParser, line: []const u8) ?SseEvent { const trimmed = std.mem.trimRight(u8, line, "\r"); - // Blank line = event boundary: dispatch any accumulated fields. + // Blank line = event boundary. if (trimmed.len == 0) { - if (self.event_len > 0 or self.data_len > 0 or self.id_len > 0) { - // Update last-event-id if the event included one. - if (self.id_len > 0) { - const copy_len = @min(self.id_len, self.last_event_id_buf.len); - @memcpy(self.last_event_id_buf[0..copy_len], self.id_buf[0..copy_len]); - self.last_event_id_len = copy_len; - } - - const evt = SseEvent{ - .id = if (self.id_len > 0) self.id_buf[0..self.id_len] else null, - .event = if (self.event_len > 0) self.event_buf[0..self.event_len] else null, - .data = if (self.data_len > 0) self.data_buf[0..self.data_len] else null, - }; - - // Clear per-event state; last_event_id and retry_ms persist. - self.event_len = 0; - self.id_len = 0; - self.data_len = 0; - return evt; + // Always update last-event-id when an `id:` line was present in + // this block, even when the value is empty (spec §9.2.6 step 1). + if (self.has_id) { + const copy_len = @min(self.id_len, self.last_event_id_buf.len); + @memcpy(self.last_event_id_buf[0..copy_len], self.id_buf[0..copy_len]); + self.last_event_id_len = copy_len; } - return null; + + // Per spec §9.2.6 step 2: do not dispatch if data buffer is empty. + const evt: ?SseEvent = if (self.data_len > 0) SseEvent{ + .id = if (self.id_len > 0) self.id_buf[0..self.id_len] else null, + .event = if (self.event_len > 0) self.event_buf[0..self.event_len] else null, + .data = self.data_buf[0..self.data_len], + } else null; + + // Clear per-event state; last_event_id and retry_ms persist. + self.event_len = 0; + self.id_len = 0; + self.has_id = false; + self.data_len = 0; + return evt; } // Lines starting with ':' are comments -- ignore per spec. if (trimmed[0] == ':') return null; - // Parse "field: value" or "field:value". - if (std.mem.indexOf(u8, trimmed, ":")) |colon_idx| { - const field = trimmed[0..colon_idx]; - var value = trimmed[colon_idx + 1 ..]; + // Parse "field: value", "field:value", or bare "field" (empty value). + // Per spec §9.2.6: a line with no colon is a field name with empty value. + const parsed = if (std.mem.indexOf(u8, trimmed, ":")) |colon_idx| blk: { + const raw_value = trimmed[colon_idx + 1 ..]; // Strip optional single leading space after colon (SSE spec §9.2.6). - if (value.len > 0 and value[0] == ' ') value = value[1..]; - - if (std.mem.eql(u8, field, "event")) { - const copy_len = @min(value.len, self.event_buf.len); - @memcpy(self.event_buf[0..copy_len], value[0..copy_len]); - self.event_len = copy_len; - } else if (std.mem.eql(u8, field, "data")) { - // Append to data buffer, joining multiple data: lines with '\n'. - if (self.data_len > 0 and self.data_len < self.data_buf.len) { - self.data_buf[self.data_len] = '\n'; - self.data_len += 1; - } - const remaining = self.data_buf.len - self.data_len; - const copy_len = @min(value.len, remaining); - @memcpy(self.data_buf[self.data_len .. self.data_len + copy_len], value[0..copy_len]); - self.data_len += copy_len; - } else if (std.mem.eql(u8, field, "id")) { - const copy_len = @min(value.len, self.id_buf.len); - @memcpy(self.id_buf[0..copy_len], value[0..copy_len]); - self.id_len = copy_len; - } else if (std.mem.eql(u8, field, "retry")) { - // Parse the retry value as a decimal integer of milliseconds. - if (std.fmt.parseInt(u64, value, 10)) |ms| { - self.retry_ms = ms; - } else |_| {} // ignore malformed retry values per spec + const v = if (raw_value.len > 0 and raw_value[0] == ' ') raw_value[1..] else raw_value; + break :blk .{ .field = trimmed[0..colon_idx], .value = v }; + } else .{ .field = trimmed, .value = @as([]const u8, "") }; + const field = parsed.field; + const value = parsed.value; + + if (std.mem.eql(u8, field, "event")) { + const copy_len = @min(value.len, self.event_buf.len); + @memcpy(self.event_buf[0..copy_len], value[0..copy_len]); + self.event_len = copy_len; + } else if (std.mem.eql(u8, field, "data")) { + // Append to data buffer, joining multiple data: lines with '\n'. + if (self.data_len > 0 and self.data_len < self.data_buf.len) { + self.data_buf[self.data_len] = '\n'; + self.data_len += 1; } + const remaining = self.data_buf.len - self.data_len; + const copy_len = @min(value.len, remaining); + @memcpy(self.data_buf[self.data_len .. self.data_len + copy_len], value[0..copy_len]); + self.data_len += copy_len; + } else if (std.mem.eql(u8, field, "id")) { + const copy_len = @min(value.len, self.id_buf.len); + @memcpy(self.id_buf[0..copy_len], value[0..copy_len]); + self.id_len = copy_len; + self.has_id = true; + } else if (std.mem.eql(u8, field, "retry")) { + // Parse the retry value as a decimal integer of milliseconds. + if (std.fmt.parseInt(u64, value, 10)) |ms| { + self.retry_ms = ms; + } else |_| {} // ignore malformed retry values per spec } return null; @@ -141,6 +149,7 @@ pub const SseParser = struct { pub fn reset(self: *SseParser) void { self.event_len = 0; self.id_len = 0; + self.has_id = false; self.data_len = 0; } }; From 1603fe4e736ec84c930da9c063eedf0dcf127051 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 15 Mar 2026 13:26:02 -0400 Subject: [PATCH 4/8] Fix SSE: remove unused ConnectionFailed, dead prefix var, data_truncated flag, overflow-safe backoff, fix no-data test --- src/sse_transport.zig | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/sse_transport.zig b/src/sse_transport.zig index 9b904d1..b0cb28b 100644 --- a/src/sse_transport.zig +++ b/src/sse_transport.zig @@ -20,7 +20,6 @@ pub const SseEvent = struct { }; pub const SseError = error{ - ConnectionFailed, BadStatus, }; @@ -54,6 +53,10 @@ pub const SseParser = struct { has_id: bool = false, data_buf: [65536]u8 = undefined, data_len: usize = 0, + /// Set to true if the accumulated data for the current event exceeded + /// `data_buf`. The event is still dispatched with the truncated data. + /// Cleared on each event boundary alongside `data_len`. + data_truncated: bool = false, // Persistent state (survives event boundaries and reconnects). /// The last `id:` value seen across all events. Sent as `Last-Event-ID` @@ -98,6 +101,7 @@ pub const SseParser = struct { self.id_len = 0; self.has_id = false; self.data_len = 0; + self.data_truncated = false; return evt; } @@ -126,6 +130,7 @@ pub const SseParser = struct { self.data_len += 1; } const remaining = self.data_buf.len - self.data_len; + if (value.len > remaining) self.data_truncated = true; const copy_len = @min(value.len, remaining); @memcpy(self.data_buf[self.data_len .. self.data_len + copy_len], value[0..copy_len]); self.data_len += copy_len; @@ -151,6 +156,7 @@ pub const SseParser = struct { self.id_len = 0; self.has_id = false; self.data_len = 0; + self.data_truncated = false; } }; @@ -191,8 +197,6 @@ pub fn subscribe( var last_id_header_buf: [512 + 20]u8 = undefined; // "Last-Event-ID: " + id var id_headers: []const std.http.Header = &.{}; if (parser.lastEventId()) |last_id| { - const prefix = ""; // value is the id itself; name is the header name - _ = prefix; @memcpy(last_id_header_buf[0..last_id.len], last_id); id_headers = &.{.{ .name = "Last-Event-ID", @@ -284,7 +288,11 @@ pub fn subscribeWithReconnect( // Only advance exponential backoff when server hasn't specified retry. if (parser.retry_ms == null) { - backoff_ms = @min(backoff_ms * 2, opts.max_backoff_ms); + // Guard against overflow before clamping. + backoff_ms = if (backoff_ms > opts.max_backoff_ms / 2) + opts.max_backoff_ms + else + backoff_ms * 2; } } } @@ -366,11 +374,10 @@ test "SseParser blank line without prior data emits nothing" { } test "SseParser handles event with no data" { + // Per spec §9.2.6: if the data buffer is empty, no event is dispatched. var parser = SseParser{}; try std.testing.expect(parser.feedLine("event: heartbeat") == null); - const evt = parser.feedLine("").?; - try std.testing.expectEqualStrings("heartbeat", evt.event.?); - try std.testing.expect(evt.data == null); + try std.testing.expect(parser.feedLine("") == null); } test "SseParser handles data with no event type" { From 9fb955ee3d5e62269a6c58e30e41789feb060331 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 15 Mar 2026 13:32:01 -0400 Subject: [PATCH 5/8] Add WsTransport connectWithReconnect with exponential backoff (#35) --- src/ws_transport.zig | 81 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/src/ws_transport.zig b/src/ws_transport.zig index 9bf90cc..5bb7351 100644 --- a/src/ws_transport.zig +++ b/src/ws_transport.zig @@ -644,6 +644,63 @@ pub const WsTransport = struct { } }; +// --------------------------------------------------------------------------- +// Reconnect loop +// --------------------------------------------------------------------------- + +/// Options for `connectWithReconnect`. +pub const ReconnectOpts = struct { + /// Initial backoff in milliseconds. Default: 1_000. + initial_backoff_ms: u64 = 1_000, + /// Maximum backoff cap in milliseconds. Default: 30_000. + max_backoff_ms: u64 = 30_000, + /// Optional callback invoked before each reconnect attempt. + /// Receives the backoff delay that will be applied. + on_reconnect: ?*const fn (backoff_ms: u64) void = null, +}; + +/// Connect to a WebSocket endpoint and stream messages forever, reconnecting +/// with exponential backoff on disconnection or error. +/// +/// On each successful connection `callback` is invoked with a pointer to the +/// live `WsTransport`. The callback should perform any subscription setup +/// (e.g. `eth_subscribe`) and then read messages in a loop until the +/// connection drops. When the callback returns -- whether cleanly or with an +/// error -- the transport is closed and a reconnect is scheduled. +/// +/// This function never returns under normal operation. +pub fn connectWithReconnect( + allocator: std.mem.Allocator, + url: []const u8, + opts: ReconnectOpts, + callback: *const fn (transport: *WsTransport) anyerror!void, +) void { + var backoff_ms = opts.initial_backoff_ms; + + while (true) { + if (WsTransport.connect(allocator, url)) |transport_val| { + var transport = transport_val; + defer transport.close(); + if (callback(&transport)) |_| { + // Clean close -- reset backoff. + backoff_ms = opts.initial_backoff_ms; + } else |_| {} + } else |_| {} + + if (opts.on_reconnect) |cb| cb(backoff_ms); + // Cap before converting to nanoseconds to prevent u64 overflow. + const max_sleep_ms: u64 = std.math.maxInt(u64) / std.time.ns_per_ms; + std.Thread.sleep(@min(backoff_ms, max_sleep_ms) * std.time.ns_per_ms); + + // Guard against overflow before clamping. + backoff_ms = if (backoff_ms > opts.max_backoff_ms / 2) + opts.max_backoff_ms + else + backoff_ms * 2; + } +} + + // ============================================================================ // Tests // ============================================================================ @@ -1160,3 +1217,27 @@ test "Opcode values" { try std.testing.expectEqual(@as(u4, 0xA), @intFromEnum(Opcode.pong)); try std.testing.expectEqual(@as(u4, 0x0), @intFromEnum(Opcode.continuation)); } + +test "ReconnectOpts defaults" { + const opts = ReconnectOpts{}; + try std.testing.expectEqual(@as(u64, 1_000), opts.initial_backoff_ms); + try std.testing.expectEqual(@as(u64, 30_000), opts.max_backoff_ms); + try std.testing.expect(opts.on_reconnect == null); +} + +test "ReconnectOpts backoff clamping logic" { + // Verify the overflow-safe backoff calculation used in connectWithReconnect. + const max: u64 = 30_000; + // Normal doubling + var b: u64 = 1_000; + b = if (b > max / 2) max else b * 2; + try std.testing.expectEqual(@as(u64, 2_000), b); + // Clamp when doubling would exceed max + b = 20_000; + b = if (b > max / 2) max else b * 2; + try std.testing.expectEqual(@as(u64, 30_000), b); + // Already at max + b = 30_000; + b = if (b > max / 2) max else b * 2; + try std.testing.expectEqual(@as(u64, 30_000), b); +} From 12e697e591119fae2d6527c6f6388975c6844ceb Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 15 Mar 2026 13:39:16 -0400 Subject: [PATCH 6/8] SSE: has_data flag for empty data: dispatch, non-nullable SseEvent.data, overflow-safe sleep --- src/sse_transport.zig | 56 +++++++++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/src/sse_transport.zig b/src/sse_transport.zig index b0cb28b..452a82c 100644 --- a/src/sse_transport.zig +++ b/src/sse_transport.zig @@ -15,8 +15,10 @@ pub const SseEvent = struct { /// The `event:` field value, or null if omitted (default event type is "message"). event: ?[]const u8 = null, /// The accumulated `data:` field value. Multiple `data:` lines within one - /// event are joined with U+000A as required by the spec. - data: ?[]const u8 = null, + /// event are joined with U+000A as required by the spec. Always present + /// when the event is dispatched (may be an empty slice when the server sent + /// a bare `data:` line with no value). + data: []const u8, }; pub const SseError = error{ @@ -53,6 +55,10 @@ pub const SseParser = struct { has_id: bool = false, data_buf: [65536]u8 = undefined, data_len: usize = 0, + /// True when the current event block contained at least one `data:` line, + /// even if its value was empty (per spec, an empty `data:` still dispatches + /// an event with data = ""). + has_data: bool = false, /// Set to true if the accumulated data for the current event exceeded /// `data_buf`. The event is still dispatched with the truncated data. /// Cleared on each event boundary alongside `data_len`. @@ -75,7 +81,9 @@ pub const SseParser = struct { /// Feed a single line (without trailing `\n` or `\r\n`) to the parser. /// Returns an `SseEvent` if a blank line was encountered (event boundary) - /// AND the event contains data, or null otherwise. + /// AND at least one `data:` line was seen in this block (per spec §9.2.6), + /// or null otherwise. The returned event's `data` field may be an empty + /// slice when the server sent a bare `data:` with no value. pub fn feedLine(self: *SseParser, line: []const u8) ?SseEvent { const trimmed = std.mem.trimRight(u8, line, "\r"); @@ -89,8 +97,9 @@ pub const SseParser = struct { self.last_event_id_len = copy_len; } - // Per spec §9.2.6 step 2: do not dispatch if data buffer is empty. - const evt: ?SseEvent = if (self.data_len > 0) SseEvent{ + // Per spec §9.2.6 step 2: dispatch only when at least one data: + // line was seen (has_data). An empty-value data: line still counts. + const evt: ?SseEvent = if (self.has_data) SseEvent{ .id = if (self.id_len > 0) self.id_buf[0..self.id_len] else null, .event = if (self.event_len > 0) self.event_buf[0..self.event_len] else null, .data = self.data_buf[0..self.data_len], @@ -101,6 +110,7 @@ pub const SseParser = struct { self.id_len = 0; self.has_id = false; self.data_len = 0; + self.has_data = false; self.data_truncated = false; return evt; } @@ -125,10 +135,12 @@ pub const SseParser = struct { self.event_len = copy_len; } else if (std.mem.eql(u8, field, "data")) { // Append to data buffer, joining multiple data: lines with '\n'. - if (self.data_len > 0 and self.data_len < self.data_buf.len) { + // Mark that a data: line was seen even if the value is empty. + if (self.has_data and self.data_len < self.data_buf.len) { self.data_buf[self.data_len] = '\n'; self.data_len += 1; } + self.has_data = true; const remaining = self.data_buf.len - self.data_len; if (value.len > remaining) self.data_truncated = true; const copy_len = @min(value.len, remaining); @@ -156,6 +168,7 @@ pub const SseParser = struct { self.id_len = 0; self.has_id = false; self.data_len = 0; + self.has_data = false; self.data_truncated = false; } }; @@ -284,7 +297,9 @@ pub fn subscribeWithReconnect( // Use server-specified retry delay if available, otherwise exponential backoff. const delay = parser.retry_ms orelse backoff_ms; if (opts.on_reconnect) |cb| cb(delay); - std.Thread.sleep(delay * std.time.ns_per_ms); + // Cap before converting to nanoseconds to prevent u64 overflow. + const max_sleep_ms: u64 = std.math.maxInt(u64) / std.time.ns_per_ms; + std.Thread.sleep(@min(delay, max_sleep_ms) * std.time.ns_per_ms); // Only advance exponential backoff when server hasn't specified retry. if (parser.retry_ms == null) { @@ -307,7 +322,7 @@ test "SseParser basic event" { try std.testing.expect(parser.feedLine("data: {\"price\": 100}") == null); const evt = parser.feedLine("").?; try std.testing.expectEqualStrings("perp_price", evt.event.?); - try std.testing.expectEqualStrings("{\"price\": 100}", evt.data.?); + try std.testing.expectEqualStrings("{\"price\": 100}", evt.data); try std.testing.expect(evt.id == null); } @@ -319,7 +334,7 @@ test "SseParser captures id field" { const evt = parser.feedLine("").?; try std.testing.expectEqualStrings("42", evt.id.?); try std.testing.expectEqualStrings("update", evt.event.?); - try std.testing.expectEqualStrings("hello", evt.data.?); + try std.testing.expectEqualStrings("hello", evt.data); } test "SseParser persists last_event_id across events" { @@ -353,7 +368,7 @@ test "SseParser appends multiple data lines with newline" { try std.testing.expect(parser.feedLine("data: first") == null); try std.testing.expect(parser.feedLine("data: second") == null); const evt = parser.feedLine("").?; - try std.testing.expectEqualStrings("first\nsecond", evt.data.?); + try std.testing.expectEqualStrings("first\nsecond", evt.data); } test "SseParser ignores comments" { @@ -364,7 +379,7 @@ test "SseParser ignores comments" { try std.testing.expect(parser.feedLine("data: hello") == null); const evt = parser.feedLine("").?; try std.testing.expectEqualStrings("test", evt.event.?); - try std.testing.expectEqualStrings("hello", evt.data.?); + try std.testing.expectEqualStrings("hello", evt.data); } test "SseParser blank line without prior data emits nothing" { @@ -373,6 +388,17 @@ test "SseParser blank line without prior data emits nothing" { try std.testing.expect(parser.feedLine("") == null); } +test "SseParser dispatches event with empty data: line" { + // Per spec: a bare `data:` (empty value) counts as a data: line and + // must trigger dispatch with an empty data slice. + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: ping") == null); + try std.testing.expect(parser.feedLine("data:") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("ping", evt.event.?); + try std.testing.expectEqualStrings("", evt.data); +} + test "SseParser handles event with no data" { // Per spec §9.2.6: if the data buffer is empty, no event is dispatched. var parser = SseParser{}; @@ -385,7 +411,7 @@ test "SseParser handles data with no event type" { try std.testing.expect(parser.feedLine("data: orphan") == null); const evt = parser.feedLine("").?; try std.testing.expect(evt.event == null); - try std.testing.expectEqualStrings("orphan", evt.data.?); + try std.testing.expectEqualStrings("orphan", evt.data); } test "SseParser resets per-event state but not last_event_id" { @@ -406,7 +432,7 @@ test "SseParser handles carriage return in line" { try std.testing.expect(parser.feedLine("data: value\r") == null); const evt = parser.feedLine("").?; try std.testing.expectEqualStrings("test", evt.event.?); - try std.testing.expectEqualStrings("value", evt.data.?); + try std.testing.expectEqualStrings("value", evt.data); } test "SseParser colon without space" { @@ -415,7 +441,7 @@ test "SseParser colon without space" { try std.testing.expect(parser.feedLine("data:also_no_space") == null); const evt = parser.feedLine("").?; try std.testing.expectEqualStrings("no_space", evt.event.?); - try std.testing.expectEqualStrings("also_no_space", evt.data.?); + try std.testing.expectEqualStrings("also_no_space", evt.data); } test "SseParser multiple events in sequence" { @@ -437,5 +463,5 @@ test "SseParser ignores unknown fields" { _ = parser.feedLine("data: test_data"); const evt = parser.feedLine("").?; try std.testing.expectEqualStrings("perp_price", evt.event.?); - try std.testing.expectEqualStrings("test_data", evt.data.?); + try std.testing.expectEqualStrings("test_data", evt.data); } From c4493ef14d9d7e2eb6d1a63b0aa7e194322e8c37 Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 15 Mar 2026 13:46:29 -0400 Subject: [PATCH 7/8] Fix formatting: remove extra blank line in ws_transport.zig --- src/ws_transport.zig | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ws_transport.zig b/src/ws_transport.zig index 5bb7351..d63ae0c 100644 --- a/src/ws_transport.zig +++ b/src/ws_transport.zig @@ -700,7 +700,6 @@ pub fn connectWithReconnect( } } - // ============================================================================ // Tests // ============================================================================ From aa73feefa12a065843d8a8f49c3fca6e956c00af Mon Sep 17 00:00:00 2001 From: Amlandeep Bhadra Date: Sun, 15 Mar 2026 14:25:56 -0400 Subject: [PATCH 8/8] Fix ev2.data.? residual nullable unwrap in test --- src/sse_transport.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sse_transport.zig b/src/sse_transport.zig index 452a82c..d7d05a8 100644 --- a/src/sse_transport.zig +++ b/src/sse_transport.zig @@ -454,7 +454,7 @@ test "SseParser multiple events in sequence" { _ = parser.feedLine("data: 2"); const ev2 = parser.feedLine("").?; try std.testing.expectEqualStrings("second", ev2.event.?); - try std.testing.expectEqualStrings("2", ev2.data.?); + try std.testing.expectEqualStrings("2", ev2.data); } test "SseParser ignores unknown fields" {