diff --git a/README.md b/README.md index bc6d8e76..c1b1fd65 100644 --- a/README.md +++ b/README.md @@ -200,6 +200,9 @@ The `toString` method uses `toHex`. | `pssSend` | `POST /pss/send/:topic/:target` [🔗](https://docs.ethswarm.org/api/#tag/Postal-Service-for-Swarm/paths/~1pss~1send~1%7Btopic%7D~1%7Btargets%7D/post) | ❌✅✅ | | `pssSubscribe` _Websocket_ | `GET /pss/subscribe/:topic` [🔗](https://docs.ethswarm.org/api/#tag/Postal-Service-for-Swarm/paths/~1pss~1subscribe~1%7Btopic%7D/get) | ❌❌✅ | | `pssReceive` | `GET /pss/subscribe/:topic` [🔗](https://docs.ethswarm.org/api/#tag/Postal-Service-for-Swarm/paths/~1pss~1subscribe~1%7Btopic%7D/get) | ❌❌✅ | +| `gsocSubscribe` _WebSocket_ | `GET /gsoc/subscribe/:address` | ❌❌✅ | +| `pubsubConnect` _WebSocket_ | `GET /pubsub/:topicAddress` | ❌❌✅ | +| `listPubsubTopics` | `GET /pubsub/` | ❌❌✅ | | `getPostageBatches` | `GET /stamps` [🔗](https://docs.ethswarm.org/api/#tag/Postage-Stamps/paths/~1stamps/get) | ❌✅✅ | | `getGlobalPostageBatches` | `GET /batches` [🔗](https://docs.ethswarm.org/api/#tag/Postage-Stamps/paths/~1batches/get) | ❌✅✅ | | `getPostageBatch` | `GET /stamps/:batchId` [🔗](https://docs.ethswarm.org/api/#tag/Postage-Stamps/paths/~1stamps~1%7Bbatch_id%7D/get) | ❌✅✅ | diff --git a/src/bee.ts b/src/bee.ts index a2eebf16..c9bf2e6d 100644 --- a/src/bee.ts +++ b/src/bee.ts @@ -28,6 +28,8 @@ import { FeedPayloadResult, createFeedManifest, fetchLatestFeedUpdate } from './ import * as grantee from './modules/grantee' import * as gsoc from './modules/gsoc' import * as pinning from './modules/pinning' +import * as pubsub from './modules/pubsub' +import type { PubsubModeParams } from './modules/pubsub' import * as pss from './modules/pss' import { rchash } from './modules/rchash' import * as status from './modules/status' @@ -56,6 +58,10 @@ import type { GsocMessageHandler, GsocSubscription, Health, + PubsubMessageHandler, + PubsubMode, + PubsubSubscription, + PubsubTopicListResponse, LastCashoutActionResponse, LastChequesForPeerResponse, LastChequesResponse, @@ -1287,6 +1293,144 @@ export class Bee { return subscription } + /** + * Connects to a pubsub topic via WebSocket, acting as a publisher (read + write). + * + * The mode enum and its constructor arguments are passed directly + * + * @param mode Pubsub mode enum value (e.g. `PubsubMode.GSOC_EPHEMERAL`) + * @param handler Message handler with `onMessage`, `onError`, `onClose` callbacks + * @param brokerPeer Multiaddress of the broker peer to connect to + * @param modeParams Constructor arguments for the selected mode (topic, optional params) + * @returns A {@link PubsubSubscription} with `send(payload)` and `cancel()` methods + */ + pubsubConnect( + mode: M, + handler: PubsubMessageHandler, + brokerPeer: string, + ...modeParams: PubsubModeParams[M] + ): PubsubSubscription { + const modeInstance = pubsub.createPubsubMode(mode, ...modeParams) + + const ws = pubsub.connect( + this.url, + modeInstance.topicAddress, + brokerPeer, + modeInstance.getPublisherHeaders() ?? undefined, + this.requestOptions.headers, + ) + // Ensure binary frames are delivered as ArrayBuffer (not Blob) in browser environments. + // prepareWebsocketData handles ArrayBuffer but not Blob. + ws.binaryType = 'arraybuffer' + + const PING_INTERVAL_MS = 50_000 + let pingTimer: ReturnType | null = null + + const startPing = () => { + if (typeof ws.ping === 'function') { + pingTimer = setInterval(() => { + try { + ws.ping() + } catch { + // ignore errors on closed sockets + } + }, PING_INTERVAL_MS) + } + } + + const stopPing = () => { + if (pingTimer !== null) { + clearInterval(pingTimer) + pingTimer = null + } + } + + let cancelled = false + const cancel = () => { + if (!cancelled) { + cancelled = true + stopPing() + + if (ws.terminate) { + ws.terminate() + } else { + ws.close() + } + } + } + + let ready = false + const sendQueue: Uint8Array[] = [] + + const flushQueue = () => { + ready = true + + for (const msg of sendQueue) { + ws.send(msg) + } + sendQueue.length = 0 + } + + const subscription: PubsubSubscription = { + cancel, + send: async (payload: Uint8Array | string): Promise => { + const encoded = await modeInstance.encodeMessage(payload) + + if (ready) { + ws.send(encoded) + } else { + sendQueue.push(encoded) + } + }, + } + + ws.onopen = () => { + if (cancelled) { + ws.close() + + return + } + + startPing() + flushQueue() + + if (handler.onOpen) { + handler.onOpen(subscription) + } + } + + ws.onmessage = async event => { + const data = await prepareWebsocketData(event.data) + + if (data.length) { + handler.onMessage(modeInstance.decodeMessage(data), subscription) + } + } + ws.onerror = event => { + if (!cancelled) { + handler.onError(new BeeError(event.message), subscription) + } + } + ws.onclose = () => { + stopPing() + + if (!cancelled) { + handler.onClose(subscription) + } + } + + return subscription + } + + /** + * Lists all active pubsub topics this node is participating in. + * + * @param requestOptions Options for making requests, such as timeouts, custom HTTP agents, headers, etc. + */ + async listPubsubTopics(requestOptions?: BeeRequestOptions): Promise { + return pubsub.listTopics(this.getRequestOptionsForCall(requestOptions)) + } + /** * Creates a feed manifest chunk and returns the reference to it. * diff --git a/src/index.ts b/src/index.ts index c102c51f..23a06910 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,6 +5,8 @@ import { Stamper } from './stamper/stamper' export { MerkleTree } from 'cafe-utility' export type { Chunk } from './chunk/cac' export type { SingleOwnerChunk } from './chunk/soc' +export { GsocEphemeralMode, createPubsubMode } from './modules/pubsub' +export type { PubsubModeParams, PubsubModeInstance } from './modules/pubsub' export { MantarayNode } from './manifest/manifest' export { SUPPORTED_BEE_VERSION, SUPPORTED_BEE_VERSION_EXACT } from './modules/debug/status' export * from './types' diff --git a/src/modules/pubsub.ts b/src/modules/pubsub.ts new file mode 100644 index 00000000..8f9766fe --- /dev/null +++ b/src/modules/pubsub.ts @@ -0,0 +1,184 @@ +import { Binary, Types } from 'cafe-utility' +import WebSocket from 'isomorphic-ws' +import { makeContentAddressedChunk } from '../chunk/cac' +import { makeSOCAddress } from '../chunk/soc' +import type { BeeRequestOptions } from '../types' +import { GsocEphemeralParams, PubsubMode, PubsubTopicListResponse } from '../types' +import { Bytes } from '../utils/bytes' +import { http } from '../utils/http' +import { EthAddress, Identifier, PrivateKey, Signature, Span } from '../utils/typed-bytes' +import { NULL_IDENTIFIER } from '../utils/constants' + +const endpoint = 'pubsub' +const ENCODER = new TextEncoder() + +const SIG_SIZE = Signature.LENGTH +const SPAN_WS_SIZE = Span.LENGTH + +export interface IPubsubMode { + readonly topicAddress: string + getPublisherHeaders(): Record | null + encodeMessage(payload: Uint8Array | string): Promise + decodeMessage(frame: Uint8Array): Bytes +} + +export class GsocEphemeralMode implements IPubsubMode { + readonly topicAddress: string + private readonly socIdentifier: Identifier + private readonly privateKey: PrivateKey | null + private readonly externalAddressHex: string | null + private readonly signFn: ((data: Uint8Array) => Signature | Promise) | null + + constructor(params: GsocEphemeralParams) { + if (params.socId !== undefined) { + if (typeof params.socId === 'string') { + this.socIdentifier = new Identifier(Binary.keccak256(ENCODER.encode(params.socId))) + } else { + this.socIdentifier = new Identifier(params.socId) + } + } else { + this.socIdentifier = new Identifier(NULL_IDENTIFIER) + } + + if ('address' in params && params.address && params.signFn) { + // External signer path — can publish + this.privateKey = null + const signerAddress = new EthAddress(params.address) + this.externalAddressHex = Binary.uint8ArrayToHex(signerAddress.toUint8Array()) + this.signFn = params.signFn + this.topicAddress = makeSOCAddress(this.socIdentifier, signerAddress).toHex() + } else if ('topic' in params && params.topic !== undefined) { + // Ephemeral signer path — can publish + if (typeof params.topic === 'string') { + this.privateKey = new PrivateKey(Binary.keccak256(ENCODER.encode(params.topic))) + } else { + this.privateKey = new PrivateKey(params.topic) + } + const signerAddress = this.privateKey.publicKey().address() + this.externalAddressHex = null + this.signFn = null + this.topicAddress = makeSOCAddress(this.socIdentifier, signerAddress).toHex() + } else { + // Subscriber-only path — read-only, topicAddress provided directly + this.privateKey = null + this.externalAddressHex = null + this.signFn = null + this.topicAddress = params.topicAddress + } + } + + getPublisherHeaders(): Record | null { + if (!this.privateKey && !this.externalAddressHex) { + return null + } + + const signerHex = this.privateKey + ? Binary.uint8ArrayToHex(this.privateKey.publicKey().address().toUint8Array()) + : this.externalAddressHex! + + return { + 'swarm-pubsub-gsoc-eth-address': signerHex, + 'swarm-pubsub-gsoc-topic': this.socIdentifier.toHex(), + } + } + + async encodeMessage(payload: Uint8Array | string): Promise { + const rawPayload = typeof payload === 'string' ? ENCODER.encode(payload) : payload + const cac = makeContentAddressedChunk(rawPayload) + + let sigBytes: Uint8Array + + if (this.privateKey) { + const soc = cac.toSingleOwnerChunk(this.socIdentifier, this.privateKey) + sigBytes = soc.signature.toUint8Array() + } else if (this.signFn) { + const sigData = Binary.concatBytes(this.socIdentifier.toUint8Array(), cac.address.toUint8Array()) + const sig = await this.signFn(sigData) + sigBytes = sig instanceof Signature ? sig.toUint8Array() : new Signature(sig).toUint8Array() + } else { + throw new Error('Cannot encode messages in subscriber-only mode (no signer available)') + } + + return Binary.concatBytes(sigBytes, cac.span.toUint8Array(), cac.payload.toUint8Array()) + } + + decodeMessage(frame: Uint8Array): Bytes { + return new Bytes(frame.slice(SIG_SIZE + SPAN_WS_SIZE)) + } +} + +export type PubsubModeParams = { + [PubsubMode.GSOC_EPHEMERAL]: ConstructorParameters +} + +export type PubsubModeInstance = { + [PubsubMode.GSOC_EPHEMERAL]: GsocEphemeralMode & IPubsubMode +} + +export function createPubsubMode( + mode: M, + ...params: PubsubModeParams[M] +): PubsubModeInstance[M] & IPubsubMode { + switch (mode) { + case PubsubMode.GSOC_EPHEMERAL: { + const [modeParams] = params as PubsubModeParams[PubsubMode.GSOC_EPHEMERAL] + + return new GsocEphemeralMode(modeParams) as unknown as PubsubModeInstance[M] & IPubsubMode + } + default: + throw new Error(`Unknown pubsub mode: ${mode}`) + } +} + +export function connect( + url: string, + topicAddress: string, + brokerPeer: string, + modeHeaders?: Record, + requestHeaders?: Record, +): WebSocket { + const wsUrl = url.replace(/^http/i, 'ws') + const headers: Record = { + ...requestHeaders, + 'swarm-pubsub-peer': brokerPeer, + ...modeHeaders, + } + + // Browsers cannot set custom headers on WebSocket connections. + // Pass them as query params instead; the server accepts both. + const isBrowser = typeof window !== 'undefined' && typeof window.WebSocket !== 'undefined' + + if (isBrowser) { + const params = new URLSearchParams(headers) + + return new WebSocket(`${wsUrl}/${endpoint}/${topicAddress}?${params.toString()}`) + } + + return new WebSocket(`${wsUrl}/${endpoint}/${topicAddress}`, { headers }) +} + +export async function listTopics(requestOptions: BeeRequestOptions): Promise { + const response = await http(requestOptions, { + method: 'get', + url: `${endpoint}/`, + responseType: 'json', + }) + + const body = Types.asObject(response.data, { name: 'response.data' }) + const topicsRaw = Types.asArray(body.topics, { name: 'topics' }) + + return { + topics: topicsRaw.map((item, i) => { + const t = Types.asObject(item, { name: `topics[${i}]` }) + + return { + topicAddress: Types.asString(t.topicAddress, { name: 'topicAddress' }), + mode: Types.asInteger(t.mode, { name: 'mode' }), + role: Types.asString(t.role, { name: 'role' }) as 'broker' | 'subscriber', + connections: Types.asArray(t.connections, { name: 'connections' }).map((c, j) => + Types.asString(c, { name: `connections[${j}]` }), + ), + } + }), + } +} diff --git a/src/types/index.ts b/src/types/index.ts index b6fd5835..2df3be10 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -13,6 +13,7 @@ import { PrivateKey, PublicKey, Reference, + Signature, Topic, TransactionId, } from '../utils/typed-bytes' @@ -381,6 +382,48 @@ export interface GsocMessageHandler { onClose: (subscription: GsocSubscription) => void } +export enum PubsubMode { + GSOC_EPHEMERAL = 'gsoc-ephemeral', +} + +export type PubsubSignFn = (data: Uint8Array) => Signature | Promise + +/** External wallet signer — can publish. Provide Ethereum address and sign function. */ +type WithExternalSigner = { address: EthAddress; signFn: PubsubSignFn; topic?: never; topicAddress?: never } +/** Ephemeral signer — can publish. `topic` is either a human-readable string (hashed with keccak256 to derive the 32-byte private key) or a 32-byte key directly (hex string or Uint8Array), used as-is. */ +type WithEphemeralSigner = { address?: never; signFn?: never; topic: string | Uint8Array; topicAddress?: never } +/** Subscriber-only — read-only, no signing. Provide the topicAddress (SOC address) directly. */ +type WithSubscriberOnly = { address?: never; signFn?: never; topic?: never; topicAddress: string } + +export type GsocEphemeralParams = { socId?: string | Uint8Array } & ( + | WithExternalSigner + | WithEphemeralSigner + | WithSubscriberOnly +) + +export interface PubsubTopicInfo { + topicAddress: string + mode: number + role: 'broker' | 'subscriber' | 'publisher' + connections: string[] +} + +export interface PubsubTopicListResponse { + topics: PubsubTopicInfo[] +} + +export interface PubsubSubscription { + cancel(): void + send(payload: Uint8Array | string): Promise +} + +export interface PubsubMessageHandler { + onOpen?: (subscription: PubsubSubscription) => void + onMessage: (message: Bytes, subscription: PubsubSubscription) => void + onError: (error: BeeError, subscription: PubsubSubscription) => void + onClose: (subscription: PubsubSubscription) => void +} + export interface ReferenceResponse { reference: Reference } diff --git a/src/utils/data.browser.ts b/src/utils/data.browser.ts index ab631787..023b7048 100644 --- a/src/utils/data.browser.ts +++ b/src/utils/data.browser.ts @@ -1,4 +1,6 @@ -export async function prepareWebsocketData(data: string | ArrayBuffer | Blob): Promise | never { +import type { Data } from 'ws' + +export async function prepareWebsocketData(data: Data | string | ArrayBuffer | Blob): Promise | never { if (typeof data === 'string') { return new TextEncoder().encode(data) } diff --git a/src/utils/data.ts b/src/utils/data.ts index 6d018268..acf5485c 100644 --- a/src/utils/data.ts +++ b/src/utils/data.ts @@ -4,7 +4,7 @@ function isBufferArray(buffer: unknown): buffer is Buffer[] { return Array.isArray(buffer) && buffer.length > 0 && buffer.every(data => data instanceof Buffer) } -export async function prepareWebsocketData(data: Data | Blob): Promise | never { +export async function prepareWebsocketData(data: Data | Blob): Promise { if (typeof data === 'string') { return new TextEncoder().encode(data) } diff --git a/test/unit/pubsub.spec.ts b/test/unit/pubsub.spec.ts new file mode 100644 index 00000000..55c1f3cc --- /dev/null +++ b/test/unit/pubsub.spec.ts @@ -0,0 +1,141 @@ +import { Binary } from 'cafe-utility' +import { EthAddress, GsocEphemeralMode, NULL_IDENTIFIER, PrivateKey, PubsubMode, Signature, createPubsubMode } from '../../src' + +const TOPIC = 'test-topic' +const PAYLOAD_TEXT = 'hello there!' +const PAYLOAD_BYTES = new TextEncoder().encode(PAYLOAD_TEXT) + +const SIG_SIZE = 65 +const SPAN_WS_SIZE = 8 + +test('GsocEphemeralMode - encodeMessage / decodeMessage round-trip (string payload)', async () => { + const mode = new GsocEphemeralMode({ topic: TOPIC }) + const frame = await mode.encodeMessage(PAYLOAD_TEXT) + const decoded = mode.decodeMessage(frame) + expect(decoded.toUtf8()).toBe(PAYLOAD_TEXT) +}) + +test('GsocEphemeralMode - encodeMessage / decodeMessage round-trip (Uint8Array payload)', async () => { + const mode = new GsocEphemeralMode({ topic: TOPIC }) + const frame = await mode.encodeMessage(PAYLOAD_BYTES) + const decoded = mode.decodeMessage(frame) + expect(decoded.toUint8Array()).toEqual(PAYLOAD_BYTES) +}) + +test('GsocEphemeralMode - frame structure: [sig:65B][span:8B][payload]', async () => { + const mode = new GsocEphemeralMode({ topic: TOPIC }) + const frame = await mode.encodeMessage(PAYLOAD_TEXT) + + expect(frame.length).toBe(SIG_SIZE + SPAN_WS_SIZE + PAYLOAD_BYTES.length) + + const spanView = new DataView(frame.buffer, frame.byteOffset + SIG_SIZE, SPAN_WS_SIZE) + const span = spanView.getBigUint64(0, true) + expect(span).toBe(BigInt(PAYLOAD_BYTES.length)) +}) + +test('GsocEphemeralMode - deterministic headers for same topic', () => { + const mode1 = new GsocEphemeralMode({ topic: TOPIC }) + const mode2 = new GsocEphemeralMode({ topic: TOPIC }) + const headers1 = mode1.getPublisherHeaders()! + const headers2 = mode2.getPublisherHeaders()! + + expect(headers1['swarm-pubsub-gsoc-eth-address']).toBe(headers2['swarm-pubsub-gsoc-eth-address']) + expect(headers1['swarm-pubsub-gsoc-topic']).toBe(headers2['swarm-pubsub-gsoc-topic']) +}) + +test('GsocEphemeralMode - default socId is NULL_IDENTIFIER', () => { + const mode = new GsocEphemeralMode({ topic: TOPIC }) + const headers = mode.getPublisherHeaders()! + const nullIdHex = Binary.uint8ArrayToHex(NULL_IDENTIFIER) + expect(headers['swarm-pubsub-gsoc-topic']).toBe(nullIdHex) +}) + +test('GsocEphemeralMode - custom socId as string (keccak256 hashed)', () => { + const mode = new GsocEphemeralMode({ topic: TOPIC, socId: 'my-id' }) + const headers = mode.getPublisherHeaders()! + const nullIdHex = Binary.uint8ArrayToHex(NULL_IDENTIFIER) + expect(headers['swarm-pubsub-gsoc-topic']).not.toBe(nullIdHex) +}) + +test('GsocEphemeralMode - custom socId as Uint8Array', () => { + const customId = new Uint8Array(32).fill(0xab) + const mode = new GsocEphemeralMode({ topic: TOPIC, socId: customId }) + const headers = mode.getPublisherHeaders()! + expect(headers['swarm-pubsub-gsoc-topic']).toBe(Binary.uint8ArrayToHex(customId)) +}) + +test('GsocEphemeralMode - different topics produce different headers', () => { + const mode1 = new GsocEphemeralMode({ topic: 'topic-a' }) + const mode2 = new GsocEphemeralMode({ topic: 'topic-b' }) + + expect(mode1.getPublisherHeaders()!['swarm-pubsub-gsoc-eth-address']).not.toBe( + mode2.getPublisherHeaders()!['swarm-pubsub-gsoc-eth-address'], + ) +}) + +test('GsocEphemeralMode - external signer: mock signFn is called and frame is built', async () => { + const privateKey = new PrivateKey(Binary.keccak256(new TextEncoder().encode('external-key'))) + const mockSig = privateKey.sign(new Uint8Array(32)) + + const signFn = jest.fn().mockResolvedValue(mockSig) + const address = privateKey.publicKey().address() + + const mode = new GsocEphemeralMode({ address, signFn }) + const frame = await mode.encodeMessage(PAYLOAD_TEXT) + + expect(signFn).toHaveBeenCalledTimes(1) + expect(frame.length).toBe(SIG_SIZE + SPAN_WS_SIZE + PAYLOAD_BYTES.length) + expect(frame.slice(0, SIG_SIZE)).toEqual(mockSig.toUint8Array()) +}) + +test('GsocEphemeralMode - external signer: address header matches provided address', () => { + const privateKey = new PrivateKey(Binary.keccak256(new TextEncoder().encode('external-key'))) + const address = privateKey.publicKey().address() + const signFn = jest.fn() + + const mode = new GsocEphemeralMode({ address, signFn }) + const headers = mode.getPublisherHeaders()! + + expect(headers['swarm-pubsub-gsoc-eth-address']).toBe(Binary.uint8ArrayToHex(new EthAddress(address).toUint8Array())) +}) + +test('createPubsubMode factory - GSOC_EPHEMERAL creates GsocEphemeralMode', () => { + const mode = createPubsubMode(PubsubMode.GSOC_EPHEMERAL, { topic: TOPIC }) + expect(mode).toBeInstanceOf(GsocEphemeralMode) + expect(mode.topicAddress).toMatch(/^[0-9a-fA-F]{64}$/) +}) + +test('createPubsubMode factory - GSOC_EPHEMERAL with params', () => { + const mode = createPubsubMode(PubsubMode.GSOC_EPHEMERAL, { topic: TOPIC, socId: 'my-id' }) + expect(mode).toBeInstanceOf(GsocEphemeralMode) + const nullIdHex = Binary.uint8ArrayToHex(NULL_IDENTIFIER) + expect(mode.getPublisherHeaders()!['swarm-pubsub-gsoc-topic']).not.toBe(nullIdHex) +}) + +test('GsocEphemeralMode - subscriber-only: topicAddress provided directly, no headers', () => { + const topicAddr = 'ab'.repeat(32) + const mode = new GsocEphemeralMode({ topicAddress: topicAddr }) + expect(mode.topicAddress).toBe(topicAddr) + expect(mode.getPublisherHeaders()).toBeNull() +}) + +test('GsocEphemeralMode - subscriber-only: encodeMessage throws', async () => { + const mode = new GsocEphemeralMode({ topicAddress: 'ab'.repeat(32) }) + await expect(mode.encodeMessage('hello')).rejects.toThrow('subscriber-only') +}) + +test('GsocEphemeralMode - ephemeral with Uint8Array topic (used as private key directly)', () => { + const keyBytes = Binary.keccak256(new TextEncoder().encode('raw-key')) + const mode = new GsocEphemeralMode({ topic: keyBytes }) + expect(mode.topicAddress).toMatch(/^[0-9a-fA-F]{64}$/) + expect(mode.getPublisherHeaders()).not.toBeNull() +}) + +test('Signature - correctly validated on encoded message (ephemeral path)', async () => { + const mode = new GsocEphemeralMode({ topic: TOPIC }) + const frame = await mode.encodeMessage(PAYLOAD_TEXT) + const sigBytes = frame.slice(0, SIG_SIZE) + expect(sigBytes.length).toBe(SIG_SIZE) + const sig = new Signature(sigBytes) + expect(sig).toBeInstanceOf(Signature) +})