Conversation
Replaces manual `inc_requests`/`dec_requests` calls with a `RequestPermit` type that atomically reserves a slot at peer selection time and releases it via `Drop`. Closes a class of leaks that used to happen when spawned workers panicked between inc and dec.

- `get_best_peer`, `get_best_peer_excluding`, and `get_random_peer` return `Option<(H256, PeerConnection, RequestPermit)>` and bump the selected peer's requests counter under `&mut self` before returning.
- The permit's `Drop` fires a fire-and-forget `dec_requests`.
- `inc_requests` removed from the public protocol trait.
- `PeerHandler::make_request` helper deleted; callers use `connection.outgoing_request` directly with the permit bound as `_permit`.
- Four spawn+channel flows (account range, bytecodes, storage range, header download) route the permit through the completion channel; the permit drops on receive, or with the spawned task on cancellation.
- `request_state_trienodes` and `request_storage_trienodes` take a `RequestPermit` instead of a `PeerTable`; the unused `peer_id` parameter is dropped.
- `update_pivot` destructures the permit so it lives through the retry loop. `ask_peer_head_number` (head-probe path via `get_peer_connections`) is intentionally untracked here; the follow-up `get_best_n_peers` commit restores tracking.
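The reserve-at-selection, release-on-`Drop` mechanism can be sketched with a plain atomic counter. All names here (`RequestPermit`, `try_reserve`, the counter) are illustrative stand-ins: the real permit releases its slot via a fire-and-forget cast to the peer-table actor, not a shared atomic.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Illustrative permit: holding one means a request slot is reserved.
struct RequestPermit {
    in_flight: Arc<AtomicUsize>,
}

impl Drop for RequestPermit {
    fn drop(&mut self) {
        // Release the slot unconditionally: runs on normal return, on
        // panic unwinding, and when a holding task is cancelled.
        self.in_flight.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Reserve a slot at selection time; None if the peer is saturated.
fn try_reserve(in_flight: &Arc<AtomicUsize>, cap: usize) -> Option<RequestPermit> {
    let prev = in_flight.fetch_add(1, Ordering::SeqCst);
    if prev >= cap {
        // Roll back the optimistic increment.
        in_flight.fetch_sub(1, Ordering::SeqCst);
        return None;
    }
    Some(RequestPermit { in_flight: Arc::clone(in_flight) })
}

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let permit = try_reserve(&counter, 1).expect("slot free");
    assert!(try_reserve(&counter, 1).is_none()); // peer saturated
    drop(permit); // slot released via Drop
    assert_eq!(counter.load(Ordering::SeqCst), 0);
}
```

Because the decrement lives in `Drop` rather than at every exit path of every caller, forgetting a `dec_requests` becomes a type-system impossibility rather than a code-review concern.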
🤖 Kimi Code Review

Approval with minor suggestions.

Correctness verification:
No security vulnerabilities or consensus-critical bugs detected. The refactor reduces the surface area for request-count desynchronization.

Automated review by Kimi (Moonshot AI) · kimi-k2.5 · custom prompt
🤖 Claude Code Review

Code Review: PR #6523
🤖 Codex Code Review

Findings

No EVM/gas/EIP logic is touched here; the review risk is in sync scheduling and peer-accounting correctness.

Automated review by OpenAI Codex · gpt-5.4 · custom prompt
Greptile Summary

This PR replaces the manual `inc_requests`/`dec_requests` accounting with an RAII `RequestPermit`.

Confidence Score: 5/5. Safe to merge; all remaining findings are P2 style/clarity items with no correctness impact. The core permit mechanism is sound: atomic increment inside the actor handler, RAII decrement on drop. No files require special attention.
| Filename | Overview |
|---|---|
| crates/networking/p2p/peer_table.rs | Introduces RequestPermit RAII guard; atomically increments requests inside selection handlers and decrements on drop; removes public inc_requests; adds get_best_n_peers and read-only has_eligible_peer. |
| crates/networking/p2p/peer_handler.rs | Removes make_request wrapper and all manual inc_requests/dec_requests calls; passes RequestPermit through functions; dead _peer_id parameter lingers in get_block_header public API. |
| crates/networking/p2p/snap/client.rs | Snap workers now accept permit: RequestPermit, drop it immediately after outgoing_request returns, and proceed with pure computation; removed Clone from StorageTaskResult; cleaned up worker error paths. |
| crates/networking/p2p/sync/healing/state.rs | Threads permit through to request_state_trienodes; removes peer_table clone; straightforward adaptation. |
| crates/networking/p2p/sync/healing/storage.rs | Threads permit through to request_storage_trienodes; removes peer_table clone; straightforward adaptation. |
| crates/networking/p2p/sync/snap_sync.rs | Adopts has_eligible_peer probe and consumes permit in get_block_header; removes MAX_RETRIES_PER_PEER inner loop — one attempt per peer per rotation now, which is a behaviour change from the previous 3-retry policy. |
Sequence Diagram
```mermaid
sequenceDiagram
    participant C as Caller
    participant PT as PeerTableServer
    participant W as Worker Task
    participant P as Peer (network)
    C->>PT: get_best_peer(caps)
    Note over PT: peer.requests += 1 atomically
    PT-->>C: (peer_id, connection, RequestPermit)
    C->>W: tokio::spawn(worker(connection, permit, ...))
    Note over C: permit moved into worker
    W->>P: connection.outgoing_request(...)
    P-->>W: response
    W->>W: drop(permit)
    Note over W: Drop impl fires dec_requests(peer_id)
    W->>PT: dec_requests (fire-and-forget cast)
    Note over PT: peer.requests -= 1
    W->>C: tx.send(result)
```
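The hand-off shown in the diagram, where the permit travels with the result through the completion channel and is released on the receiver side, can be sketched with std primitives. The `Permit` type and the shared counter here are illustrative stand-ins for the actor-backed permit in the PR.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;

/// Illustrative permit: decrements the shared in-flight counter on Drop.
struct Permit(Arc<AtomicUsize>);

impl Drop for Permit {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    // Slot reserved at peer-selection time (counter already at 1).
    let in_flight = Arc::new(AtomicUsize::new(1));
    let permit = Permit(Arc::clone(&in_flight));
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let result = 42u64; // stand-in for the network response
        // Route the permit through the completion channel together with
        // the result; if this thread dies before sending, Drop still
        // fires here and frees the slot.
        tx.send((result, permit)).ok();
    });

    let (result, permit) = rx.recv().unwrap();
    assert_eq!(result, 42);
    drop(permit); // slot released on the receiver side
    assert_eq!(in_flight.load(Ordering::SeqCst), 0);
}
```

Either side can drop the permit; the counter is balanced no matter which end of the channel outlives the other, which is the property the four spawn+channel flows rely on.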
Comments Outside Diff (1)
- **crates/networking/p2p/peer_table.rs, lines 502-530: `do_get_best_n_peers` snapshots `requests` before increments**

  `do_get_best_n_peers` takes `&self` and snapshots each peer's `requests` value at filter/sort time. `handle_get_best_n_peers` then immediately increments `requests` for every returned peer (up to `n` times). Because the entire handler runs under `&mut self` in a single actor tick there is no race condition, but the sort order used to pick the top-n candidates is based on pre-increment weights. In practice this is benign, but a brief comment that the sort is intentionally a pre-increment snapshot would prevent future confusion.
Reviews (1): Last reviewed commit: "refactor(p2p): drop permit tests, apply ..."
```rust
    Ok(self.peer_table.peer_count().await?)
}

/// Requests a single block header by number from an already-selected peer.
/// Consumes a `RequestPermit` reserved by the caller at peer selection
/// time; the permit drops when this function returns, releasing the slot.
```
**Unused `_peer_id` parameter in public API**

`_peer_id: H256` is accepted by `get_block_header` but never used inside the function; it was only consumed by the removed `make_request` call. All call sites still pass `peer_id` explicitly (e.g. in `snap_sync.rs`). The parameter can be dropped from the signature; callers already hold `peer_id` for the `record_success`/`record_failure` calls they make after the function returns.
```rust
    Ok(self.peer_table.peer_count().await?)
}

/// Requests a single block header by number from an already-selected peer.
/// Consumes a `RequestPermit` reserved by the caller at peer selection
/// time; the permit drops when this function returns, releasing the slot.
pub async fn get_block_header(
    &mut self,
    connection: &mut PeerConnection,
    _permit: RequestPermit,
    block_number: u64,
) -> Result<Option<BlockHeader>, PeerHandlerError> {
```
```diff
         "Trying to update pivot to {new_pivot_block_number} with peer {peer_id} (score: {peer_score})"
     );

-    // Try up to MAX_RETRIES_PER_PEER times with this specific peer.
-    // Both Ok(None) and recoverable errors count as a failure and advance
-    // through retries; on exhaustion, the peer is excluded and we rotate.
-    let mut peer_failures: u64 = 0;
-    for attempt in 0..MAX_RETRIES_PER_PEER {
-        let outcome = peers
-            .get_block_header(peer_id, &mut connection, new_pivot_block_number)
-            .await;
-
-        match outcome {
-            Ok(Some(pivot)) => {
-                // Success — reward peer and return
-                peers.peer_table.record_success(peer_id)?;
-                #[cfg(feature = "metrics")]
-                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("success");
-                info!("Successfully updated pivot");
-
-                {
-                    let mut diag = diagnostics.write().await;
-                    diag.push_pivot_change(super::PivotChangeEvent {
-                        timestamp: current_unix_time(),
-                        old_pivot_number: block_number,
-                        new_pivot_number: pivot.number,
-                        outcome: "success".to_string(),
-                        failure_reason: None,
-                    });
-                    diag.pivot_block_number = Some(pivot.number);
-                    diag.pivot_timestamp = Some(pivot.timestamp);
-                    let pivot_age = current_unix_time().saturating_sub(pivot.timestamp);
-                    diag.pivot_age_seconds = Some(pivot_age);
-                    METRICS
-                        .pivot_timestamp
-                        .store(pivot.timestamp, std::sync::atomic::Ordering::Relaxed);
-                }
-                let block_headers = peers
-                    .request_block_headers(block_number + 1, pivot.hash())
-                    .await?
-                    .ok_or(SyncError::NoBlockHeaders)?;
-                block_sync_state
-                    .process_incoming_headers(block_headers.into_iter())
-                    .await?;
-                *METRICS.sync_head_hash.lock().await = pivot.hash();
-                return Ok(pivot);
-            }
-            Ok(None) => {
-                peers.peer_table.record_failure(peer_id)?;
-                peer_failures += 1;
-                let peer_score = peers.peer_table.get_score(peer_id).await?;
-                warn!(
-                    "update_pivot: peer {peer_id} returned None (attempt {}/{MAX_RETRIES_PER_PEER}, score: {peer_score})",
-                    attempt + 1
-                );
-                #[cfg(feature = "metrics")]
-                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_none");
-            }
-            Err(e) if e.is_recoverable() => {
-                peers.peer_table.record_failure(peer_id)?;
-                peer_failures += 1;
-                warn!(
-                    "update_pivot: peer {peer_id} failed with {e} (attempt {}/{MAX_RETRIES_PER_PEER})",
-                    attempt + 1
-                );
-                #[cfg(feature = "metrics")]
-                ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_error");
-            }
-            Err(e) => {
-                // Non-recoverable error (e.g., dead peer table actor,
-                // storage full) — surface it.
-                return Err(SyncError::PeerHandler(e));
+    // One attempt per peer per rotation. A peer that fails is excluded for
+    // this rotation and will be retried (with backoff) in the next one.
+    let outcome = peers
+        .get_block_header(peer_id, &mut connection, permit, new_pivot_block_number)
+        .await;
+
+    match outcome {
+        Ok(Some(pivot)) => {
+            peers.peer_table.record_success(peer_id)?;
```
**Retry count regression: 3 retries per peer removed**

`MAX_RETRIES_PER_PEER` (was 3) has been removed. Now each peer in a rotation gets exactly one attempt, and a failure immediately pushes it onto `excluded_peers`. With `MAX_ROTATIONS = 5` and exponential backoff between rotations, transient network hiccups that previously resolved within 3 tries will now consume an entire rotation and incur a full backoff delay before the same peer is tried again. If "one shot per rotation" is intentional, a short inline comment confirming this would be helpful.
Reply: It's easier to implement, equally correct and simpler this way.
Motivation
Currently we have peer selection functions such as `get_best_peer` which consider the number of in-flight requests to a given peer, to ensure it's capped according to what the peer can handle. However, these functions simply return a peer id, and nothing tracks the request that the caller is about to make with it.
To solve this we do an extra `inc_requests` manually, which is prone to forgetting the matching `dec_requests` and thus leaking request slots. This currently happens in several snap-sync related functions.

On top of that, the `inc_requests`/request/`dec_requests` pattern isn't cancellation-safe: if the task is cancelled between the increment and the decrement, the slot is never freed.
Description
We solve all these issues at once by having the peer selection functions return a permit to make one request to the peer, similar to what tokio's channels do.

Whenever the permit is dropped (consumed when sending, exiting without using it, or the task being cancelled) the slot is freed. This makes it very hard to unintentionally leak request slots.
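The cancellation- and panic-safety claim can be sketched with an illustrative `Permit` over a shared counter (a stand-in for the actor-backed permit): with manual inc/dec, a worker that dies between the increment and the decrement leaks the slot forever, while the permit's `Drop` runs during unwinding and frees it.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Illustrative permit: frees the slot on Drop, even during unwinding.
struct Permit(Arc<AtomicUsize>);

impl Drop for Permit {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::SeqCst);
    }
}

fn main() {
    let in_flight = Arc::new(AtomicUsize::new(0));
    in_flight.fetch_add(1, Ordering::SeqCst); // reserve at selection time
    let permit = Permit(Arc::clone(&in_flight));

    // Worker dies mid-request. With the manual pattern the decrement
    // below the panic would simply never execute; with the permit,
    // unwinding runs Drop and the slot is freed anyway.
    let handle = thread::spawn(move || {
        let _permit = permit;
        panic!("worker died between inc and dec");
    });
    assert!(handle.join().is_err()); // panic contained in the worker

    assert_eq!(in_flight.load(Ordering::SeqCst), 0); // no leaked slot
}
```

The same reasoning covers task cancellation in async code: dropping a future drops everything it owns, including any permit it was holding.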