diff --git a/README.md b/README.md index 51ed6bf..fbd9047 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,23 @@ const key = try eth.hd_wallet.deriveEthAccount(seed, 0); const addr = key.toAddress(); ``` +## Built with eth.zig + +Real production bots and SDKs built on eth.zig: + +| Project | Description | Language | +|---------|-------------|----------| +| [perpcity-zig-sdk](https://github.com/StrobeLabs/perpcity-zig-sdk) | High-performance SDK for the PerpCity perpetual futures protocol on Base. Comptime ABI encoding, lock-free nonce management, 2-tier price cache. | Zig | +| [gator-liquidators](https://github.com/StrobeLabs/gator-liquidators) | Production liquidation keeper for PerpCity. Batch-checks positions via Multicall3, pre-signs liquidation txs, submits to Base sequencer. | Zig | + +```text +eth.zig + └── perpcity-zig-sdk (protocol SDK with comptime contract ABIs) + └── gator-liquidators (production liquidation bot on Base) +``` + +Built something with eth.zig? Open a PR to add it here. + ## Installation **One-liner:** @@ -176,7 +193,7 @@ cd examples && zig build && ./zig-out/bin/01_derive_address | **Crypto** | `secp256k1`, `signer`, `signature`, `keccak`, `eip155` | ECDSA signing (RFC 6979), Keccak-256, EIP-155 | | **Types** | `transaction`, `receipt`, `block`, `blob`, `access_list` | Legacy, EIP-2930, EIP-1559, EIP-4844 transactions | | **Accounts** | `mnemonic`, `hd_wallet` | BIP-32/39/44 HD wallets and mnemonic generation | -| **Transport** | `http_transport`, `ws_transport`, `json_rpc`, `provider`, `subscription` | HTTP and WebSocket JSON-RPC transports | +| **Transport** | `http_transport`, `ws_transport`, `sse_transport`, `json_rpc`, `provider`, `subscription` | HTTP, WebSocket, and SSE transports | | **ENS** | `ens_namehash`, `ens_resolver`, `ens_reverse` | ENS name resolution and reverse lookup | | **Client** | `wallet`, `contract`, `multicall`, `event`, `erc20`, `erc721` | Signing wallet, contract interaction, Multicall3, token wrappers | | **Standards** | `eip712`, `abi_json` | EIP-712 typed data signing, Solidity JSON ABI parsing | diff --git a/src/root.zig b/src/root.zig index 7c33bea..41f7f49 100644 --- a/src/root.zig +++ b/src/root.zig @@ -34,6 +34,7 @@ pub const hd_wallet = @import("hd_wallet.zig"); pub const json_rpc = @import("json_rpc.zig"); pub const http_transport = @import("http_transport.zig"); pub const ws_transport = @import("ws_transport.zig"); +pub const sse_transport = @import("sse_transport.zig"); pub const subscription = @import("subscription.zig"); pub const provider = @import("provider.zig"); pub const retry_provider = @import("retry_provider.zig"); @@ -106,6 +107,7 @@ test { _ = @import("json_rpc.zig"); _ = @import("http_transport.zig"); _ = @import("ws_transport.zig"); + _ = @import("sse_transport.zig"); _ = @import("subscription.zig"); _ = @import("provider.zig"); _ = @import("retry_provider.zig"); diff --git a/src/sse_transport.zig b/src/sse_transport.zig new file mode 100644 index 0000000..d7d05a8 --- /dev/null +++ b/src/sse_transport.zig @@ -0,0 +1,467 @@ +const std = @import("std"); + +// ============================================================================ +// Types +// ============================================================================ + +/// A parsed SSE event as defined by the W3C Server-Sent Events specification. +/// +/// All slice fields point into buffers owned by the `SseParser` that produced +/// this event. They are valid until the next call to `SseParser.feedLine` or +/// `SseParser.reset`. +pub const SseEvent = struct { + /// The `id:` field value, or null if omitted. + id: ?[]const u8 = null, + /// The `event:` field value, or null if omitted (default event type is "message"). + event: ?[]const u8 = null, + /// The accumulated `data:` field value. Multiple `data:` lines within one + /// event are joined with U+000A as required by the spec. Always present + /// when the event is dispatched (may be an empty slice when the server sent + /// a bare `data:` line with no value). + data: []const u8, +}; + +pub const SseError = error{ + BadStatus, +}; + +// ============================================================================ +// Parser +// ============================================================================ + +/// Line-oriented SSE parser. +/// +/// Feed lines one at a time via `feedLine`. The parser accumulates `event:`, +/// `id:`, and `data:` fields and emits an `SseEvent` when it encounters a +/// blank line (the event boundary per the SSE spec). +/// +/// All field values are copied into fixed internal buffers, so emitted +/// `SseEvent` slices remain valid until the next call to `feedLine` or +/// `reset` -- regardless of whether the caller's line buffer has been reused. +/// +/// `last_event_id` and `retry_ms` persist across events and are intended for +/// use by reconnecting transports. +/// +/// Designed to be testable without any network I/O. +pub const SseParser = struct { + // Per-event buffers (cleared on event dispatch). + event_buf: [256]u8 = undefined, + event_len: usize = 0, + id_buf: [512]u8 = undefined, + id_len: usize = 0, + /// True when the current event block contained at least one `id:` line, + /// even if its value was empty. Distinguishes "id seen with empty value" + /// (which clears last_event_id) from "id not present" (no change). + has_id: bool = false, + data_buf: [65536]u8 = undefined, + data_len: usize = 0, + /// True when the current event block contained at least one `data:` line, + /// even if its value was empty (per spec, an empty `data:` still dispatches + /// an event with data = ""). + has_data: bool = false, + /// Set to true if the accumulated data for the current event exceeded + /// `data_buf`. The event is still dispatched with the truncated data. + /// Cleared on each event boundary alongside `data_len`. + data_truncated: bool = false, + + // Persistent state (survives event boundaries and reconnects). + /// The last `id:` value seen across all events. Sent as `Last-Event-ID` + /// on reconnect. Empty slice means no id has been received yet. + last_event_id_buf: [512]u8 = undefined, + last_event_id_len: usize = 0, + /// Server-specified reconnect delay in milliseconds (`retry:` field). + /// Null means the server has not specified a value. + retry_ms: ?u64 = null, + + /// Return the last received event id, or null if none has been seen. + pub fn lastEventId(self: *const SseParser) ?[]const u8 { + if (self.last_event_id_len == 0) return null; + return self.last_event_id_buf[0..self.last_event_id_len]; + } + + /// Feed a single line (without trailing `\n` or `\r\n`) to the parser. + /// Returns an `SseEvent` if a blank line was encountered (event boundary) + /// AND at least one `data:` line was seen in this block (per spec §9.2.6), + /// or null otherwise. The returned event's `data` field may be an empty + /// slice when the server sent a bare `data:` with no value. + pub fn feedLine(self: *SseParser, line: []const u8) ?SseEvent { + const trimmed = std.mem.trimRight(u8, line, "\r"); + + // Blank line = event boundary. + if (trimmed.len == 0) { + // Always update last-event-id when an `id:` line was present in + // this block, even when the value is empty (spec §9.2.6 step 1). + if (self.has_id) { + const copy_len = @min(self.id_len, self.last_event_id_buf.len); + @memcpy(self.last_event_id_buf[0..copy_len], self.id_buf[0..copy_len]); + self.last_event_id_len = copy_len; + } + + // Per spec §9.2.6 step 2: dispatch only when at least one data: + // line was seen (has_data). An empty-value data: line still counts. + const evt: ?SseEvent = if (self.has_data) SseEvent{ + .id = if (self.id_len > 0) self.id_buf[0..self.id_len] else null, + .event = if (self.event_len > 0) self.event_buf[0..self.event_len] else null, + .data = self.data_buf[0..self.data_len], + } else null; + + // Clear per-event state; last_event_id and retry_ms persist. + self.event_len = 0; + self.id_len = 0; + self.has_id = false; + self.data_len = 0; + self.has_data = false; + self.data_truncated = false; + return evt; + } + + // Lines starting with ':' are comments -- ignore per spec. + if (trimmed[0] == ':') return null; + + // Parse "field: value", "field:value", or bare "field" (empty value). + // Per spec §9.2.6: a line with no colon is a field name with empty value. + const parsed = if (std.mem.indexOf(u8, trimmed, ":")) |colon_idx| blk: { + const raw_value = trimmed[colon_idx + 1 ..]; + // Strip optional single leading space after colon (SSE spec §9.2.6). + const v = if (raw_value.len > 0 and raw_value[0] == ' ') raw_value[1..] else raw_value; + break :blk .{ .field = trimmed[0..colon_idx], .value = v }; + } else .{ .field = trimmed, .value = @as([]const u8, "") }; + const field = parsed.field; + const value = parsed.value; + + if (std.mem.eql(u8, field, "event")) { + const copy_len = @min(value.len, self.event_buf.len); + @memcpy(self.event_buf[0..copy_len], value[0..copy_len]); + self.event_len = copy_len; + } else if (std.mem.eql(u8, field, "data")) { + // Append to data buffer, joining multiple data: lines with '\n'. + // Mark that a data: line was seen even if the value is empty. + if (self.has_data and self.data_len < self.data_buf.len) { + self.data_buf[self.data_len] = '\n'; + self.data_len += 1; + } + self.has_data = true; + const remaining = self.data_buf.len - self.data_len; + if (value.len > remaining) self.data_truncated = true; + const copy_len = @min(value.len, remaining); + @memcpy(self.data_buf[self.data_len .. self.data_len + copy_len], value[0..copy_len]); + self.data_len += copy_len; + } else if (std.mem.eql(u8, field, "id")) { + const copy_len = @min(value.len, self.id_buf.len); + @memcpy(self.id_buf[0..copy_len], value[0..copy_len]); + self.id_len = copy_len; + self.has_id = true; + } else if (std.mem.eql(u8, field, "retry")) { + // Parse the retry value as a decimal integer of milliseconds. + if (std.fmt.parseInt(u64, value, 10)) |ms| { + self.retry_ms = ms; + } else |_| {} // ignore malformed retry values per spec + } + + return null; + } + + /// Reset per-event accumulated state. Does NOT clear `last_event_id` or + /// `retry_ms` -- those are persistent and survive reconnects. + pub fn reset(self: *SseParser) void { + self.event_len = 0; + self.id_len = 0; + self.has_id = false; + self.data_len = 0; + self.has_data = false; + self.data_truncated = false; + } +}; + +// ============================================================================ +// Transport +// ============================================================================ + +/// Connect to an SSE endpoint and call `callback` for each event until the +/// connection closes or an error occurs. +/// +/// This function makes a single HTTP request and streams events until EOF. +/// It does NOT reconnect -- wrap this in a loop with exponential backoff for +/// production use (see `subscribeWithReconnect`). +/// +/// `extra_headers` are appended after the required `Accept` and `Cache-Control` +/// headers. The caller is responsible for any authentication headers. +/// +/// `parser` is caller-supplied so that `last_event_id` and `retry_ms` persist +/// across reconnects when used with `subscribeWithReconnect`. +pub fn subscribe( + allocator: std.mem.Allocator, + url: []const u8, + extra_headers: []const std.http.Header, + parser: *SseParser, + callback: *const fn (event: SseEvent) void, +) !void { + var client = std.http.Client{ .allocator = allocator }; + defer client.deinit(); + + const uri = try std.Uri.parse(url); + + // Build header list: base SSE headers + Last-Event-ID (if any) + caller extras. + const base_headers: []const std.http.Header = &.{ + .{ .name = "Accept", .value = "text/event-stream" }, + .{ .name = "Cache-Control", .value = "no-cache" }, + }; + + var last_id_header_buf: [512 + 20]u8 = undefined; // "Last-Event-ID: " + id + var id_headers: []const std.http.Header = &.{}; + if (parser.lastEventId()) |last_id| { + @memcpy(last_id_header_buf[0..last_id.len], last_id); + id_headers = &.{.{ + .name = "Last-Event-ID", + .value = last_id_header_buf[0..last_id.len], + }}; + } + + const all_headers = try std.mem.concat( + allocator, + std.http.Header, + &.{ base_headers, id_headers, extra_headers }, + ); + defer allocator.free(all_headers); + + var req = try client.request(.GET, uri, .{ .extra_headers = all_headers }); + defer req.deinit(); + + try req.sendBodiless(); + + var redirect_buf: [4096]u8 = undefined; + var response = try req.receiveHead(&redirect_buf); + + if (response.head.status != .ok) { + return error.BadStatus; + } + + // Reset per-event state but preserve last_event_id and retry_ms. + parser.reset(); + + var transfer_buf: [8192]u8 = undefined; + const reader = response.reader(&transfer_buf); + + while (true) { + const line_with_nl = reader.takeDelimiterInclusive('\n') catch |err| switch (err) { + error.EndOfStream => return, // normal close + else => return err, + }; + const line = line_with_nl[0 .. line_with_nl.len - 1]; + + if (parser.feedLine(line)) |evt| { + callback(evt); + } + } +} + +/// Options for `subscribeWithReconnect`. +pub const ReconnectOpts = struct { + /// Initial backoff in milliseconds. Overridden by the server's `retry:` value + /// if one has been received. Default: 1_000. + initial_backoff_ms: u64 = 1_000, + /// Maximum backoff cap in milliseconds. Default: 30_000. + max_backoff_ms: u64 = 30_000, + /// Optional callback invoked before each reconnect attempt. + /// Receives the backoff delay that will be applied. + on_reconnect: ?*const fn (backoff_ms: u64) void = null, +}; + +/// Connect to an SSE endpoint and stream events forever, reconnecting with +/// exponential backoff on disconnection or error. +/// +/// `Last-Event-ID` is automatically sent on reconnect if the server has +/// previously sent an `id:` field. The reconnect delay respects the server's +/// `retry:` value when present. +/// +/// This function never returns under normal operation. The caller's thread +/// will be blocked here. +pub fn subscribeWithReconnect( + allocator: std.mem.Allocator, + url: []const u8, + extra_headers: []const std.http.Header, + opts: ReconnectOpts, + callback: *const fn (event: SseEvent) void, +) void { + // One parser instance shared across reconnects so last_event_id and + // retry_ms survive disconnections. + var parser = SseParser{}; + var backoff_ms = opts.initial_backoff_ms; + + while (true) { + if (subscribe(allocator, url, extra_headers, &parser, callback)) |_| { + // Clean close -- reset backoff. + backoff_ms = opts.initial_backoff_ms; + } else |_| {} + + // Use server-specified retry delay if available, otherwise exponential backoff. + const delay = parser.retry_ms orelse backoff_ms; + if (opts.on_reconnect) |cb| cb(delay); + // Cap before converting to nanoseconds to prevent u64 overflow. + const max_sleep_ms: u64 = std.math.maxInt(u64) / std.time.ns_per_ms; + std.Thread.sleep(@min(delay, max_sleep_ms) * std.time.ns_per_ms); + + // Only advance exponential backoff when server hasn't specified retry. + if (parser.retry_ms == null) { + // Guard against overflow before clamping. + backoff_ms = if (backoff_ms > opts.max_backoff_ms / 2) + opts.max_backoff_ms + else + backoff_ms * 2; + } + } +} + +// ============================================================================ +// Tests +// ============================================================================ + +test "SseParser basic event" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: perp_price") == null); + try std.testing.expect(parser.feedLine("data: {\"price\": 100}") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("perp_price", evt.event.?); + try std.testing.expectEqualStrings("{\"price\": 100}", evt.data); + try std.testing.expect(evt.id == null); +} + +test "SseParser captures id field" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("id: 42") == null); + try std.testing.expect(parser.feedLine("event: update") == null); + try std.testing.expect(parser.feedLine("data: hello") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("42", evt.id.?); + try std.testing.expectEqualStrings("update", evt.event.?); + try std.testing.expectEqualStrings("hello", evt.data); +} + +test "SseParser persists last_event_id across events" { + var parser = SseParser{}; + _ = parser.feedLine("id: 7"); + _ = parser.feedLine("data: first"); + _ = parser.feedLine(""); + try std.testing.expectEqualStrings("7", parser.lastEventId().?); + + // Next event without id -- last_event_id should remain "7". + _ = parser.feedLine("data: second"); + _ = parser.feedLine(""); + try std.testing.expectEqualStrings("7", parser.lastEventId().?); +} + +test "SseParser parses retry field" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("retry: 3000") == null); + try std.testing.expect(parser.retry_ms.? == 3000); +} + +test "SseParser ignores malformed retry value" { + var parser = SseParser{}; + _ = parser.feedLine("retry: not_a_number"); + try std.testing.expect(parser.retry_ms == null); +} + +test "SseParser appends multiple data lines with newline" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: test") == null); + try std.testing.expect(parser.feedLine("data: first") == null); + try std.testing.expect(parser.feedLine("data: second") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("first\nsecond", evt.data); +} + +test "SseParser ignores comments" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine(": this is a comment") == null); + try std.testing.expect(parser.feedLine("event: test") == null); + try std.testing.expect(parser.feedLine(": another comment") == null); + try std.testing.expect(parser.feedLine("data: hello") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("test", evt.event.?); + try std.testing.expectEqualStrings("hello", evt.data); +} + +test "SseParser blank line without prior data emits nothing" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("") == null); + try std.testing.expect(parser.feedLine("") == null); +} + +test "SseParser dispatches event with empty data: line" { + // Per spec: a bare `data:` (empty value) counts as a data: line and + // must trigger dispatch with an empty data slice. + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: ping") == null); + try std.testing.expect(parser.feedLine("data:") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("ping", evt.event.?); + try std.testing.expectEqualStrings("", evt.data); +} + +test "SseParser handles event with no data" { + // Per spec §9.2.6: if the data buffer is empty, no event is dispatched. + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: heartbeat") == null); + try std.testing.expect(parser.feedLine("") == null); +} + +test "SseParser handles data with no event type" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("data: orphan") == null); + const evt = parser.feedLine("").?; + try std.testing.expect(evt.event == null); + try std.testing.expectEqualStrings("orphan", evt.data); +} + +test "SseParser resets per-event state but not last_event_id" { + var parser = SseParser{}; + _ = parser.feedLine("id: 99"); + _ = parser.feedLine("event: test"); + parser.reset(); + // Per-event fields cleared. + try std.testing.expect(parser.feedLine("") == null); + // last_event_id is NOT cleared by reset. + // (It's updated on dispatch, not on reset, so still 0 here.) + try std.testing.expect(parser.lastEventId() == null); +} + +test "SseParser handles carriage return in line" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event: test\r") == null); + try std.testing.expect(parser.feedLine("data: value\r") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("test", evt.event.?); + try std.testing.expectEqualStrings("value", evt.data); +} + +test "SseParser colon without space" { + var parser = SseParser{}; + try std.testing.expect(parser.feedLine("event:no_space") == null); + try std.testing.expect(parser.feedLine("data:also_no_space") == null); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("no_space", evt.event.?); + try std.testing.expectEqualStrings("also_no_space", evt.data); +} + +test "SseParser multiple events in sequence" { + var parser = SseParser{}; + _ = parser.feedLine("event: first"); + _ = parser.feedLine("data: 1"); + const ev1 = parser.feedLine("").?; + try std.testing.expectEqualStrings("first", ev1.event.?); + _ = parser.feedLine("event: second"); + _ = parser.feedLine("data: 2"); + const ev2 = parser.feedLine("").?; + try std.testing.expectEqualStrings("second", ev2.event.?); + try std.testing.expectEqualStrings("2", ev2.data); +} + +test "SseParser ignores unknown fields" { + var parser = SseParser{}; + _ = parser.feedLine("event: perp_price"); + _ = parser.feedLine("data: test_data"); + const evt = parser.feedLine("").?; + try std.testing.expectEqualStrings("perp_price", evt.event.?); + try std.testing.expectEqualStrings("test_data", evt.data); +} diff --git a/src/ws_transport.zig b/src/ws_transport.zig index 9bf90cc..d63ae0c 100644 --- a/src/ws_transport.zig +++ b/src/ws_transport.zig @@ -644,6 +644,62 @@ pub const WsTransport = struct { } }; +// --------------------------------------------------------------------------- +// Reconnect loop +// --------------------------------------------------------------------------- + +/// Options for `connectWithReconnect`. +pub const ReconnectOpts = struct { + /// Initial backoff in milliseconds. Default: 1_000. + initial_backoff_ms: u64 = 1_000, + /// Maximum backoff cap in milliseconds. Default: 30_000. + max_backoff_ms: u64 = 30_000, + /// Optional callback invoked before each reconnect attempt. + /// Receives the backoff delay that will be applied. + on_reconnect: ?*const fn (backoff_ms: u64) void = null, +}; + +/// Connect to a WebSocket endpoint and stream messages forever, reconnecting +/// with exponential backoff on disconnection or error. +/// +/// On each successful connection `callback` is invoked with a pointer to the +/// live `WsTransport`. The callback should perform any subscription setup +/// (e.g. `eth_subscribe`) and then read messages in a loop until the +/// connection drops. When the callback returns -- whether cleanly or with an +/// error -- the transport is closed and a reconnect is scheduled. +/// +/// This function never returns under normal operation. +pub fn connectWithReconnect( + allocator: std.mem.Allocator, + url: []const u8, + opts: ReconnectOpts, + callback: *const fn (transport: *WsTransport) anyerror!void, +) void { + var backoff_ms = opts.initial_backoff_ms; + + while (true) { + if (WsTransport.connect(allocator, url)) |transport_val| { + var transport = transport_val; + defer transport.close(); + if (callback(&transport)) |_| { + // Clean close -- reset backoff. + backoff_ms = opts.initial_backoff_ms; + } else |_| {} + } else |_| {} + + if (opts.on_reconnect) |cb| cb(backoff_ms); + // Cap before converting to nanoseconds to prevent u64 overflow. + const max_sleep_ms: u64 = std.math.maxInt(u64) / std.time.ns_per_ms; + std.Thread.sleep(@min(backoff_ms, max_sleep_ms) * std.time.ns_per_ms); + + // Guard against overflow before clamping. + backoff_ms = if (backoff_ms > opts.max_backoff_ms / 2) + opts.max_backoff_ms + else + backoff_ms * 2; + } +} + // ============================================================================ // Tests // ============================================================================ @@ -1160,3 +1216,27 @@ test "Opcode values" { try std.testing.expectEqual(@as(u4, 0xA), @intFromEnum(Opcode.pong)); try std.testing.expectEqual(@as(u4, 0x0), @intFromEnum(Opcode.continuation)); } + +test "ReconnectOpts defaults" { + const opts = ReconnectOpts{}; + try std.testing.expectEqual(@as(u64, 1_000), opts.initial_backoff_ms); + try std.testing.expectEqual(@as(u64, 30_000), opts.max_backoff_ms); + try std.testing.expect(opts.on_reconnect == null); +} + +test "ReconnectOpts backoff clamping logic" { + // Verify the overflow-safe backoff calculation used in connectWithReconnect. + const max: u64 = 30_000; + // Normal doubling + var b: u64 = 1_000; + b = if (b > max / 2) max else b * 2; + try std.testing.expectEqual(@as(u64, 2_000), b); + // Clamp when doubling would exceed max + b = 20_000; + b = if (b > max / 2) max else b * 2; + try std.testing.expectEqual(@as(u64, 30_000), b); + // Already at max + b = 30_000; + b = if (b > max / 2) max else b * 2; + try std.testing.expectEqual(@as(u64, 30_000), b); +}