
feat(dkg): Implement bcast #290

Open
iamquang95 wants to merge 25 commits into main from iamquang95/dkb/bcast2

Conversation

@iamquang95
Collaborator

Should fix: #132

@iamquang95
Collaborator Author

@claude

@claude

claude bot commented Mar 23, 2026

Claude finished @iamquang95's task in 3m 3s


Code Review

  • Gather context and understand the PR
  • Read bcast implementation files
  • Read bootnode example for comparison
  • Post comprehensive review

Summary

This PR implements the DKG reliable-broadcast (bcast) protocol in Rust, porting charon/dkg/bcast. The overall structure — Behaviour, Handler, Component, protocol helpers — is clean and well-layered. The integration test in behaviour.rs covers the core round-trip and duplicate semantics. The example uses relay-based connectivity matching the bootnode example's shape (addressing @varex83's review comment).

A few issues are noted below.


Findings

[Medium] TODO comment must be removed before merge

  • crates/dkg/src/bcast/protocol.rs:20
  • Per AGENTS.md: "Avoid leaving TODOs in merged code." The comment // TODO: It's duplicated with peerinfo/src/protocol.rs:write_protobuf. must be resolved or removed. If deduplication is tracked separately, move it to an issue.

[Medium] next_op_id overflow resets to 0 — potential id collision

  • crates/dkg/src/bcast/behaviour.rs:88
  • self.next_op_id.checked_add(1).unwrap_or(0) resets next_op_id to 0 on overflow. If an op with id 0 is still active at that point, the ids collide — astronomically unlikely with u64's range, but worth noting. wrapping_add(1) produces the same value (0) on overflow, so the substantive problem is the unwrap_or call itself: unwrap-family methods are disallowed in production code by the workspace lints (unwrap_used = "deny"), so this will fail clippy. Prefer wrapping_add(1), which states the wrap explicitly without an unwrap-family call.
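A tiny standalone sketch of the two overflow behaviours discussed above (function names are illustrative, not the actual Behaviour fields). On u64 overflow both expressions land on 0, so the practical difference is that `wrapping_add` avoids the unwrap-family call flagged by the workspace lints:

```rust
fn next_op_id_checked(current: u64) -> u64 {
    // Current code: falls back to 0 when the add overflows; `unwrap_or`
    // is an unwrap-family call under the workspace lint policy.
    current.checked_add(1).unwrap_or(0)
}

fn next_op_id_wrapping(current: u64) -> u64 {
    // Suggested form: same numeric result on overflow (wraps to 0),
    // but states the wrap explicitly and avoids unwrap_or.
    current.wrapping_add(1)
}

fn main() {
    assert_eq!(next_op_id_checked(u64::MAX), 0);
    assert_eq!(next_op_id_wrapping(u64::MAX), 0); // identical on overflow
    assert_eq!(next_op_id_wrapping(41), 42);
}
```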

[Medium] .expect() in merge_known_peers panics on invalid peer ID

  • crates/dkg/examples/bcast.rs:273
  • PeerId::from_str(peer).expect("Failed to parse peer ID") — this is inside a plain fn (not main), called from a non-example path. While .expect() is tolerated in example main() functions, this helper is reusable and should return a Result or at minimum use anyhow::bail!. Panics inside merge_known_peers will not be caught gracefully.
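A minimal sketch of the suggested shape, with `String` standing in for libp2p's `PeerId` (the real fix would parse via `PeerId::from_str` and attach context with anyhow's `with_context`). The helper propagates the parse error instead of panicking; the deduplication here is illustrative:

```rust
use std::collections::BTreeSet;

// Stand-in for PeerId::from_str(...).with_context(...)?.
fn parse_peer(s: &str) -> Result<String, String> {
    if s.is_empty() {
        Err(format!("invalid peer id: {s:?}"))
    } else {
        Ok(s.to_string())
    }
}

// Hypothetical merge helper: returns Result instead of using .expect(),
// so a bad peer ID surfaces as an error the caller can handle.
fn merge_known_peers(lists: &[&[&str]]) -> Result<Vec<String>, String> {
    let mut set = BTreeSet::new();
    for list in lists {
        for peer in *list {
            set.insert(parse_peer(peer)?); // propagate instead of panicking
        }
    }
    Ok(set.into_iter().collect())
}

fn main() {
    let merged = merge_known_peers(&[&["a", "b"], &["b", "c"]]).unwrap();
    assert_eq!(merged, vec!["a", "b", "c"]);
    assert!(merge_known_peers(&[&[""]]).is_err()); // error, not a panic
}
```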

[Low] handle_inbound_sig_request runs check before dedup recording

  • crates/dkg/src/bcast/handler.rs:386-402
  • The handler.check(peer_id, &any) call happens before the dedup store is consulted/updated. A peer that repeatedly fails the check can send arbitrary hashes without ever being deduplicated. The dedup protection only engages after a check passes. This is likely a parity gap with the Go implementation — worth verifying against charon/dkg/bcast. If the Go code performs dedup first, the order should be swapped.

[Low] Reviewer comment already addressed

  • @varex83's comment asked to use bootnode as a starting point and connect via relays. The current bcast.rs example already does this: it uses bootnode::new_relays, MutableRelayReservation, RelayRouter, and Node::new with QUIC transport — matching the bootnode example's interface. No action needed here.

[Low] ExampleBehaviour omits identify and ping fields

  • crates/dkg/examples/bcast.rs:96-101
  • The bootnode example includes identify and ping in its custom ExampleBehaviour. The bcast example handles PlutoBehaviourEvent::Ping and PlutoBehaviourEvent::Identify in the event loop (lines 600–611), but these events come from Node's built-in Pluto behaviour — not from a custom behaviour field. This is functionally correct. However, if a future ExampleBehaviour variant is added, the handler arms need to stay in sync. Minor consistency gap with bootnode example.

Parity Matrix

| Component | Go | Rust | Notes |
| --- | --- | --- | --- |
| Protocol IDs /charon/dkg/bcast/1.0.0/{sig,msg} | present | present (mod.rs:29-32) | match |
| Timeouts (60s receive, 62s send) | unverified (no Go ref locally) | present (mod.rs:35-38) | Go ref needed |
| Dedup by (peer, msg_id) | unverified | present (handler.rs:43-48) | Go ref needed |
| Signature = SHA256(type_url ‖ value) | unverified | present (protocol.rs:61-66) | has test vector |
| Sequential broadcast (one at a time) | unverified | present (behaviour.rs:58) | Go ref needed |
| Fail on disconnect during active broadcast | unverified | present (behaviour.rs:104-112) | Go ref needed |

Note: The Go reference checkout (charon at v1.7.1) is not present in the working directory. Parity items marked "unverified" should be confirmed once the Go reference is available per AGENTS.md requirements.


Tests

  • Unit tests: protocol.rs has hash_any_matches_go_expected_value (cross-language vector) and signature rejection tests. ✅
  • Integration test: behaviour.rs::broadcast_round_trip_and_duplicate_semantics covers the full sig-collect → broadcast flow, dedup semantics, and error paths. ✅
  • No Go reference was available locally to run cross-language parity checks.

Open Questions

  1. Should write_protobuf be extracted to a shared utility crate (e.g., pluto-p2p) to eliminate the duplication with peerinfo?
  2. Does the Go implementation perform the dedup check before or after the application-level check() callback?
  3. Are the 60s/62s timeout values confirmed from the Go source?

@iamquang95 force-pushed the iamquang95/dkb/bcast2 branch from 5b783a5 to 720c50b on March 24, 2026 at 03:57
@iamquang95
Collaborator Author

@claude use git to fetch the Go reference checkout (charon at v1.7.1) in the working directory, then review the PR again

@claude

claude bot commented Mar 24, 2026

Claude finished @iamquang95's task in 4m 19s


Code Review (with Go Reference attempt)

  • Understand request
  • Attempt Go reference fetch (blocked — network access not available in this environment)
  • Deep-read all Rust implementation files
  • Post updated review

Go reference status: git clone and all network tools (WebFetch, curl, gh api) require explicit approval and were blocked in this run. The review below is based on a line-by-line read of the Rust source. Parity items that require the Go source for confirmation are marked accordingly.


Previous Findings — Resolution Status

| Finding | Status |
| --- | --- |
| TODO comment at protocol.rs:20 | Resolved — extracted to crates/p2p/src/protobuf.rs; peerinfo now uses the shared helper too |
| next_op_id.checked_add(1).unwrap_or(0) | Resolved — now uses wrapping_add(1) (behaviour.rs:88) |
| .expect() in merge_known_peers | Resolved — function now returns Result<Vec<PeerId>> and uses with_context(...)? (examples/bcast.rs:268-284) |
| Dedup check order | Still present — see finding below |

Findings

[Medium] Inbound sig-response write uses RECEIVE_TIMEOUT instead of SEND_TIMEOUT

  • crates/dkg/src/bcast/handler.rs:413
  • The signature response is written back to the client using RECEIVE_TIMEOUT (60 s) instead of SEND_TIMEOUT (62 s). All outbound operations in on_connection_event use SEND_TIMEOUT for consistency, but this inbound-side write uses the wrong constant. This causes the write to time out 2 seconds earlier than the outbound send path on the other side, and the constant name is semantically misleading (writing is a send, not a receive).

[Low] Dedup runs after check() in handle_inbound_sig_request

  • crates/dkg/src/bcast/handler.rs:381-404
  • The current order is: read → lookup handler → check() → hash → dedup. A peer that repeatedly submits a request where check() fails will never be recorded in the dedup store. In practice, a failing check request is still rejected, so this is not a correctness issue — but it means a misbehaving peer can repeatedly submit different payloads for the same (peer, msg_id) without ever triggering DuplicateMismatchingHash. The dedup guard only fires on the second accepted request. Without the Go reference, I can't confirm whether Go performs dedup before or after the application check — but it is worth verifying.

[Low] handle_inbound_broadcast has no dedup guard

  • crates/dkg/src/bcast/handler.rs:424-462
  • Inbound broadcast messages (/msg subprotocol) verify signatures but do not record a dedup key. If a broadcaster retransmits the same fully-signed BCastMessage (e.g., retry logic), the callback() is invoked multiple times on the receiver. The dedup store is threaded into the handler but not used in this path. This may be intentional if callers are required to be idempotent, but it should be confirmed against the Go implementation.

[Low] Broadcaster does not invoke callback on itself

  • crates/dkg/src/bcast/behaviour.rs / crates/dkg/src/bcast/handler.rs
  • The broadcast originator signs the message, collects all peer signatures, verifies them, and fans out the BCastMessage to all other peers. But handler.callback() is only triggered by handle_inbound_broadcast, which only runs on inbound streams. The originating node never receives its own broadcast, so the callback is never invoked locally. This is plausibly correct (the originator already has the message), but differs from a subscriber model where all nodes including the originator receive the finalized broadcast. Worth confirming with Go behavior.

Parity Matrix

| Component | Rust | Go ref needed | Notes |
| --- | --- | --- | --- |
| Protocol IDs /charon/dkg/bcast/1.0.0/{sig,msg} | mod.rs:29-32 | No | Hardcoded strings match expected format |
| RECEIVE_TIMEOUT = 60s, SEND_TIMEOUT = 62s | mod.rs:35-38 | Yes | Values need confirmation from Go |
| Dedup by (peer_id, msg_id) | handler.rs:43-48 | Yes | Only on sig path, not bcast path |
| hash_any = SHA256(type_url ‖ value) | protocol.rs:16-21 | No | Cross-lang test vector present |
| sign_any / verify_signatures | protocol.rs:24-53 | Yes | Uses secp256k1 via pluto_k1util |
| Sequential broadcast (one at a time) | behaviour.rs:58 | Yes | Option<BroadcastState> |
| Fail-on-disconnect during active broadcast | behaviour.rs:104-113 | Yes | Checked on ConnectionClosed |
| Only last-connection-closed triggers failure | behaviour.rs:380-387 | Yes | Uses HashSet<ConnectionId> per peer |
| Single-node cluster completes immediately | behaviour.rs:262-265 | Yes | dispatches.is_empty() path |
| Shared write/read_protobuf helper | p2p/src/protobuf.rs | N/A | Extracted from peerinfo, now shared |
| Response timeout on sig write | ⚠️ handler.rs:413 uses RECEIVE_TIMEOUT | Yes | Should be SEND_TIMEOUT — see finding above |
| Broadcaster invokes own callback | ❌ Not implemented | Yes | May be intentional |
| Broadcast dedup | ❌ Missing on /msg path | Yes | May be intentional |

Tests

  • protocol.rs::hash_any_matches_go_expected_value — cross-language test vector ✅
  • protocol.rs::verify_signatures_rejects_invalid_count_length_and_order
  • component.rs::duplicate_message_id_registration_fails
  • behaviour.rs::broadcast_round_trip_and_duplicate_semantics — covers full round-trip, dedup mismatch, and unknown message ID paths ✅
  • No tests for single-node cluster, broadcaster self-callback, or inbound broadcast dedup

Open Questions (for Go reference verification)

  1. Does Go's handle_inbound_sig_request run the application check before or after dedup?
  2. Does the Go bcast /msg handler have a dedup guard?
  3. Does Go invoke the local callback on the broadcasting node?
  4. Are the 60s/62s timeout constants confirmed from Go source?
  5. Does Go write the sig response with the "send" timeout or "receive" timeout?

@iamquang95
Collaborator Author

> [Medium] Inbound sig-response write uses RECEIVE_TIMEOUT instead of SEND_TIMEOUT
>
> [Low] Dedup runs after check() in handle_inbound_sig_request
>
> [Low] Broadcaster does not invoke callback on itself

These match the Charon implementation.

> [Low] handle_inbound_broadcast has no dedup guard

Charon doesn't dedup /msg.

> Are the 60s/62s timeout constants confirmed from Go source?

Yes — Charon uses 1 minute and 1 minute + 2 seconds respectively.
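For reference, the confirmed values expressed as Rust `Duration` constants (constant names taken from the review's citations of mod.rs; this is a sketch, not the actual source):

```rust
use std::time::Duration;

// Values confirmed in this thread against Charon:
// receive = 1 minute, send = 1 minute + 2 seconds.
const RECEIVE_TIMEOUT: Duration = Duration::from_secs(60);
const SEND_TIMEOUT: Duration = Duration::from_secs(62);

fn main() {
    assert_eq!(RECEIVE_TIMEOUT.as_secs(), 60);
    assert_eq!(SEND_TIMEOUT - RECEIVE_TIMEOUT, Duration::from_secs(2));
}
```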

@varex83
Collaborator

varex83 commented Mar 24, 2026

Should we convert it to "ready for review"?

@iamquang95 marked this pull request as ready for review on March 24, 2026 at 11:01
```rust
    return Poll::Ready(event);
}

while let Poll::Ready(Some(command)) = self.command_rx.poll_recv(cx) {
```
Collaborator


Suggested change:

```diff
-while let Poll::Ready(Some(command)) = self.command_rx.poll_recv(cx) {
+if let Poll::Ready(Some(command)) = self.command_rx.poll_recv(cx) {
```

Collaborator Author


There can be more than one command queued; using `while` drains them all in a single poll pass instead of waiting for the next wakeup.
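The point can be demonstrated with a plain std channel standing in for the tokio receiver (`try_recv` as a stand-in for `poll_recv`): a `while let` drains every queued command in one pass, where an `if let` would handle only one and leave the rest for a later wakeup.

```rust
use std::sync::mpsc::Receiver;

fn drain_commands<T>(rx: &Receiver<T>) -> Vec<T> {
    let mut handled = Vec::new();
    // `while let` keeps pulling until the queue is empty in this pass;
    // an `if let` would handle one command and return to the executor.
    while let Ok(command) = rx.try_recv() {
        handled.push(command);
    }
    handled
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    for cmd in ["broadcast", "shutdown"] {
        tx.send(cmd).unwrap();
    }
    assert_eq!(drain_commands(&rx), vec!["broadcast", "shutdown"]);
    assert!(drain_commands(&rx).is_empty()); // queue fully drained in one pass
}
```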

@iamquang95
Collaborator Author

@claude

@claude

claude bot commented Mar 30, 2026

Claude finished @iamquang95's task in 3m 0s


Code Review (Updated — 2026-03-30)

  • Read current state of all bcast files
  • Check varex83's outstanding review comments
  • Verify previous findings are resolved
  • Post comprehensive review

Status of Previous Findings

| Finding | Status |
| --- | --- |
| TODO comment at protocol.rs | ✅ Resolved — extracted to p2p/src/protobuf.rs |
| wrapping_add(1) for next_op_id | ✅ Resolved |
| merge_known_peers returns Result | ✅ Resolved |
| RECEIVE_TIMEOUT on sig response write | ✅ Confirmed intentional (matches Charon) |
| Dedup runs after check() | ✅ Confirmed intentional (matches Charon) |
| No dedup guard on /msg path | ✅ Confirmed intentional (matches Charon) |
| Broadcaster does not invoke own callback | ✅ Confirmed intentional (matches Charon) |

Status of varex83's 2026-03-27 Review Comments

| Comment | Status |
| --- | --- |
| Dummy handler for relays | ✅ Resolved — connection_handler_for_peer() returns dummy::ConnectionHandler for non-peers |
| local_peer_id to P2PContext | ✅ Resolved — p2p_context.local_peer_id() used throughout |
| Events for debugging | ✅ Resolved — Event::BroadcastCompleted and Event::BroadcastFailed |
| Connection tracking via P2P context | ✅ Resolved — p2p_context.peer_store_lock() |
| NotifyHandler::Any | ✅ Resolved — used at behaviour.rs:229,311 |
| Error as enum | ✅ Resolved — Error and Failure enums in error.rs |
| while let poll_recv (behaviour.rs:438) | ✅ Resolved |
| while let active_futures (handler.rs:314) | ✅ Resolved |
| Split poll into separate functions (handler.rs:323) | ✅ Resolved — on_connection_event delegates to handle_fully_negotiated_inbound, handle_fully_negotiated_outbound, handle_dial_upgrade_error |
| pub mod handler in mod.rs (marked "Unused") | ⚠️ Open — see below |
| Component could live in behaviour impl | ⚠️ Open — agreed to try |
| broadcast() async/event-driven | ⚠️ Partial — see below |

Remaining Findings

[Low] pub mod handler exposes nothing — should be pub(crate)

  • crates/dkg/src/bcast/mod.rs:8
  • pub mod handler; is visible externally, but every item inside handler (Handler, DedupStore, InEvent, OutEvent, PendingOpen) is pub(crate). Nothing outside the crate can reach these types, making the pub visibility dead. This is what varex83 flagged as "Unused". Change to pub(crate) mod handler (or mod handler if it is only needed within the module).

[Open] broadcast() is now fire-and-forget — result tracking is caller's responsibility

  • crates/dkg/src/bcast/component.rs:102-119
  • The current broadcast() sends a BroadcastCommand and returns Ok(()) immediately — the result arrives later via Event::BroadcastCompleted / Event::BroadcastFailed. This matches the event-driven design agreed upon. However, since broadcast() no longer surfaces the broadcast result, callers must now drive the swarm event loop and match Event::BroadcastFailed to detect failures. The doc comment on broadcast() should be updated to state this explicitly so callers know the Ok(()) return only means the command was enqueued, not that it succeeded.
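A sketch of the caller-side contract this implies, with illustrative stand-in types (the Event variants BroadcastCompleted / BroadcastFailed come from this thread; everything else here is hypothetical):

```rust
use std::sync::mpsc;

#[derive(Debug, PartialEq)]
enum Event {
    BroadcastCompleted { msg_id: String },
    BroadcastFailed { msg_id: String, reason: String },
}

/// Ok(()) only means the command was enqueued, not that the broadcast succeeded.
fn broadcast(tx: &mpsc::Sender<String>, msg_id: &str) -> Result<(), String> {
    tx.send(msg_id.to_string()).map_err(|e| e.to_string())
}

/// The caller learns the real outcome by matching events from the swarm loop.
fn outcome(event: Event) -> Result<String, String> {
    match event {
        Event::BroadcastCompleted { msg_id } => Ok(msg_id),
        Event::BroadcastFailed { msg_id, reason } => Err(format!("{msg_id}: {reason}")),
    }
}

fn main() {
    let (cmd_tx, cmd_rx) = mpsc::channel();
    broadcast(&cmd_tx, "dkg/round1").unwrap(); // enqueued; outcome unknown here

    // Stand-in "swarm loop": the queued command later surfaces as an event.
    let msg_id = cmd_rx.recv().unwrap();
    assert_eq!(
        outcome(Event::BroadcastCompleted { msg_id }),
        Ok("dkg/round1".to_string())
    );
}
```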

[Open] Component as a separate struct vs. merging into Behaviour impl

  • crates/dkg/src/bcast/component.rs
  • varex83's suggestion to move Component into the behaviour impl is still pending. The current split is clean and functional, but the discussion (@iamquang95: "I see, yeah I will try that") was unresolved at the time of this review. If iamquang95 decides to keep the current two-struct design, the rationale (cloneable user-facing handle separate from the swarm-owned behaviour) should be documented on the struct.

Overall Assessment

The implementation is functionally solid and all hard findings from earlier reviews have been resolved. The three items above are low/discussion-level. The two most important things to decide before marking ready-to-review are:

  1. Whether pub mod handler needs to be restricted (easy fix).
  2. Whether the Component/Behaviour split stays or gets merged (design decision between @iamquang95 and @varex83).

Once those are settled, the PR is in good shape to mark ready for review.

@iamquang95
Collaborator Author

> [Open] broadcast() is now fire-and-forget — result tracking is caller's responsibility

This follows the rust-libp2p event-driven design.

> [Open] Component as a separate struct vs. merging into Behaviour impl

Only new() was moved into Behaviour; the Component itself stays separate.

Collaborator

@varex83 left a comment


Great! LGTM! Really like the implementation — left a few comments.

Comment on lines +12 to +54
```rust
pub enum Failure {
    /// The operation timed out.
    Timeout,
    /// The remote peer does not support the protocol.
    Unsupported,
    /// The operation failed due to an I/O error.
    Io {
        /// The underlying error message.
        message: String,
    },
    /// The operation failed for another reason.
    Other {
        /// The underlying error message.
        message: String,
    },
}

impl Failure {
    /// Creates a new [`Failure::Io`] value.
    pub fn io(error: impl fmt::Display) -> Self {
        Self::Io {
            message: error.to_string(),
        }
    }

    /// Creates a new [`Failure::Other`] value.
    pub fn other(error: impl fmt::Display) -> Self {
        Self::Other {
            message: error.to_string(),
        }
    }
}

impl fmt::Display for Failure {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Timeout => f.write_str("operation timed out"),
            Self::Unsupported => f.write_str("protocol negotiation failed"),
            Self::Io { message } => write!(f, "i/o error: {message}"),
            Self::Other { message } => f.write_str(message),
        }
    }
}
```
Collaborator


I think we can use thiserror here, it does pretty much the same

Comment on lines +36 to +41
```rust
pub fn set_local_peer_id(&self, peer_id: PeerId) {
    *self
        .local_peer_id
        .write()
        .expect("Failed to write local peer id") = Some(peer_id);
}
```
Collaborator


I think we can set it once, and make it read-only.
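One std-only way to get "set once, read-only" is `std::sync::OnceLock`, which rejects a second set instead of silently overwriting; `String` stands in for `PeerId` in this sketch:

```rust
use std::sync::OnceLock;

struct P2PContext {
    local_peer_id: OnceLock<String>,
}

impl P2PContext {
    fn new() -> Self {
        Self { local_peer_id: OnceLock::new() }
    }

    /// Returns Err with the rejected value if the id was already set,
    /// instead of overwriting it (and no .expect() is needed).
    fn set_local_peer_id(&self, peer_id: String) -> Result<(), String> {
        self.local_peer_id.set(peer_id)
    }

    fn local_peer_id(&self) -> Option<&String> {
        self.local_peer_id.get()
    }
}

fn main() {
    let ctx = P2PContext::new();
    assert!(ctx.set_local_peer_id("peer-a".to_string()).is_ok());
    assert!(ctx.set_local_peer_id("peer-b".to_string()).is_err()); // second set rejected
    assert_eq!(ctx.local_peer_id().unwrap(), "peer-a");
}
```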


```rust
/// Swarm-owned behaviour for reliable broadcast.
pub struct Behaviour {
    peers: Arc<Vec<PeerId>>,
```
Collaborator


We can use known_peers from P2PContext



Development

Successfully merging this pull request may close these issues.

Implement dkg/bcast
