From 1a955177f9e79ab45884d4e29633197094d63f05 Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Thu, 23 Apr 2026 16:05:19 -0300 Subject: [PATCH 1/7] feat(p2p): RAII RequestPermit for peer request slot tracking Replaces manual inc_requests/dec_requests calls with a RequestPermit type that atomically reserves a slot at peer selection time and releases it via Drop. Closes a class of leaks that used to happen when spawned workers panicked between inc and dec. - get_best_peer, get_best_peer_excluding, get_random_peer return Option<(H256, PeerConnection, RequestPermit)> and bump the selected peer's requests counter under &mut self before returning. - Permit's Drop fires fire-and-forget dec_requests. - inc_requests removed from the public protocol trait. - PeerHandler::make_request helper deleted; callers use connection.outgoing_request directly with the permit bound as _permit. - Four spawn+channel flows (account range, bytecodes, storage range, header download) route the permit through the completion channel; permit drops on receive or with the spawned task on cancellation. - request_state_trienodes, request_storage_trienodes take RequestPermit instead of PeerTable; unused peer_id parameter dropped. - update_pivot destructures the permit so it lives through the retry loop. ask_peer_head_number (head-probe path via get_peer_connections) is intentionally untracked here; the follow-up get_best_n_peers commit restores tracking. --- crates/networking/p2p/peer_handler.rs | 133 ++--- crates/networking/p2p/peer_table.rs | 197 ++++++- crates/networking/p2p/snap/client.rs | 547 +++++++++--------- crates/networking/p2p/sync/healing/state.rs | 14 +- crates/networking/p2p/sync/healing/storage.rs | 6 +- crates/networking/p2p/sync/snap_sync.rs | 5 +- 6 files changed, 521 insertions(+), 381 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 61b4349a827..cad43b22eec 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -1,7 +1,9 @@ use crate::rlpx::initiator::RLPxInitiator; use crate::{ metrics::{CurrentStepValue, METRICS}, - peer_table::{PeerData, PeerDiagnostics, PeerTable, PeerTableServerProtocol as _}, + peer_table::{ + PeerData, PeerDiagnostics, PeerTable, PeerTableServerProtocol as _, RequestPermit, + }, rlpx::{ connection::server::PeerConnection, error::PeerConnectionError, @@ -51,7 +53,6 @@ pub enum BlockRequestOrder { async fn ask_peer_head_number( peer_id: H256, connection: &mut PeerConnection, - peer_table: &PeerTable, sync_head: H256, retries: i32, ) -> Result { @@ -68,7 +69,8 @@ async fn ask_peer_head_number( debug!("(Retry {retries}) Requesting sync head {sync_head:?} to peer {peer_id}"); - match PeerHandler::make_request(peer_table, peer_id, connection, request, PEER_REPLY_TIMEOUT) + match connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) .await { Ok(RLPxMessage::BlockHeaders(BlockHeaders { @@ -104,27 +106,12 @@ impl PeerHandler { } } - pub(crate) async fn make_request( - // TODO: We should receive the PeerHandler (or self) instead, but since it is not yet spawnified it cannot be shared - // Fix this to avoid passing the PeerTable as a parameter - peer_table: &PeerTable, - peer_id: H256, - connection: &mut PeerConnection, - message: RLPxMessage, - timeout: Duration, - ) -> Result { - peer_table.inc_requests(peer_id)?; - let result = connection.outgoing_request(message, timeout).await; - peer_table.dec_requests(peer_id)?; - result - } - /// Returns a random node id and the channel ends to an active peer connection that supports the given capability /// It doesn't guarantee that the selected peer is not currently busy async fn get_random_peer( &mut self, capabilities: &[Capability], - ) -> Result, PeerHandlerError> { + ) -> Result, PeerHandlerError> { Ok(self .peer_table .get_random_peer(capabilities.to_vec()) @@ -181,15 +168,7 @@ impl PeerHandler { "request_block_headers: resolving sync head with peers" ); for (peer_id, mut connection) in peer_connection.into_iter().take(MAX_PEERS_TO_ASK) { - match ask_peer_head_number( - peer_id, - &mut connection, - &self.peer_table, - sync_head, - retries, - ) - .await - { + match ask_peer_head_number(peer_id, &mut connection, sync_head, retries).await { Ok(number) => { sync_head_number = number; if number != 0 { @@ -249,8 +228,14 @@ impl PeerHandler { let mut downloaded_count = 0_u64; // channel to send the tasks to the peers - let (task_sender, mut task_receiver) = - tokio::sync::mpsc::channel::<(Vec, H256, PeerConnection, u64, u64)>(1000); + let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::<( + Vec, + H256, + PeerConnection, + u64, + u64, + RequestPermit, + )>(1000); let mut current_show = 0; @@ -263,11 +248,10 @@ impl PeerHandler { let mut logged_no_free_peers_count = 0; loop { - if let Ok((headers, peer_id, _connection, startblock, previous_chunk_limit)) = + if let Ok((headers, peer_id, _connection, startblock, previous_chunk_limit, _permit)) = task_receiver.try_recv() { - // Release the reservation we made before spawning the task. - self.peer_table.dec_requests(peer_id)?; + // _permit drops here, releasing the reservation. trace!("We received a download chunk from peer"); if headers.is_empty() { @@ -316,7 +300,7 @@ impl PeerHandler { self.peer_table.record_success(peer_id)?; debug!("Downloader {peer_id} freed"); } - let Some((peer_id, mut connection)) = self + let Some((peer_id, mut connection, permit)) = self .peer_table .get_best_peer(SUPPORTED_ETH_CAPABILITIES.to_vec()) .await? @@ -347,23 +331,11 @@ impl PeerHandler { continue; }; let tx = task_sender.clone(); - // Reserve a request slot before spawning so get_best_peer sees - // this peer as busy immediately, preventing the loop from - // spawning dozens of tasks for the same peer in a single tick. - // Reserve a request slot before spawning so get_best_peer sees - // this peer as busy immediately, preventing the loop from - // spawning dozens of tasks for the same peer in a single tick. - // The reservation is released in the completion handler - // (dec_requests on try_recv). The worker calls - // outgoing_request directly (not make_request) since we - // already hold the reservation. - self.peer_table.inc_requests(peer_id)?; + // Selection atomically reserved a request slot; the permit rides + // through the channel and drops in the completion handler (or + // with the task if it's dropped before sending). debug!("Downloader {peer_id} is now busy"); - // Run download_chunk_from_peer in a different Tokio task. - // The worker must always send a result so dec_requests fires - // in the completion handler. The unwrap_or_default() ensures - // download errors don't panic. tokio::spawn(async move { trace!( "Sync Log 5: Requesting block headers from peer {peer_id}, chunk_limit: {chunk_limit}" @@ -378,11 +350,18 @@ impl PeerHandler { .inspect_err(|err| trace!("Sync Log 6: {peer_id} failed to download chunk: {err}")) .unwrap_or_default(); - tx.send((headers, peer_id, connection, startblock, chunk_limit)) - .await - .inspect_err(|err| { - error!("Failed to send headers result through channel. Error: {err}") - }) + tx.send(( + headers, + peer_id, + connection, + startblock, + chunk_limit, + permit, + )) + .await + .inspect_err(|err| { + error!("Failed to send headers result through channel. Error: {err}") + }) }); } @@ -443,18 +422,14 @@ impl PeerHandler { }); match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? { None => Ok(None), - Some((peer_id, mut connection)) => { + Some((peer_id, mut connection, _permit)) => { + // _permit drops at end of this arm, releasing the slot. if let Ok(RLPxMessage::BlockHeaders(BlockHeaders { id: _, block_headers, - })) = PeerHandler::make_request( - &self.peer_table, - peer_id, - &mut connection, - request, - PEER_REPLY_TIMEOUT, - ) - .await + })) = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await { if block_headers.is_empty() { // Empty response is valid per eth spec (peer may not have these blocks) @@ -485,9 +460,8 @@ impl PeerHandler { } /// Given a peer id, a chunk start and a chunk limit, requests the block headers from the peer. - /// The caller must already hold a request reservation for this peer - /// (via `inc_requests` before spawning), so we call `outgoing_request` - /// directly instead of `make_request` to avoid a double increment. + /// The caller owns the `RequestPermit` for this peer, so this just + /// forwards to `connection.outgoing_request` — the permit handles lifecycle. async fn download_chunk_from_peer( peer_id: H256, connection: &mut PeerConnection, @@ -537,18 +511,14 @@ impl PeerHandler { }); match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? { None => Ok(None), - Some((peer_id, mut connection)) => { + Some((peer_id, mut connection, _permit)) => { + // _permit drops at end of this arm, releasing the slot. if let Ok(RLPxMessage::BlockBodies(BlockBodies { id: _, block_bodies, - })) = PeerHandler::make_request( - &self.peer_table, - peer_id, - &mut connection, - request, - PEER_REPLY_TIMEOUT, - ) - .await + })) = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await { // Check that the response is not empty and does not contain more bodies than the ones requested if !block_bodies.is_empty() && block_bodies.len() <= block_hashes_len { @@ -625,7 +595,7 @@ impl PeerHandler { pub async fn get_block_header( &mut self, - peer_id: H256, + _peer_id: H256, connection: &mut PeerConnection, block_number: u64, ) -> Result, PeerHandlerError> { @@ -638,14 +608,9 @@ impl PeerHandler { reverse: false, }); debug!("get_block_header: requesting header with number {block_number}"); - match PeerHandler::make_request( - &self.peer_table, - peer_id, - connection, - request, - PEER_REPLY_TIMEOUT, - ) - .await + match connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await { Ok(RLPxMessage::BlockHeaders(BlockHeaders { id: _, diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index e678893d8cb..38b85633add 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -236,6 +236,55 @@ pub enum ContactValidation { IpMismatch, } +/// Reservation handle for a peer request slot. +/// +/// **Contract:** when a `RequestPermit` exists, the `requests` counter for +/// its peer has been incremented by one. When the permit is dropped, a +/// fire-and-forget `DecRequests` message releases the slot. +/// +/// The permit is the output of `get_best_peer`, `get_best_peer_excluding`, +/// and `get_random_peer`. Those handlers bump the counter atomically with +/// peer selection (same handler call, under the actor's `&mut self`). +/// +/// The permit MUST travel with whatever code owns the outstanding request — +/// move it into spawned tasks, send it through channels alongside results, +/// etc. Dropping it early releases the slot early. `#[must_use]` catches +/// accidental discards at call sites that should be holding it. +#[must_use = "dropping this permit immediately releases the peer's request slot"] +pub struct RequestPermit { + peer_table: PeerTable, + peer_id: H256, +} + +impl RequestPermit { + pub(crate) fn new(peer_table: PeerTable, peer_id: H256) -> Self { + Self { + peer_table, + peer_id, + } + } + + pub fn peer_id(&self) -> H256 { + self.peer_id + } +} + +impl std::fmt::Debug for RequestPermit { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RequestPermit") + .field("peer_id", &self.peer_id) + .finish_non_exhaustive() + } +} + +impl Drop for RequestPermit { + fn drop(&mut self) { + // Fire-and-forget. If the actor mailbox is closed, p2p is already + // shutting down — the lost decrement is a non-issue. + let _ = self.peer_table.dec_requests(self.peer_id); + } +} + #[protocol] pub trait PeerTableServerProtocol: Send + Sync { // Send (cast) methods @@ -258,7 +307,6 @@ pub trait PeerTableServerProtocol: Send + Sync { ) -> Result<(), ActorError>; fn set_session_info(&self, node_id: H256, session: Session) -> Result<(), ActorError>; fn remove_peer(&self, node_id: H256) -> Result<(), ActorError>; - fn inc_requests(&self, node_id: H256) -> Result<(), ActorError>; fn dec_requests(&self, node_id: H256) -> Result<(), ActorError>; fn set_unwanted(&self, node_id: H256) -> Result<(), ActorError>; fn set_is_fork_id_valid(&self, node_id: H256, valid: bool) -> Result<(), ActorError>; @@ -299,12 +347,12 @@ pub trait PeerTableServerProtocol: Send + Sync { fn get_best_peer( &self, capabilities: Vec, - ) -> Response>; + ) -> Response>; fn get_best_peer_excluding( &self, capabilities: Vec, excluded: Vec, - ) -> Response>; + ) -> Response>; fn get_score(&self, node_id: H256) -> Response; fn get_connected_nodes(&self) -> Response>; fn get_peers_with_capabilities(&self) @@ -325,9 +373,14 @@ pub trait PeerTableServerProtocol: Send + Sync { fn get_random_peer( &self, capabilities: Vec, - ) -> Response>; + ) -> Response>; fn get_session_info(&self, node_id: H256) -> Response>; fn get_peer_diagnostics(&self) -> Response>; + + #[cfg(test)] + fn _test_insert_peer(&self, peer_id: H256, peer: PeerData) -> Result<(), ActorError>; + #[cfg(test)] + fn _test_bump_requests(&self, peer_id: H256) -> Result<(), ActorError>; } #[derive(Debug)] @@ -427,25 +480,36 @@ impl PeerTableServer { } #[send_handler] - async fn handle_inc_requests( + async fn handle_dec_requests( &mut self, - msg: peer_table_server_protocol::IncRequests, + msg: peer_table_server_protocol::DecRequests, _ctx: &Context, ) { self.peers .entry(msg.node_id) - .and_modify(|peer_data| peer_data.requests += 1); + .and_modify(|peer_data| peer_data.requests = peer_data.requests.saturating_sub(1)); } + #[cfg(test)] #[send_handler] - async fn handle_dec_requests( + async fn handle_test_insert_peer( &mut self, - msg: peer_table_server_protocol::DecRequests, + msg: peer_table_server_protocol::TestInsertPeer, + _ctx: &Context, + ) { + self.peers.insert(msg.peer_id, msg.peer); + } + + #[cfg(test)] + #[send_handler] + async fn handle_test_bump_requests( + &mut self, + msg: peer_table_server_protocol::TestBumpRequests, _ctx: &Context, ) { self.peers - .entry(msg.node_id) - .and_modify(|peer_data| peer_data.requests = peer_data.requests.saturating_sub(1)); + .entry(msg.peer_id) + .and_modify(|peer_data| peer_data.requests += 1); } #[send_handler] @@ -706,18 +770,28 @@ impl PeerTableServer { async fn handle_get_best_peer( &mut self, msg: peer_table_server_protocol::GetBestPeer, - _ctx: &Context, - ) -> Option<(H256, PeerConnection)> { - self.do_get_best_peer(&msg.capabilities) + ctx: &Context, + ) -> Option<(H256, PeerConnection, RequestPermit)> { + let (peer_id, conn) = self.do_get_best_peer(&msg.capabilities)?; + self.peers + .get_mut(&peer_id) + .expect("peer returned by do_get_best_peer must be present in self.peers") + .requests += 1; + Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id))) } #[request_handler] async fn handle_get_best_peer_excluding( &mut self, msg: peer_table_server_protocol::GetBestPeerExcluding, - _ctx: &Context, - ) -> Option<(H256, PeerConnection)> { - self.do_get_best_peer_excluding(&msg.capabilities, &msg.excluded) + ctx: &Context, + ) -> Option<(H256, PeerConnection, RequestPermit)> { + let (peer_id, conn) = self.do_get_best_peer_excluding(&msg.capabilities, &msg.excluded)?; + self.peers + .get_mut(&peer_id) + .expect("peer returned by do_get_best_peer_excluding must be present in self.peers") + .requests += 1; + Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id))) } #[request_handler] @@ -829,9 +903,14 @@ impl PeerTableServer { async fn handle_get_random_peer( &mut self, msg: peer_table_server_protocol::GetRandomPeer, - _ctx: &Context, - ) -> Option<(H256, PeerConnection)> { - self.do_get_random_peer(msg.capabilities) + ctx: &Context, + ) -> Option<(H256, PeerConnection, RequestPermit)> { + let (peer_id, conn) = self.do_get_random_peer(msg.capabilities)?; + self.peers + .get_mut(&peer_id) + .expect("peer returned by do_get_random_peer must be present in self.peers") + .requests += 1; + Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id))) } #[request_handler] @@ -1250,3 +1329,81 @@ impl PeerTableServer { } pub type PeerTable = ActorRef; + +#[cfg(test)] +mod permit_tests { + use super::*; + use crate::rlpx::p2p::SUPPORTED_ETH_CAPABILITIES; + use ethrex_common::{H256, H512}; + use ethrex_storage::{EngineType, Store}; + + async fn fresh_peer_table() -> PeerTable { + let store = Store::new("", EngineType::InMemory).expect("in-memory store"); + PeerTableServer::spawn(10, store) + } + + // NOTE: `PeerConnection` has a private field (`handle: ActorRef<...>`) and + // can only be constructed via `spawn_as_initiator` / `spawn_as_receiver`, + // both of which require a full `P2PContext`. That's too heavy to set up + // from a unit test. Therefore the happy-path test inserts a peer with + // `connection: None` and exercises the counter bookkeeping directly. + fn fake_peer_data() -> (H256, PeerData) { + let node = Node::new("127.0.0.1".parse().expect("ip"), 30303, 30303, H512::zero()); + let peer_id = H256::from_low_u64_be(1); + let peer = PeerData::new(node, None, None, SUPPORTED_ETH_CAPABILITIES.to_vec()); + (peer_id, peer) + } + + #[tokio::test] + async fn permit_drop_releases_reservation() { + // Regression coverage: dropping a permit with an unknown peer_id must + // not panic (saturating_sub protects us). + let table = fresh_peer_table().await; + let permit = RequestPermit::new(table.clone(), H256::zero()); + drop(permit); + let count = table.peer_count().await.unwrap(); + assert_eq!(count, 0); + } + + #[tokio::test] + async fn permit_drop_decrements_counter() { + // Insert a peer, bump `requests` via a test-only helper, then drop a + // permit pointing at that peer and verify the counter returned to 0. + let table = fresh_peer_table().await; + let (peer_id, peer) = fake_peer_data(); + + table + ._test_insert_peer(peer_id, peer) + .expect("insert fake peer"); + table._test_bump_requests(peer_id).expect("bump requests"); + + // Let the send messages land before we measure. + tokio::task::yield_now().await; + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + + let diag = table + .get_peer_diagnostics() + .await + .expect("diagnostics") + .into_iter() + .find(|p| p.peer_id == peer_id) + .expect("peer present"); + assert_eq!(diag.inflight_requests, 1, "test helper should have bumped"); + + let permit = RequestPermit::new(table.clone(), peer_id); + drop(permit); + // get_peer_diagnostics is a request; its response guarantees the + // dec_requests send from Drop has been processed (FIFO mailbox). + let diag_after = table + .get_peer_diagnostics() + .await + .expect("diagnostics") + .into_iter() + .find(|p| p.peer_id == peer_id) + .expect("peer present"); + assert_eq!( + diag_after.inflight_requests, 0, + "permit drop should release the slot" + ); + } +} diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index 1106c6c101e..5aa5186e481 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -6,7 +6,7 @@ use crate::rlpx::message::Message as RLPxMessage; use crate::{ metrics::{CurrentStepValue, METRICS}, peer_handler::PeerHandler, - peer_table::{PeerTable, PeerTableServerProtocol as _}, + peer_table::{PeerTableServerProtocol as _, RequestPermit}, rlpx::{ connection::server::PeerConnection, error::PeerConnectionError, @@ -62,7 +62,6 @@ pub struct RequestStorageTrieNodesError { pub source: SnapError, } -#[derive(Clone)] struct StorageTaskResult { start_index: usize, account_storages: Vec>, @@ -70,6 +69,7 @@ struct StorageTaskResult { remaining_start: usize, remaining_end: usize, remaining_hash_range: (H256, Option), + permit: RequestPermit, } #[derive(Debug)] @@ -137,8 +137,12 @@ pub async fn request_account_range( let mut all_accounts_state = Vec::new(); // channel to send the tasks to the peers - let (task_sender, mut task_receiver) = - tokio::sync::mpsc::channel::<(Vec, H256, Option<(H256, H256)>)>(1000); + let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::<( + Vec, + H256, + Option<(H256, H256)>, + RequestPermit, + )>(1000); info!("Starting to download account ranges from peers"); @@ -193,9 +197,8 @@ pub async fn request_account_range( last_update = SystemTime::now(); } - if let Ok((accounts, peer_id, chunk_start_end)) = task_receiver.try_recv() { - // Release the reservation we made before spawning the task. - peers.peer_table.dec_requests(peer_id)?; + if let Ok((accounts, peer_id, chunk_start_end, _permit)) = task_receiver.try_recv() { + // _permit drops here, releasing the reservation. if let Some((chunk_start, chunk_end)) = chunk_start_end { if chunk_start <= chunk_end { tasks_queue_not_started.push_back((chunk_start, chunk_end)); @@ -223,7 +226,7 @@ pub async fn request_account_range( all_accounts_state.extend(accounts.iter().map(|unit| unit.account)); } - let Some((peer_id, connection)) = peers + let Some((peer_id, connection, permit)) = peers .peer_table .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec()) .await @@ -264,11 +267,9 @@ pub async fn request_account_range( .expect("Should be able to update pivot") } - // Reserve a request slot before spawning so get_best_peer sees - // this peer as busy immediately, preventing spawn floods. - // Workers call outgoing_request directly (not make_request) to - // avoid a double increment. Released via dec_requests on try_recv. - peers.peer_table.inc_requests(peer_id)?; + // Selection already reserved the slot; permit rides through the + // channel and drops in the receiver (or with the task if dropped + // before sending). tokio::spawn(request_account_range_worker( peer_id, @@ -277,6 +278,7 @@ pub async fn request_account_range( chunk_end, pivot_header.state_root, tx, + permit, )); } @@ -375,6 +377,7 @@ pub async fn request_bytecodes( peer_id: H256, remaining_start: usize, remaining_end: usize, + permit: RequestPermit, } let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::(1000); @@ -396,9 +399,9 @@ pub async fn request_bytecodes( peer_id, remaining_start, remaining_end, + permit: _permit, } = result; - // Release the reservation we made before spawning the task. - peers.peer_table.dec_requests(peer_id)?; + // _permit drops here, releasing the reservation. debug!( "Downloaded {} bytecodes from peer {peer_id} (current count: {downloaded_count})", @@ -423,7 +426,7 @@ pub async fn request_bytecodes( } } - let Some((peer_id, mut connection)) = peers + let Some((peer_id, mut connection, permit)) = peers .peer_table .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec()) .await @@ -458,17 +461,11 @@ pub async fn request_bytecodes( .copied() .collect(); - // Reserve a request slot before spawning (see account range comment). - peers.peer_table.inc_requests(peer_id)?; + // Selection already reserved the slot; permit rides through the + // channel and drops in the receiver (or with the task if dropped + // before sending). tokio::spawn(async move { - let empty_task_result = TaskResult { - start_index: chunk_start, - bytecodes: vec![], - peer_id, - remaining_start: chunk_start, - remaining_end: chunk_end, - }; debug!( "Requesting bytecode from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}" ); @@ -478,37 +475,43 @@ pub async fn request_bytecodes( hashes: hashes_to_request.clone(), bytes: MAX_RESPONSE_BYTES, }); - // The caller already holds a request reservation for this peer, - // so call outgoing_request directly to avoid a double increment. - if let Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) = connection + + // Collect into a single (bytecodes, remaining_start) pair so the + // task has a single `tx.send` site — `RequestPermit` is not `Clone`. + let (bytecodes, remaining_start) = match connection .outgoing_request(request, PEER_REPLY_TIMEOUT) .await { - if codes.is_empty() { - tx.send(empty_task_result).await.ok(); - // Too spammy - // tracing::error!("Received empty account range"); - return; + Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) if !codes.is_empty() => { + // Validate response by hashing bytecodes + let validated_codes: Vec = codes + .into_iter() + .zip(hashes_to_request) + .take_while(|(b, hash)| ethrex_common::utils::keccak(b) == *hash) + .map(|(b, _hash)| b) + .collect(); + let new_remaining_start = chunk_start + validated_codes.len(); + (validated_codes, new_remaining_start) } - // Validate response by hashing bytecodes - let validated_codes: Vec = codes - .into_iter() - .zip(hashes_to_request) - .take_while(|(b, hash)| ethrex_common::utils::keccak(b) == *hash) - .map(|(b, _hash)| b) - .collect(); - let result = TaskResult { - start_index: chunk_start, - remaining_start: chunk_start + validated_codes.len(), - bytecodes: validated_codes, - peer_id, - remaining_end: chunk_end, - }; - tx.send(result).await.ok(); - } else { - tracing::debug!("Failed to get bytecode"); - tx.send(empty_task_result).await.ok(); - } + Ok(RLPxMessage::ByteCodes(_)) => { + // Empty response; retry the full chunk. + (Vec::new(), chunk_start) + } + _ => { + tracing::debug!("Failed to get bytecode"); + (Vec::new(), chunk_start) + } + }; + + let result = TaskResult { + start_index: chunk_start, + bytecodes, + peer_id, + remaining_start, + remaining_end: chunk_end, + permit, + }; + tx.send(result).await.ok(); }); } @@ -659,9 +662,9 @@ pub async fn request_storage_ranges( remaining_start, remaining_end, remaining_hash_range: (hash_start, hash_end), + permit: _permit, } = result; - // Release the reservation we made before spawning the task. - peers.peer_table.dec_requests(peer_id)?; + // _permit drops here, releasing the reservation. completed_tasks += 1; for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() { @@ -958,7 +961,7 @@ pub async fn request_storage_ranges( break; } - let Some((peer_id, connection)) = peers + let Some((peer_id, connection, permit)) = peers .peer_table .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec()) .await @@ -999,9 +1002,9 @@ pub async fn request_storage_ranges( chunk_storage_roots.first().unwrap_or(&H256::zero()), ); } - // Reserve a request slot before spawning (see account range comment). - peers.peer_table.inc_requests(peer_id)?; - + // Selection already reserved the slot; permit rides through the + // channel and drops in the receiver (or with the task if dropped + // before sending). tokio::spawn(request_storage_ranges_worker( task, peer_id, @@ -1010,6 +1013,7 @@ pub async fn request_storage_ranges( chunk_account_hashes, chunk_storage_roots, tx, + permit, )); } @@ -1056,15 +1060,15 @@ pub async fn request_storage_ranges( } pub async fn request_state_trienodes( - peer_id: H256, mut connection: PeerConnection, - peer_table: PeerTable, + _permit: RequestPermit, state_root: H256, paths: Vec, ) -> Result, SnapError> { let expected_nodes = paths.len(); - // Keep track of peers we requested from so we can penalize unresponsive peers when we get a response - // This is so we avoid penalizing peers due to requesting stale data + // The caller already holds a request reservation for this peer, + // so call outgoing_request directly to avoid a double increment. + // _permit drops at the end of this function's scope. let request_id = rand::random(); let request = RLPxMessage::GetTrieNodes(GetTrieNodes { @@ -1077,14 +1081,9 @@ pub async fn request_state_trienodes( .collect(), bytes: MAX_RESPONSE_BYTES, }); - let nodes = match PeerHandler::make_request( - &peer_table, - peer_id, - &mut connection, - request, - PEER_REPLY_TIMEOUT, - ) - .await + let nodes = match connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await { Ok(RLPxMessage::TrieNodes(trie_nodes)) => trie_nodes .nodes @@ -1121,23 +1120,18 @@ pub async fn request_state_trienodes( /// - There are no available peers (the node just started up or was rejected by all other nodes) /// - No peer returned a valid response in the given time and retry limits pub async fn request_storage_trienodes( - peer_id: H256, mut connection: PeerConnection, - peer_table: PeerTable, + _permit: RequestPermit, get_trie_nodes: GetTrieNodes, ) -> Result { - // Keep track of peers we requested from so we can penalize unresponsive peers when we get a response - // This is so we avoid penalizing peers due to requesting stale data + // The caller already holds a request reservation for this peer, + // so call outgoing_request directly to avoid a double increment. + // _permit drops at the end of this function's scope. let request_id = get_trie_nodes.id; let request = RLPxMessage::GetTrieNodes(get_trie_nodes); - match PeerHandler::make_request( - &peer_table, - peer_id, - &mut connection, - request, - PEER_REPLY_TIMEOUT, - ) - .await + match connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await { Ok(RLPxMessage::TrieNodes(trie_nodes)) => Ok(trie_nodes), Ok(other_msg) => Err(RequestStorageTrieNodesError { @@ -1161,7 +1155,13 @@ async fn request_account_range_worker( chunk_start: H256, chunk_end: H256, state_root: H256, - tx: tokio::sync::mpsc::Sender<(Vec, H256, Option<(H256, H256)>)>, + tx: tokio::sync::mpsc::Sender<( + Vec, + H256, + Option<(H256, H256)>, + RequestPermit, + )>, + permit: RequestPermit, ) -> Result<(), SnapError> { debug!("Requesting account range from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}"); let request_id = rand::random(); @@ -1172,7 +1172,16 @@ async fn request_account_range_worker( limit_hash: chunk_end, response_bytes: MAX_RESPONSE_BYTES, }); - if let Ok(RLPxMessage::AccountRange(AccountRange { + + // Collect the result into (accounts, chunk_left) so the function has a + // single `tx.send` site — `RequestPermit` is not `Clone`. + let retry = || { + ( + Vec::::new(), + Some((chunk_start, chunk_end)), + ) + }; + let (accounts_out, chunk_left) = if let Ok(RLPxMessage::AccountRange(AccountRange { id: _, accounts, proof, @@ -1183,71 +1192,65 @@ async fn request_account_range_worker( .await { if accounts.is_empty() { - tx.send((Vec::new(), peer_id, Some((chunk_start, chunk_end)))) - .await - .ok(); - return Ok(()); - } - // Unzip & validate response - let proof = encodable_to_proof(&proof); - let (account_hashes, account_states): (Vec<_>, Vec<_>) = accounts - .clone() - .into_iter() - .map(|unit| (unit.hash, unit.account)) - .unzip(); - let encoded_accounts = account_states - .iter() - .map(|acc| acc.encode_to_vec()) - .collect::>(); - - let Ok(should_continue) = verify_range( - state_root, - &chunk_start, - &account_hashes, - &encoded_accounts, - &proof, - ) else { - tx.send((Vec::new(), peer_id, Some((chunk_start, chunk_end)))) - .await - .ok(); - tracing::error!("Received invalid account range"); - return Ok(()); - }; - - // If the range has more accounts to fetch, we send the new chunk - let chunk_left = if should_continue { - let last_hash = match account_hashes.last() { - Some(last_hash) => last_hash, - None => { - tx.send((Vec::new(), peer_id, Some((chunk_start, chunk_end)))) - .await - .ok(); - error!("Account hashes last failed, this shouldn't happen"); - return Err(SnapError::NoAccountHashes); - } - }; - let new_start_u256 = U256::from_big_endian(&last_hash.0) + 1; - let new_start = H256::from_uint(&new_start_u256); - Some((new_start, chunk_end)) + retry() } else { - None - }; - tx.send(( - accounts + // Unzip & validate response + let proof = encodable_to_proof(&proof); + let (account_hashes, account_states): (Vec<_>, Vec<_>) = accounts + .clone() .into_iter() - .filter(|unit| unit.hash <= chunk_end) - .collect(), - peer_id, - chunk_left, - )) - .await - .ok(); + .map(|unit| (unit.hash, unit.account)) + .unzip(); + let encoded_accounts = account_states + .iter() + .map(|acc| acc.encode_to_vec()) + .collect::>(); + + match verify_range( + state_root, + &chunk_start, + &account_hashes, + &encoded_accounts, + &proof, + ) { + Ok(should_continue) => { + // If the range has more accounts to fetch, we send the new chunk + let chunk_left = if should_continue { + match account_hashes.last() { + Some(last_hash) => { + let new_start_u256 = U256::from_big_endian(&last_hash.0) + 1; + let new_start = H256::from_uint(&new_start_u256); + Some((new_start, chunk_end)) + } + None => { + // Unreachable: accounts is non-empty here. + error!("Account hashes last failed, this shouldn't happen"); + Some((chunk_start, chunk_end)) + } + } + } else { + None + }; + let filtered = accounts + .into_iter() + .filter(|unit| unit.hash <= chunk_end) + .collect::>(); + (filtered, chunk_left) + } + Err(_) => { + tracing::error!("Received invalid account range"); + retry() + } + } + } } else { tracing::debug!("Failed to get account range"); - tx.send((Vec::new(), peer_id, Some((chunk_start, chunk_end)))) - .await - .ok(); - } + retry() + }; + + tx.send((accounts_out, peer_id, chunk_left, permit)) + .await + .ok(); Ok::<(), SnapError>(()) } @@ -1260,19 +1263,24 @@ async fn request_storage_ranges_worker( chunk_account_hashes: Vec, chunk_storage_roots: Vec, tx: tokio::sync::mpsc::Sender, + permit: RequestPermit, ) -> Result<(), SnapError> { let start = task.start_index; let end = task.end_index; let start_hash = task.start_hash; - let empty_task_result = StorageTaskResult { - start_index: task.start_index, - account_storages: Vec::new(), - peer_id, - remaining_start: task.start_index, - remaining_end: task.end_index, - remaining_hash_range: (start_hash, task.end_hash), + // Defaults for the "retry this same range" outcome used by every failure + // branch below. Collapsing to a single `tx.send` at the bottom — permit + // is not `Clone`. + let retry_outcome = || { + ( + Vec::>::new(), + task.start_index, + task.end_index, + (start_hash, task.end_hash), + ) }; + let request_id = rand::random(); let request = RLPxMessage::GetStorageRanges(GetStorageRanges { id: request_id, @@ -1283,134 +1291,149 @@ async fn request_storage_ranges_worker( response_bytes: MAX_RESPONSE_BYTES, }); tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", "Sending storage range request"); - let Ok(RLPxMessage::StorageRanges(StorageRanges { - id: _, - slots, - proof, - // The caller already holds a request reservation for this peer, - // so call outgoing_request directly to avoid a double increment. - })) = connection - .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await - else { - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("timeout"); - tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", outcome = "timeout", "Storage range request failed"); - tracing::debug!("Failed to get storage range"); - tx.send(empty_task_result).await.ok(); - return Ok(()); - }; - if slots.is_empty() && proof.is_empty() { - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("empty"); - tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "empty", "Storage range response empty"); - tx.send(empty_task_result).await.ok(); - tracing::debug!("Received empty storage range"); - return Ok(()); - } - // Check we got some data and no more than the requested amount - if slots.len() > chunk_storage_roots.len() || slots.is_empty() { - tx.send(empty_task_result).await.ok(); - return Ok(()); - } - // Unzip & validate response - let proof = encodable_to_proof(&proof); - let mut account_storages: Vec> = vec![]; - let mut should_continue = false; - // Validate each storage range - let mut storage_roots = chunk_storage_roots.into_iter(); - let last_slot_index = slots.len() - 1; - for (i, next_account_slots) in slots.into_iter().enumerate() { - // We won't accept empty storage ranges - if next_account_slots.is_empty() { - // This shouldn't happen - error!("Received empty storage range, skipping"); - tx.send(empty_task_result.clone()).await.ok(); - return Ok(()); - } - let encoded_values = next_account_slots - .iter() - .map(|slot| slot.data.encode_to_vec()) - .collect::>(); - let hashed_keys: Vec<_> = next_account_slots.iter().map(|slot| slot.hash).collect(); - let storage_root = match storage_roots.next() { - Some(root) => root, - None => { - tx.send(empty_task_result.clone()).await.ok(); - error!("No storage root for account {i}"); - return Err(SnapError::NoStorageRoots); - } + // The caller already holds a request reservation for this peer, + // so call outgoing_request directly to avoid a double increment. + let (account_storages, remaining_start, remaining_end, remaining_hash_range) = 'outcome: { + let Ok(RLPxMessage::StorageRanges(StorageRanges { + id: _, + slots, + proof, + })) = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await + else { + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("timeout"); + tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", outcome = "timeout", "Storage range request failed"); + tracing::debug!("Failed to get storage range"); + break 'outcome retry_outcome(); }; + if slots.is_empty() && proof.is_empty() { + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("empty"); + tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "empty", "Storage range response empty"); + tracing::debug!("Received empty storage range"); + break 'outcome retry_outcome(); + } + // Check we got some data and no more than the requested amount + if slots.len() > chunk_storage_roots.len() || slots.is_empty() { + break 'outcome retry_outcome(); + } + // Unzip & validate response + let proof = encodable_to_proof(&proof); + let mut account_storages: Vec> = vec![]; + let mut should_continue = false; + let mut validation_failed = false; + // Validate each storage range + let mut storage_roots = chunk_storage_roots.into_iter(); + let last_slot_index = slots.len() - 1; + for (i, next_account_slots) in slots.into_iter().enumerate() { + // We won't accept empty storage ranges + if next_account_slots.is_empty() { + // This shouldn't happen + error!("Received empty storage range, skipping"); + validation_failed = true; + break; + } + let encoded_values = next_account_slots + .iter() + .map(|slot| slot.data.encode_to_vec()) + .collect::>(); + let hashed_keys: Vec<_> = next_account_slots.iter().map(|slot| slot.hash).collect(); + + let storage_root = match storage_roots.next() { + Some(root) => root, + None => { + error!("No storage root for account {i}"); + // Preserve the original error-log behavior; return the + // retry outcome so the caller can retry (the original + // code also sent empty_task_result here before + // returning Err). + break 'outcome retry_outcome(); + } + }; - // The proof corresponds to the last slot, for the previous ones the slot must be the full range without edge proofs - if i == last_slot_index && !proof.is_empty() { - let Ok(sc) = verify_range( + // The proof corresponds to the last slot, for the previous ones the slot must be the full range without edge proofs + if i == last_slot_index && !proof.is_empty() { + let Ok(sc) = verify_range( + storage_root, + &start_hash, + &hashed_keys, + &encoded_values, + &proof, + ) else { + validation_failed = true; + break; + }; + should_continue = sc; + } else if verify_range( storage_root, &start_hash, &hashed_keys, &encoded_values, - &proof, - ) else { - tx.send(empty_task_result).await.ok(); - return Ok(()); - }; - should_continue = sc; - } else if verify_range( - storage_root, - &start_hash, - &hashed_keys, - &encoded_values, - &[], - ) - .is_err() - { - tx.send(empty_task_result.clone()).await.ok(); - return Ok(()); + &[], + ) + .is_err() + { + validation_failed = true; + break; + } + + account_storages.push( + next_account_slots + .iter() + .map(|slot| (slot.hash, slot.data)) + .collect(), + ); } - account_storages.push( - next_account_slots - .iter() - .map(|slot| (slot.hash, slot.data)) - .collect(), - ); - } - let (remaining_start, remaining_end, remaining_start_hash) = if should_continue { - let last_account_storage = match account_storages.last() { - Some(storage) => storage, - None => { - tx.send(empty_task_result.clone()).await.ok(); - error!("No account storage found, this shouldn't happen"); - return Err(SnapError::NoAccountStorages); - } - }; - let (last_hash, _) = match last_account_storage.last() { - Some(last_hash) => last_hash, - None => { - tx.send(empty_task_result.clone()).await.ok(); - error!("No last hash found, this shouldn't happen"); - return Err(SnapError::NoAccountStorages); - } + if validation_failed { + break 'outcome retry_outcome(); + } + + let (remaining_start, remaining_end, remaining_start_hash) = if should_continue { + let last_account_storage = match account_storages.last() { + Some(storage) => storage, + None => { + error!("No account storage found, this shouldn't happen"); + break 'outcome retry_outcome(); + } + }; + let (last_hash, _) = match last_account_storage.last() { + Some(last_hash) => last_hash, + None => { + error!("No last hash found, this shouldn't happen"); + break 'outcome retry_outcome(); + } + }; + let next_hash_u256 = U256::from_big_endian(&last_hash.0).saturating_add(1.into()); + let next_hash = H256::from_uint(&next_hash_u256); + (start + account_storages.len() - 1, end, next_hash) + } else { + (start + account_storages.len(), end, H256::zero()) }; - let next_hash_u256 = U256::from_big_endian(&last_hash.0).saturating_add(1.into()); - let next_hash = H256::from_uint(&next_hash_u256); - (start + account_storages.len() - 1, end, next_hash) - } else { - (start + account_storages.len(), end, H256::zero()) + let slot_count: usize = account_storages.iter().map(|s| s.len()).sum(); + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("success"); + tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "success", slots = slot_count, "Storage range response received"); + ( + account_storages, + remaining_start, + remaining_end, + (remaining_start_hash, task.end_hash), + ) }; - let slot_count: usize = account_storages.iter().map(|s| s.len()).sum(); + let task_result = StorageTaskResult { start_index: start, account_storages, peer_id, remaining_start, remaining_end, - remaining_hash_range: (remaining_start_hash, task.end_hash), + remaining_hash_range, + permit, }; - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("success"); - tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "success", slots = slot_count, "Storage range response received"); tx.send(task_result).await.ok(); Ok::<(), SnapError>(()) } diff --git a/crates/networking/p2p/sync/healing/state.rs b/crates/networking/p2p/sync/healing/state.rs index 54c46658a8b..e13a74f9ee1 100644 --- a/crates/networking/p2p/sync/healing/state.rs +++ b/crates/networking/p2p/sync/healing/state.rs @@ -216,7 +216,7 @@ async fn heal_state_trie( .unwrap_or_default(), longest_path_seen, ); - let Some((peer_id, connection)) = peers + let Some((peer_id, connection, permit)) = peers .peer_table .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec()) .await @@ -243,17 +243,11 @@ async fn heal_state_trie( let tx = task_sender.clone(); inflight_tasks += 1; - let peer_table = peers.peer_table.clone(); tokio::spawn(async move { // TODO: check errors to determine whether the current block is stale - let response = request_state_trienodes( - peer_id, - connection, - peer_table, - state_root, - batch.clone(), - ) - .await; + let response = + request_state_trienodes(connection, permit, state_root, batch.clone()) + .await; // TODO: add error handling tx.send((peer_id, response, batch)).await.inspect_err( |err| debug!(error=?err, "Failed to send state trie nodes response"), diff --git a/crates/networking/p2p/sync/healing/storage.rs b/crates/networking/p2p/sync/healing/storage.rs index c593b3756ba..ab39e9b8b07 100644 --- a/crates/networking/p2p/sync/healing/storage.rs +++ b/crates/networking/p2p/sync/healing/storage.rs @@ -331,7 +331,7 @@ async fn ask_peers_for_nodes( logged_no_free_peers_count: &mut u32, ) { if (requests.len() as u32) < MAX_IN_FLIGHT_REQUESTS && !download_queue.is_empty() { - let Some((peer_id, connection)) = peers + let Some((peer_id, connection, permit)) = peers .peer_table .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec()) .await @@ -368,11 +368,9 @@ async fn ask_peers_for_nodes( let tx = task_sender.clone(); - let peer_table = peers.peer_table.clone(); - requests_task_joinset.spawn(async move { let req_id = gtn.id; - let response = request_storage_trienodes(peer_id, connection, peer_table, gtn).await; + let response = request_storage_trienodes(connection, permit, gtn).await; // TODO: add error handling tx.try_send(response).inspect_err( |err| debug!(error=?err, "Failed to send state trie nodes response"), diff --git a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs index 37a864a0ef9..f928d59bc83 100644 --- a/crates/networking/p2p/sync/snap_sync.rs +++ b/crates/networking/p2p/sync/snap_sync.rs @@ -739,7 +739,10 @@ pub async fn update_pivot( tokio::time::sleep(delay).await; } - let Some((peer_id, mut connection)) = peers + // Hold the permit for the duration of the attempt loop. It's dropped + // at the end of each outer-loop iteration — either on success (return), + // on exclusion (continue after exhausting retries), or on fall-through. + let Some((peer_id, mut connection, _permit)) = peers .peer_table .get_best_peer_excluding(SUPPORTED_ETH_CAPABILITIES.to_vec(), excluded_peers.clone()) .await? From 44191e9ebb0db57143072b1c28e1a5c559f9b456 Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Thu, 23 Apr 2026 15:55:21 -0300 Subject: [PATCH 2/7] feat(p2p): add get_best_n_peers, restore head-probe slot tracking --- crates/networking/p2p/peer_handler.rs | 13 ++-- crates/networking/p2p/peer_table.rs | 90 +++++++++++++++++---------- 2 files changed, 61 insertions(+), 42 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index cad43b22eec..ca406b7d672 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -152,22 +152,19 @@ impl PeerHandler { // sync_head is unknown to our peers return Ok(None); } - let peer_connection = self + let peers = self .peer_table - .get_peer_connections(SUPPORTED_ETH_CAPABILITIES.to_vec()) + .get_best_n_peers(SUPPORTED_ETH_CAPABILITIES.to_vec(), MAX_PEERS_TO_ASK) .await?; - let selected_peers: Vec<_> = peer_connection - .iter() - .take(MAX_PEERS_TO_ASK) - .map(|(id, _)| *id) - .collect(); + let selected_peers: Vec<_> = peers.iter().map(|(id, _, _)| *id).collect(); debug!( retry = retries, peers_selected = ?selected_peers, "request_block_headers: resolving sync head with peers" ); - for (peer_id, mut connection) in peer_connection.into_iter().take(MAX_PEERS_TO_ASK) { + for (peer_id, mut connection, _permit) in peers { + // _permit holds the slot for the duration of this iteration. match ask_peer_head_number(peer_id, &mut connection, sync_head, retries).await { Ok(number) => { sync_head_number = number; diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index 38b85633add..0a9b76e12d9 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -353,14 +353,15 @@ pub trait PeerTableServerProtocol: Send + Sync { capabilities: Vec, excluded: Vec, ) -> Response>; + fn get_best_n_peers( + &self, + capabilities: Vec, + n: usize, + ) -> Response>; fn get_score(&self, node_id: H256) -> Response; fn get_connected_nodes(&self) -> Response>; fn get_peers_with_capabilities(&self) -> Response)>>; - fn get_peer_connections( - &self, - capabilities: Vec, - ) -> Response>; fn insert_if_new(&self, node: Node, protocol: DiscoveryProtocol) -> Response; fn validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> Response; fn get_closest_nodes(&self, node_id: H256) -> Response>; @@ -794,6 +795,24 @@ impl PeerTableServer { Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id))) } + #[request_handler] + async fn handle_get_best_n_peers( + &mut self, + msg: peer_table_server_protocol::GetBestNPeers, + ctx: &Context, + ) -> Vec<(H256, PeerConnection, RequestPermit)> { + let picks = self.do_get_best_n_peers(&msg.capabilities, msg.n); + let mut out = Vec::with_capacity(picks.len()); + for (peer_id, conn) in picks { + self.peers + .get_mut(&peer_id) + .expect("peer returned by do_get_best_n_peers must be present in self.peers") + .requests += 1; + out.push((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id))); + } + out + } + #[request_handler] async fn handle_get_score( &mut self, @@ -838,15 +857,6 @@ impl PeerTableServer { .collect() } - #[request_handler] - async fn handle_get_peer_connections( - &mut self, - msg: peer_table_server_protocol::GetPeerConnections, - _ctx: &Context, - ) -> Vec<(H256, PeerConnection)> { - self.do_get_peer_connections(msg.capabilities) - } - #[request_handler] async fn handle_insert_if_new( &mut self, @@ -1001,6 +1011,39 @@ impl PeerTableServer { .map(|(k, _, _, v)| (k, v)) } + /// Returns up to `n` best peers with capability overlap, sorted by weight + /// descending. Excludes peers at capacity. Does NOT mutate state — caller + /// is responsible for incrementing `requests` on each returned peer. + fn do_get_best_n_peers( + &self, + capabilities: &[Capability], + n: usize, + ) -> Vec<(H256, PeerConnection)> { + let mut candidates: Vec<(H256, i64, i64, PeerConnection)> = self + .peers + .iter() + .filter_map(|(id, peer_data)| { + if !self.can_try_more_requests(&peer_data.score, &peer_data.requests) + || !capabilities + .iter() + .any(|cap| peer_data.supported_capabilities.contains(cap)) + { + None + } else { + let connection = peer_data.connection.clone()?; + Some((*id, peer_data.score, peer_data.requests, connection)) + } + }) + .collect(); + + candidates.sort_by_key(|(_, score, reqs, _)| -self.weight_peer(score, reqs)); + candidates + .into_iter() + .take(n) + .map(|(id, _, _, conn)| (id, conn)) + .collect() + } + fn prune(&mut self) { let disposable_contacts = self .contacts @@ -1249,27 +1292,6 @@ impl PeerTableServer { .count() } - fn do_get_peer_connections( - &self, - capabilities: Vec, - ) -> Vec<(H256, PeerConnection)> { - self.peers - .iter() - .filter_map(|(peer_id, peer_data)| { - if !capabilities - .iter() - .any(|cap| peer_data.supported_capabilities.contains(cap)) - { - return None; - } - peer_data - .connection - .clone() - .map(|connection| (*peer_id, connection)) - }) - .collect() - } - fn do_get_random_peer(&self, capabilities: Vec) -> Option<(H256, PeerConnection)> { let peers: Vec<(H256, &PeerConnection, i64)> = self .peers From bffaba55a745ad1878836dcca7149ae0e779d77a Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Thu, 23 Apr 2026 16:00:20 -0300 Subject: [PATCH 3/7] test(p2p): add happy-path coverage for permit reservation --- crates/networking/p2p/peer_table.rs | 128 +++++++++++++++++- .../networking/p2p/rlpx/connection/server.rs | 27 ++++ 2 files changed, 150 insertions(+), 5 deletions(-) diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index 0a9b76e12d9..26e5907a72c 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -1364,11 +1364,10 @@ mod permit_tests { PeerTableServer::spawn(10, store) } - // NOTE: `PeerConnection` has a private field (`handle: ActorRef<...>`) and - // can only be constructed via `spawn_as_initiator` / `spawn_as_receiver`, - // both of which require a full `P2PContext`. That's too heavy to set up - // from a unit test. Therefore the happy-path test inserts a peer with - // `connection: None` and exercises the counter bookkeeping directly. + // `PeerConnection` has a private field and its production constructors + // require a full `P2PContext`. For tests we use `PeerConnection::for_test` + // together with `detached_peer_connection_handle` to mint a handle without + // running a real handshake. fn fake_peer_data() -> (H256, PeerData) { let node = Node::new("127.0.0.1".parse().expect("ip"), 30303, 30303, H512::zero()); let peer_id = H256::from_low_u64_be(1); @@ -1376,6 +1375,22 @@ mod permit_tests { (peer_id, peer) } + /// Build a `PeerData` with a fake-but-valid `PeerConnection` so that + /// `do_get_best_peer_excluding`'s `connection.is_some()` filter accepts it. + fn fake_connected_peer(peer_id_byte: u64) -> (H256, PeerData) { + use crate::rlpx::connection::server::{PeerConnection, detached_peer_connection_handle}; + let node = Node::new("127.0.0.1".parse().expect("ip"), 30303, 30303, H512::zero()); + let peer_id = H256::from_low_u64_be(peer_id_byte); + let connection = PeerConnection::for_test(detached_peer_connection_handle()); + let peer = PeerData::new( + node, + None, + Some(connection), + SUPPORTED_ETH_CAPABILITIES.to_vec(), + ); + (peer_id, peer) + } + #[tokio::test] async fn permit_drop_releases_reservation() { // Regression coverage: dropping a permit with an unknown peer_id must @@ -1428,4 +1443,107 @@ mod permit_tests { "permit drop should release the slot" ); } + + #[tokio::test] + async fn get_best_peer_bumps_counter_and_drop_releases() { + // End-to-end happy path: `get_best_peer` must atomically reserve a + // slot (bumping `requests` to 1) and return a permit whose drop + // decrements back to 0. + let table = fresh_peer_table().await; + let (peer_id, peer) = fake_connected_peer(1); + + table + ._test_insert_peer(peer_id, peer) + .expect("insert fake peer"); + + let selected = table + .get_best_peer(SUPPORTED_ETH_CAPABILITIES.to_vec()) + .await + .expect("actor alive"); + let (got_id, _conn, permit) = selected.expect("peer should be selected"); + assert_eq!(got_id, peer_id); + + let diag = table + .get_peer_diagnostics() + .await + .expect("diagnostics") + .into_iter() + .find(|p| p.peer_id == peer_id) + .expect("peer present"); + assert_eq!( + diag.inflight_requests, 1, + "selection should atomically bump the in-flight counter" + ); + + drop(permit); + // `get_peer_diagnostics` is a request handler. Awaiting its response + // ensures the preceding fire-and-forget `dec_requests` send from the + // permit's Drop has been processed (FIFO mailbox). + let diag_after = table + .get_peer_diagnostics() + .await + .expect("diagnostics") + .into_iter() + .find(|p| p.peer_id == peer_id) + .expect("peer present"); + assert_eq!( + diag_after.inflight_requests, 0, + "dropping the permit should release the slot" + ); + } + + #[tokio::test] + async fn get_best_n_peers_bumps_all_and_releases_all_on_drop() { + // Batched reservation path: `get_best_n_peers` returns a vector of + // (id, conn, permit) triples and must have bumped each selected + // peer's counter before returning. Dropping all three permits must + // decrement all three counters back to 0. + let table = fresh_peer_table().await; + let (id_a, peer_a) = fake_connected_peer(1); + let (id_b, peer_b) = fake_connected_peer(2); + let (id_c, peer_c) = fake_connected_peer(3); + + table + ._test_insert_peer(id_a, peer_a) + .expect("insert peer A"); + table + ._test_insert_peer(id_b, peer_b) + .expect("insert peer B"); + table + ._test_insert_peer(id_c, peer_c) + .expect("insert peer C"); + + let selected = table + .get_best_n_peers(SUPPORTED_ETH_CAPABILITIES.to_vec(), 3) + .await + .expect("actor alive"); + assert_eq!(selected.len(), 3, "should return all three peers"); + + let diags = table.get_peer_diagnostics().await.expect("diagnostics"); + for id in [id_a, id_b, id_c] { + let diag = diags + .iter() + .find(|p| p.peer_id == id) + .expect("peer present"); + assert_eq!( + diag.inflight_requests, 1, + "peer {id:?} should have its counter bumped", + ); + } + + // Drop all permits (consumes the vector, dropping the triples). + drop(selected); + + let diags_after = table.get_peer_diagnostics().await.expect("diagnostics"); + for id in [id_a, id_b, id_c] { + let diag = diags_after + .iter() + .find(|p| p.peer_id == id) + .expect("peer present"); + assert_eq!( + diag.inflight_requests, 0, + "peer {id:?} should have its counter released", + ); + } + } } diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 2a39c74023f..f7d87b6a116 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -184,6 +184,33 @@ impl PeerConnection { } } +#[cfg(test)] +impl PeerConnection { + /// Test-only constructor that wraps a pre-built handle. Allows seeding + /// peer tables with fake peers in unit tests without spinning up a full + /// `P2PContext`. Tests should not call `outgoing_message` / + /// `outgoing_request` on instances built this way. + pub fn for_test(handle: ActorRef) -> Self { + Self { handle } + } +} + +/// Test-only helper that mints a detached `ActorRef`. +/// +/// `ActorRef` has no public `new`: the only way to obtain one is through +/// `ActorStart::start()`. We start a `PeerConnectionServer` in the +/// `HandshakeFailed` state, which causes `handshake::perform` to return an +/// error and `ctx.stop()` to be invoked immediately — no networking is +/// attempted. The `ActorRef` returned by `start()` is still a valid handle +/// and is all tests need to populate `PeerData.connection`. +#[cfg(test)] +pub fn detached_peer_connection_handle() -> ActorRef { + let server = PeerConnectionServer { + state: ConnectionState::HandshakeFailed, + }; + server.start() +} + #[derive(Debug)] pub struct Initiator { pub(crate) context: P2PContext, From 263bf9cf26ef6d10875a2c83305a9a4e7608ad46 Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Thu, 23 Apr 2026 17:01:27 -0300 Subject: [PATCH 4/7] refactor(p2p): tighten RequestPermit lifetime --- crates/networking/p2p/peer_handler.rs | 82 +++++----- crates/networking/p2p/peer_table.rs | 21 ++- .../networking/p2p/rlpx/connection/server.rs | 2 +- crates/networking/p2p/snap/client.rs | 94 +++++------ crates/networking/p2p/sync/snap_sync.rs | 150 ++++++++---------- 5 files changed, 161 insertions(+), 188 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index ca406b7d672..c17f08c3989 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -53,6 +53,7 @@ pub enum BlockRequestOrder { async fn ask_peer_head_number( peer_id: H256, connection: &mut PeerConnection, + _permit: RequestPermit, sync_head: H256, retries: i32, ) -> Result { @@ -69,6 +70,7 @@ async fn ask_peer_head_number( debug!("(Retry {retries}) Requesting sync head {sync_head:?} to peer {peer_id}"); + // `_permit` drops at the end of this function, releasing the peer's slot. match connection .outgoing_request(request, PEER_REPLY_TIMEOUT) .await @@ -163,9 +165,10 @@ impl PeerHandler { peers_selected = ?selected_peers, "request_block_headers: resolving sync head with peers" ); - for (peer_id, mut connection, _permit) in peers { - // _permit holds the slot for the duration of this iteration. - match ask_peer_head_number(peer_id, &mut connection, sync_head, retries).await { + for (peer_id, mut connection, permit) in peers { + match ask_peer_head_number(peer_id, &mut connection, permit, sync_head, retries) + .await + { Ok(number) => { sync_head_number = number; if number != 0 { @@ -225,14 +228,8 @@ impl PeerHandler { let mut downloaded_count = 0_u64; // channel to send the tasks to the peers - let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::<( - Vec, - H256, - PeerConnection, - u64, - u64, - RequestPermit, - )>(1000); + let (task_sender, mut task_receiver) = + tokio::sync::mpsc::channel::<(Vec, H256, PeerConnection, u64, u64)>(1000); let mut current_show = 0; @@ -245,10 +242,11 @@ impl PeerHandler { let mut logged_no_free_peers_count = 0; loop { - if let Ok((headers, peer_id, _connection, startblock, previous_chunk_limit, _permit)) = + if let Ok((headers, peer_id, _connection, startblock, previous_chunk_limit)) = task_receiver.try_recv() { - // _permit drops here, releasing the reservation. + // The worker already dropped its permit when the wire request + // returned, so the peer's slot is already free here. trace!("We received a download chunk from peer"); if headers.is_empty() { @@ -325,12 +323,16 @@ impl PeerHandler { current_show += 1; } + // Queue drained but in-flight tasks haven't returned yet. + // Drop the permit we just acquired (end of scope) and yield + // so the result receive path gets a chance to run. + tokio::task::yield_now().await; continue; }; let tx = task_sender.clone(); - // Selection atomically reserved a request slot; the permit rides - // through the channel and drops in the completion handler (or - // with the task if it's dropped before sending). + // `download_chunk_from_peer` consumes the permit and drops it + // when the wire request returns, so the peer's slot is freed + // before the result is sent back through the channel. debug!("Downloader {peer_id} is now busy"); tokio::spawn(async move { @@ -340,6 +342,7 @@ impl PeerHandler { let headers = Self::download_chunk_from_peer( peer_id, &mut connection, + permit, startblock, chunk_limit, ) @@ -347,18 +350,11 @@ impl PeerHandler { .inspect_err(|err| trace!("Sync Log 6: {peer_id} failed to download chunk: {err}")) .unwrap_or_default(); - tx.send(( - headers, - peer_id, - connection, - startblock, - chunk_limit, - permit, - )) - .await - .inspect_err(|err| { - error!("Failed to send headers result through channel. Error: {err}") - }) + tx.send((headers, peer_id, connection, startblock, chunk_limit)) + .await + .inspect_err(|err| { + error!("Failed to send headers result through channel. Error: {err}") + }) }); } @@ -419,14 +415,16 @@ impl PeerHandler { }); match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? { None => Ok(None), - Some((peer_id, mut connection, _permit)) => { - // _permit drops at end of this arm, releasing the slot. + Some((peer_id, mut connection, permit)) => { + // Release the peer's slot as soon as the wire response is in. + let response = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await; + drop(permit); if let Ok(RLPxMessage::BlockHeaders(BlockHeaders { id: _, block_headers, - })) = connection - .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await + })) = response { if block_headers.is_empty() { // Empty response is valid per eth spec (peer may not have these blocks) @@ -457,11 +455,12 @@ impl PeerHandler { } /// Given a peer id, a chunk start and a chunk limit, requests the block headers from the peer. - /// The caller owns the `RequestPermit` for this peer, so this just - /// forwards to `connection.outgoing_request` — the permit handles lifecycle. + /// Consumes a `RequestPermit` that was reserved at peer selection time; + /// the permit drops when this function returns, releasing the peer's slot. async fn download_chunk_from_peer( peer_id: H256, connection: &mut PeerConnection, + _permit: RequestPermit, startblock: u64, chunk_limit: u64, ) -> Result, PeerHandlerError> { @@ -508,14 +507,16 @@ impl PeerHandler { }); match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? { None => Ok(None), - Some((peer_id, mut connection, _permit)) => { - // _permit drops at end of this arm, releasing the slot. + Some((peer_id, mut connection, permit)) => { + // Release the peer's slot as soon as the wire response is in. + let response = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await; + drop(permit); if let Ok(RLPxMessage::BlockBodies(BlockBodies { id: _, block_bodies, - })) = connection - .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await + })) = response { // Check that the response is not empty and does not contain more bodies than the ones requested if !block_bodies.is_empty() && block_bodies.len() <= block_hashes_len { @@ -594,6 +595,7 @@ impl PeerHandler { &mut self, _peer_id: H256, connection: &mut PeerConnection, + _permit: RequestPermit, block_number: u64, ) -> Result, PeerHandlerError> { let request_id = rand::random(); diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index 26e5907a72c..304c3189b1b 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -243,8 +243,9 @@ pub enum ContactValidation { /// fire-and-forget `DecRequests` message releases the slot. /// /// The permit is the output of `get_best_peer`, `get_best_peer_excluding`, -/// and `get_random_peer`. Those handlers bump the counter atomically with -/// peer selection (same handler call, under the actor's `&mut self`). +/// `get_best_n_peers`, and `get_random_peer`. Those handlers bump the +/// counter atomically with peer selection (same handler call, under the +/// actor's `&mut self`). /// /// The permit MUST travel with whatever code owns the outstanding request — /// move it into spawned tasks, send it through channels alongside results, @@ -263,10 +264,6 @@ impl RequestPermit { peer_id, } } - - pub fn peer_id(&self) -> H256 { - self.peer_id - } } impl std::fmt::Debug for RequestPermit { @@ -358,6 +355,9 @@ pub trait PeerTableServerProtocol: Send + Sync { capabilities: Vec, n: usize, ) -> Response>; + /// Read-only predicate: is there any eligible peer matching `capabilities`? + /// Does not reserve a slot; use for capacity/rotation probes only. + fn has_eligible_peer(&self, capabilities: Vec) -> Response; fn get_score(&self, node_id: H256) -> Response; fn get_connected_nodes(&self) -> Response>; fn get_peers_with_capabilities(&self) @@ -813,6 +813,15 @@ impl PeerTableServer { out } + #[request_handler] + async fn handle_has_eligible_peer( + &mut self, + msg: peer_table_server_protocol::HasEligiblePeer, + _ctx: &Context, + ) -> bool { + self.do_get_best_peer(&msg.capabilities).is_some() + } + #[request_handler] async fn handle_get_score( &mut self, diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index f7d87b6a116..2cf1eb30cb9 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -204,7 +204,7 @@ impl PeerConnection { /// attempted. The `ActorRef` returned by `start()` is still a valid handle /// and is all tests need to populate `PeerData.connection`. #[cfg(test)] -pub fn detached_peer_connection_handle() -> ActorRef { +pub(crate) fn detached_peer_connection_handle() -> ActorRef { let server = PeerConnectionServer { state: ConnectionState::HandshakeFailed, }; diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index 5aa5186e481..dcdc666a53b 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -69,7 +69,6 @@ struct StorageTaskResult { remaining_start: usize, remaining_end: usize, remaining_hash_range: (H256, Option), - permit: RequestPermit, } #[derive(Debug)] @@ -137,12 +136,8 @@ pub async fn request_account_range( let mut all_accounts_state = Vec::new(); // channel to send the tasks to the peers - let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::<( - Vec, - H256, - Option<(H256, H256)>, - RequestPermit, - )>(1000); + let (task_sender, mut task_receiver) = + tokio::sync::mpsc::channel::<(Vec, H256, Option<(H256, H256)>)>(1000); info!("Starting to download account ranges from peers"); @@ -197,8 +192,8 @@ pub async fn request_account_range( last_update = SystemTime::now(); } - if let Ok((accounts, peer_id, chunk_start_end, _permit)) = task_receiver.try_recv() { - // _permit drops here, releasing the reservation. + if let Ok((accounts, peer_id, chunk_start_end)) = task_receiver.try_recv() { + // The worker already dropped its permit; peer slot is free here. if let Some((chunk_start, chunk_end)) = chunk_start_end { if chunk_start <= chunk_end { tasks_queue_not_started.push_back((chunk_start, chunk_end)); @@ -377,7 +372,6 @@ pub async fn request_bytecodes( peer_id: H256, remaining_start: usize, remaining_end: usize, - permit: RequestPermit, } let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::(1000); @@ -399,9 +393,8 @@ pub async fn request_bytecodes( peer_id, remaining_start, remaining_end, - permit: _permit, } = result; - // _permit drops here, releasing the reservation. + // The worker already dropped its permit; peer slot is free here. debug!( "Downloaded {} bytecodes from peer {peer_id} (current count: {downloaded_count})", @@ -461,9 +454,8 @@ pub async fn request_bytecodes( .copied() .collect(); - // Selection already reserved the slot; permit rides through the - // channel and drops in the receiver (or with the task if dropped - // before sending). + // The spawned task drops its permit as soon as the wire response is + // in, so the peer's slot is freed before we send the result back. tokio::spawn(async move { debug!( @@ -476,12 +468,12 @@ pub async fn request_bytecodes( bytes: MAX_RESPONSE_BYTES, }); - // Collect into a single (bytecodes, remaining_start) pair so the - // task has a single `tx.send` site — `RequestPermit` is not `Clone`. - let (bytecodes, remaining_start) = match connection + let response = connection .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await - { + .await; + drop(permit); + + let (bytecodes, remaining_start) = match response { Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) if !codes.is_empty() => { // Validate response by hashing bytecodes let validated_codes: Vec = codes @@ -509,7 +501,6 @@ pub async fn request_bytecodes( peer_id, remaining_start, remaining_end: chunk_end, - permit, }; tx.send(result).await.ok(); }); @@ -662,9 +653,8 @@ pub async fn request_storage_ranges( remaining_start, remaining_end, remaining_hash_range: (hash_start, hash_end), - permit: _permit, } = result; - // _permit drops here, releasing the reservation. + // The worker already dropped its permit; peer slot is free here. completed_tasks += 1; for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() { @@ -1155,12 +1145,7 @@ async fn request_account_range_worker( chunk_start: H256, chunk_end: H256, state_root: H256, - tx: tokio::sync::mpsc::Sender<( - Vec, - H256, - Option<(H256, H256)>, - RequestPermit, - )>, + tx: tokio::sync::mpsc::Sender<(Vec, H256, Option<(H256, H256)>)>, permit: RequestPermit, ) -> Result<(), SnapError> { debug!("Requesting account range from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}"); @@ -1173,8 +1158,13 @@ async fn request_account_range_worker( response_bytes: MAX_RESPONSE_BYTES, }); - // Collect the result into (accounts, chunk_left) so the function has a - // single `tx.send` site — `RequestPermit` is not `Clone`. + // Perform the wire request and release the peer slot as soon as the + // response (or error) is in — processing below is pure computation. + let response = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await; + drop(permit); + let retry = || { ( Vec::::new(), @@ -1185,26 +1175,17 @@ async fn request_account_range_worker( id: _, accounts, proof, - // The caller already holds a request reservation for this peer, - // so call outgoing_request directly to avoid a double increment. - })) = connection - .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await + })) = response { if accounts.is_empty() { retry() } else { - // Unzip & validate response + // Validate response — build the verification inputs by borrowing + // `accounts` so we can still consume it for the filtered output. let proof = encodable_to_proof(&proof); - let (account_hashes, account_states): (Vec<_>, Vec<_>) = accounts - .clone() - .into_iter() - .map(|unit| (unit.hash, unit.account)) - .unzip(); - let encoded_accounts = account_states - .iter() - .map(|acc| acc.encode_to_vec()) - .collect::>(); + let account_hashes: Vec = accounts.iter().map(|u| u.hash).collect(); + let encoded_accounts: Vec<_> = + accounts.iter().map(|u| u.account.encode_to_vec()).collect(); match verify_range( state_root, @@ -1248,9 +1229,7 @@ async fn request_account_range_worker( retry() }; - tx.send((accounts_out, peer_id, chunk_left, permit)) - .await - .ok(); + tx.send((accounts_out, peer_id, chunk_left)).await.ok(); Ok::<(), SnapError>(()) } @@ -1270,8 +1249,7 @@ async fn request_storage_ranges_worker( let start_hash = task.start_hash; // Defaults for the "retry this same range" outcome used by every failure - // branch below. Collapsing to a single `tx.send` at the bottom — permit - // is not `Clone`. + // branch below. let retry_outcome = || { ( Vec::>::new(), @@ -1292,16 +1270,19 @@ async fn request_storage_ranges_worker( }); tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", "Sending storage range request"); - // The caller already holds a request reservation for this peer, - // so call outgoing_request directly to avoid a double increment. + // Perform the wire request and release the peer slot as soon as the + // response (or error) is in — validation below is pure computation. + let response = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await; + drop(permit); + let (account_storages, remaining_start, remaining_end, remaining_hash_range) = 'outcome: { let Ok(RLPxMessage::StorageRanges(StorageRanges { id: _, slots, proof, - })) = connection - .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await + })) = response else { #[cfg(feature = "metrics")] ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("timeout"); @@ -1432,7 +1413,6 @@ async fn request_storage_ranges_worker( remaining_start, remaining_end, remaining_hash_range, - permit, }; tx.send(task_result).await.ok(); Ok::<(), SnapError>(()) diff --git a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs index f928d59bc83..165e7d23461 100644 --- a/crates/networking/p2p/sync/snap_sync.rs +++ b/crates/networking/p2p/sync/snap_sync.rs @@ -686,10 +686,9 @@ pub async fn update_pivot( block_sync_state: &mut SnapBlockSyncState, diagnostics: &Arc>, ) -> Result { - const MAX_RETRIES_PER_PEER: u64 = 3; /// Maximum number of full peer rotations before giving up. With rotation, /// each pass tries every eligible peer once; the budget scales naturally - /// with network size. + /// with network size. Between rotations we back off exponentially. const MAX_ROTATIONS: u64 = 5; const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1); const MAX_RETRY_DELAY: Duration = Duration::from_secs(30); @@ -739,22 +738,18 @@ pub async fn update_pivot( tokio::time::sleep(delay).await; } - // Hold the permit for the duration of the attempt loop. It's dropped - // at the end of each outer-loop iteration — either on success (return), - // on exclusion (continue after exhausting retries), or on fall-through. - let Some((peer_id, mut connection, _permit)) = peers + // One permit per attempt: consumed by `get_block_header` below. + let Some((peer_id, mut connection, permit)) = peers .peer_table .get_best_peer_excluding(SUPPORTED_ETH_CAPABILITIES.to_vec(), excluded_peers.clone()) .await? else { // Distinguish "rotation exhausted" from "no peers currently eligible - // (all at capacity)". Check if any peer is eligible ignoring - // exclusions — if so, we're waiting on capacity, not rotation. + // (all at capacity)". Read-only probe — does not bump `requests`. let any_eligible = peers .peer_table - .get_best_peer(SUPPORTED_ETH_CAPABILITIES.to_vec()) - .await? - .is_some(); + .has_eligible_peer(SUPPORTED_ETH_CAPABILITIES.to_vec()) + .await?; if !any_eligible { debug!("update_pivot: no eligible peers available, waiting"); @@ -794,82 +789,69 @@ pub async fn update_pivot( "Trying to update pivot to {new_pivot_block_number} with peer {peer_id} (score: {peer_score})" ); - // Try up to MAX_RETRIES_PER_PEER times with this specific peer. - // Both Ok(None) and recoverable errors count as a failure and advance - // through retries; on exhaustion, the peer is excluded and we rotate. - let mut peer_failures: u64 = 0; - for attempt in 0..MAX_RETRIES_PER_PEER { - let outcome = peers - .get_block_header(peer_id, &mut connection, new_pivot_block_number) - .await; - - match outcome { - Ok(Some(pivot)) => { - // Success — reward peer and return - peers.peer_table.record_success(peer_id)?; - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("success"); - info!("Successfully updated pivot"); - - { - let mut diag = diagnostics.write().await; - diag.push_pivot_change(super::PivotChangeEvent { - timestamp: current_unix_time(), - old_pivot_number: block_number, - new_pivot_number: pivot.number, - outcome: "success".to_string(), - failure_reason: None, - }); - diag.pivot_block_number = Some(pivot.number); - diag.pivot_timestamp = Some(pivot.timestamp); - let pivot_age = current_unix_time().saturating_sub(pivot.timestamp); - diag.pivot_age_seconds = Some(pivot_age); - METRICS - .pivot_timestamp - .store(pivot.timestamp, std::sync::atomic::Ordering::Relaxed); - } - let block_headers = peers - .request_block_headers(block_number + 1, pivot.hash()) - .await? - .ok_or(SyncError::NoBlockHeaders)?; - block_sync_state - .process_incoming_headers(block_headers.into_iter()) - .await?; - *METRICS.sync_head_hash.lock().await = pivot.hash(); - return Ok(pivot); - } - Ok(None) => { - peers.peer_table.record_failure(peer_id)?; - peer_failures += 1; - let peer_score = peers.peer_table.get_score(peer_id).await?; - warn!( - "update_pivot: peer {peer_id} returned None (attempt {}/{MAX_RETRIES_PER_PEER}, score: {peer_score})", - attempt + 1 - ); - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_none"); - } - Err(e) if e.is_recoverable() => { - peers.peer_table.record_failure(peer_id)?; - peer_failures += 1; - warn!( - "update_pivot: peer {peer_id} failed with {e} (attempt {}/{MAX_RETRIES_PER_PEER})", - attempt + 1 - ); - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_error"); - } - Err(e) => { - // Non-recoverable error (e.g., dead peer table actor, - // storage full) — surface it. - return Err(SyncError::PeerHandler(e)); + // One attempt per peer per rotation. A peer that fails is excluded for + // this rotation and will be retried (with backoff) in the next one. + let outcome = peers + .get_block_header(peer_id, &mut connection, permit, new_pivot_block_number) + .await; + + match outcome { + Ok(Some(pivot)) => { + peers.peer_table.record_success(peer_id)?; + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("success"); + info!("Successfully updated pivot"); + + { + let mut diag = diagnostics.write().await; + diag.push_pivot_change(super::PivotChangeEvent { + timestamp: current_unix_time(), + old_pivot_number: block_number, + new_pivot_number: pivot.number, + outcome: "success".to_string(), + failure_reason: None, + }); + diag.pivot_block_number = Some(pivot.number); + diag.pivot_timestamp = Some(pivot.timestamp); + let pivot_age = current_unix_time().saturating_sub(pivot.timestamp); + diag.pivot_age_seconds = Some(pivot_age); + METRICS + .pivot_timestamp + .store(pivot.timestamp, std::sync::atomic::Ordering::Relaxed); } + let block_headers = peers + .request_block_headers(block_number + 1, pivot.hash()) + .await? + .ok_or(SyncError::NoBlockHeaders)?; + block_sync_state + .process_incoming_headers(block_headers.into_iter()) + .await?; + *METRICS.sync_head_hash.lock().await = pivot.hash(); + return Ok(pivot); + } + Ok(None) => { + peers.peer_table.record_failure(peer_id)?; + let peer_score = peers.peer_table.get_score(peer_id).await?; + warn!( + "update_pivot: peer {peer_id} returned None (score: {peer_score}), excluding for this rotation" + ); + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_none"); + excluded_peers.push(peer_id); + } + Err(e) if e.is_recoverable() => { + peers.peer_table.record_failure(peer_id)?; + warn!("update_pivot: peer {peer_id} failed with {e}, excluding for this rotation"); + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_error"); + excluded_peers.push(peer_id); + } + Err(e) => { + // Non-recoverable error (e.g., dead peer table actor, + // storage full) — surface it. + return Err(SyncError::PeerHandler(e)); } } - - // Peer exhausted its retries — exclude it and try the next one - debug!("update_pivot: excluding peer {peer_id} after {peer_failures} failures"); - excluded_peers.push(peer_id); } } From 3d1196caff70d3d761014332c86486e328fed0a0 Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Thu, 23 Apr 2026 18:04:08 -0300 Subject: [PATCH 5/7] refactor(p2p): drop permit tests, apply review feedback --- crates/networking/p2p/peer_handler.rs | 12 +- crates/networking/p2p/peer_table.rs | 248 +----------------- .../networking/p2p/rlpx/connection/server.rs | 27 -- crates/networking/p2p/snap/client.rs | 48 +--- 4 files changed, 30 insertions(+), 305 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index c17f08c3989..da5ec241b86 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -50,6 +50,8 @@ pub enum BlockRequestOrder { NewToOld, } +/// Asks a single already-selected peer for the block number at `sync_head`. +/// Consumes a `RequestPermit`; the permit drops on return, releasing the slot. async fn ask_peer_head_number( peer_id: H256, connection: &mut PeerConnection, @@ -70,7 +72,6 @@ async fn ask_peer_head_number( debug!("(Retry {retries}) Requesting sync head {sync_head:?} to peer {peer_id}"); - // `_permit` drops at the end of this function, releasing the peer's slot. match connection .outgoing_request(request, PEER_REPLY_TIMEOUT) .await @@ -245,9 +246,6 @@ impl PeerHandler { if let Ok((headers, peer_id, _connection, startblock, previous_chunk_limit)) = task_receiver.try_recv() { - // The worker already dropped its permit when the wire request - // returned, so the peer's slot is already free here. - trace!("We received a download chunk from peer"); if headers.is_empty() { self.peer_table.record_failure(peer_id)?; @@ -330,9 +328,6 @@ impl PeerHandler { continue; }; let tx = task_sender.clone(); - // `download_chunk_from_peer` consumes the permit and drops it - // when the wire request returns, so the peer's slot is freed - // before the result is sent back through the channel. debug!("Downloader {peer_id} is now busy"); tokio::spawn(async move { @@ -591,6 +586,9 @@ impl PeerHandler { Ok(self.peer_table.peer_count().await?) } + /// Requests a single block header by number from an already-selected peer. + /// Consumes a `RequestPermit` reserved by the caller at peer selection + /// time; the permit drops when this function returns, releasing the slot. pub async fn get_block_header( &mut self, _peer_id: H256, diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index 304c3189b1b..91d369b6a18 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -239,18 +239,14 @@ pub enum ContactValidation { /// Reservation handle for a peer request slot. /// /// **Contract:** when a `RequestPermit` exists, the `requests` counter for -/// its peer has been incremented by one. When the permit is dropped, a -/// fire-and-forget `DecRequests` message releases the slot. +/// its peer has been incremented by one. Dropping the permit releases the +/// slot via a fire-and-forget `DecRequests` message. The handler that +/// returns the permit also bumps the counter atomically under `&mut self`, +/// so selection and reservation cannot be observed out of order. /// -/// The permit is the output of `get_best_peer`, `get_best_peer_excluding`, -/// `get_best_n_peers`, and `get_random_peer`. Those handlers bump the -/// counter atomically with peer selection (same handler call, under the -/// actor's `&mut self`). -/// -/// The permit MUST travel with whatever code owns the outstanding request — +/// The permit must travel with whatever code owns the outstanding request — /// move it into spawned tasks, send it through channels alongside results, -/// etc. Dropping it early releases the slot early. `#[must_use]` catches -/// accidental discards at call sites that should be holding it. +/// etc. Dropping early releases the slot early. #[must_use = "dropping this permit immediately releases the peer's request slot"] pub struct RequestPermit { peer_table: PeerTable, @@ -377,11 +373,6 @@ pub trait PeerTableServerProtocol: Send + Sync { ) -> Response>; fn get_session_info(&self, node_id: H256) -> Response>; fn get_peer_diagnostics(&self) -> Response>; - - #[cfg(test)] - fn _test_insert_peer(&self, peer_id: H256, peer: PeerData) -> Result<(), ActorError>; - #[cfg(test)] - fn _test_bump_requests(&self, peer_id: H256) -> Result<(), ActorError>; } #[derive(Debug)] @@ -486,31 +477,12 @@ impl PeerTableServer { msg: peer_table_server_protocol::DecRequests, _ctx: &Context, ) { - self.peers - .entry(msg.node_id) - .and_modify(|peer_data| peer_data.requests = peer_data.requests.saturating_sub(1)); - } - - #[cfg(test)] - #[send_handler] - async fn handle_test_insert_peer( - &mut self, - msg: peer_table_server_protocol::TestInsertPeer, - _ctx: &Context, - ) { - self.peers.insert(msg.peer_id, msg.peer); - } - - #[cfg(test)] - #[send_handler] - async fn handle_test_bump_requests( - &mut self, - msg: peer_table_server_protocol::TestBumpRequests, - _ctx: &Context, - ) { - self.peers - .entry(msg.peer_id) - .and_modify(|peer_data| peer_data.requests += 1); + self.peers.entry(msg.node_id).and_modify(|peer_data| { + // Clamp at 0: a stale permit drop firing after a peer + // disconnect+reconnect would otherwise push `requests` + // negative (i64::saturating_sub saturates at i64::MIN). + peer_data.requests = peer_data.requests.saturating_sub(1).max(0) + }); } #[send_handler] @@ -1360,199 +1332,3 @@ impl PeerTableServer { } pub type PeerTable = ActorRef; - -#[cfg(test)] -mod permit_tests { - use super::*; - use crate::rlpx::p2p::SUPPORTED_ETH_CAPABILITIES; - use ethrex_common::{H256, H512}; - use ethrex_storage::{EngineType, Store}; - - async fn fresh_peer_table() -> PeerTable { - let store = Store::new("", EngineType::InMemory).expect("in-memory store"); - PeerTableServer::spawn(10, store) - } - - // `PeerConnection` has a private field and its production constructors - // require a full `P2PContext`. For tests we use `PeerConnection::for_test` - // together with `detached_peer_connection_handle` to mint a handle without - // running a real handshake. - fn fake_peer_data() -> (H256, PeerData) { - let node = Node::new("127.0.0.1".parse().expect("ip"), 30303, 30303, H512::zero()); - let peer_id = H256::from_low_u64_be(1); - let peer = PeerData::new(node, None, None, SUPPORTED_ETH_CAPABILITIES.to_vec()); - (peer_id, peer) - } - - /// Build a `PeerData` with a fake-but-valid `PeerConnection` so that - /// `do_get_best_peer_excluding`'s `connection.is_some()` filter accepts it. - fn fake_connected_peer(peer_id_byte: u64) -> (H256, PeerData) { - use crate::rlpx::connection::server::{PeerConnection, detached_peer_connection_handle}; - let node = Node::new("127.0.0.1".parse().expect("ip"), 30303, 30303, H512::zero()); - let peer_id = H256::from_low_u64_be(peer_id_byte); - let connection = PeerConnection::for_test(detached_peer_connection_handle()); - let peer = PeerData::new( - node, - None, - Some(connection), - SUPPORTED_ETH_CAPABILITIES.to_vec(), - ); - (peer_id, peer) - } - - #[tokio::test] - async fn permit_drop_releases_reservation() { - // Regression coverage: dropping a permit with an unknown peer_id must - // not panic (saturating_sub protects us). - let table = fresh_peer_table().await; - let permit = RequestPermit::new(table.clone(), H256::zero()); - drop(permit); - let count = table.peer_count().await.unwrap(); - assert_eq!(count, 0); - } - - #[tokio::test] - async fn permit_drop_decrements_counter() { - // Insert a peer, bump `requests` via a test-only helper, then drop a - // permit pointing at that peer and verify the counter returned to 0. - let table = fresh_peer_table().await; - let (peer_id, peer) = fake_peer_data(); - - table - ._test_insert_peer(peer_id, peer) - .expect("insert fake peer"); - table._test_bump_requests(peer_id).expect("bump requests"); - - // Let the send messages land before we measure. - tokio::task::yield_now().await; - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - - let diag = table - .get_peer_diagnostics() - .await - .expect("diagnostics") - .into_iter() - .find(|p| p.peer_id == peer_id) - .expect("peer present"); - assert_eq!(diag.inflight_requests, 1, "test helper should have bumped"); - - let permit = RequestPermit::new(table.clone(), peer_id); - drop(permit); - // get_peer_diagnostics is a request; its response guarantees the - // dec_requests send from Drop has been processed (FIFO mailbox). - let diag_after = table - .get_peer_diagnostics() - .await - .expect("diagnostics") - .into_iter() - .find(|p| p.peer_id == peer_id) - .expect("peer present"); - assert_eq!( - diag_after.inflight_requests, 0, - "permit drop should release the slot" - ); - } - - #[tokio::test] - async fn get_best_peer_bumps_counter_and_drop_releases() { - // End-to-end happy path: `get_best_peer` must atomically reserve a - // slot (bumping `requests` to 1) and return a permit whose drop - // decrements back to 0. - let table = fresh_peer_table().await; - let (peer_id, peer) = fake_connected_peer(1); - - table - ._test_insert_peer(peer_id, peer) - .expect("insert fake peer"); - - let selected = table - .get_best_peer(SUPPORTED_ETH_CAPABILITIES.to_vec()) - .await - .expect("actor alive"); - let (got_id, _conn, permit) = selected.expect("peer should be selected"); - assert_eq!(got_id, peer_id); - - let diag = table - .get_peer_diagnostics() - .await - .expect("diagnostics") - .into_iter() - .find(|p| p.peer_id == peer_id) - .expect("peer present"); - assert_eq!( - diag.inflight_requests, 1, - "selection should atomically bump the in-flight counter" - ); - - drop(permit); - // `get_peer_diagnostics` is a request handler. Awaiting its response - // ensures the preceding fire-and-forget `dec_requests` send from the - // permit's Drop has been processed (FIFO mailbox). - let diag_after = table - .get_peer_diagnostics() - .await - .expect("diagnostics") - .into_iter() - .find(|p| p.peer_id == peer_id) - .expect("peer present"); - assert_eq!( - diag_after.inflight_requests, 0, - "dropping the permit should release the slot" - ); - } - - #[tokio::test] - async fn get_best_n_peers_bumps_all_and_releases_all_on_drop() { - // Batched reservation path: `get_best_n_peers` returns a vector of - // (id, conn, permit) triples and must have bumped each selected - // peer's counter before returning. Dropping all three permits must - // decrement all three counters back to 0. - let table = fresh_peer_table().await; - let (id_a, peer_a) = fake_connected_peer(1); - let (id_b, peer_b) = fake_connected_peer(2); - let (id_c, peer_c) = fake_connected_peer(3); - - table - ._test_insert_peer(id_a, peer_a) - .expect("insert peer A"); - table - ._test_insert_peer(id_b, peer_b) - .expect("insert peer B"); - table - ._test_insert_peer(id_c, peer_c) - .expect("insert peer C"); - - let selected = table - .get_best_n_peers(SUPPORTED_ETH_CAPABILITIES.to_vec(), 3) - .await - .expect("actor alive"); - assert_eq!(selected.len(), 3, "should return all three peers"); - - let diags = table.get_peer_diagnostics().await.expect("diagnostics"); - for id in [id_a, id_b, id_c] { - let diag = diags - .iter() - .find(|p| p.peer_id == id) - .expect("peer present"); - assert_eq!( - diag.inflight_requests, 1, - "peer {id:?} should have its counter bumped", - ); - } - - // Drop all permits (consumes the vector, dropping the triples). - drop(selected); - - let diags_after = table.get_peer_diagnostics().await.expect("diagnostics"); - for id in [id_a, id_b, id_c] { - let diag = diags_after - .iter() - .find(|p| p.peer_id == id) - .expect("peer present"); - assert_eq!( - diag.inflight_requests, 0, - "peer {id:?} should have its counter released", - ); - } - } -} diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 2cf1eb30cb9..2a39c74023f 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -184,33 +184,6 @@ impl PeerConnection { } } -#[cfg(test)] -impl PeerConnection { - /// Test-only constructor that wraps a pre-built handle. Allows seeding - /// peer tables with fake peers in unit tests without spinning up a full - /// `P2PContext`. Tests should not call `outgoing_message` / - /// `outgoing_request` on instances built this way. - pub fn for_test(handle: ActorRef) -> Self { - Self { handle } - } -} - -/// Test-only helper that mints a detached `ActorRef`. -/// -/// `ActorRef` has no public `new`: the only way to obtain one is through -/// `ActorStart::start()`. We start a `PeerConnectionServer` in the -/// `HandshakeFailed` state, which causes `handshake::perform` to return an -/// error and `ctx.stop()` to be invoked immediately — no networking is -/// attempted. The `ActorRef` returned by `start()` is still a valid handle -/// and is all tests need to populate `PeerData.connection`. -#[cfg(test)] -pub(crate) fn detached_peer_connection_handle() -> ActorRef { - let server = PeerConnectionServer { - state: ConnectionState::HandshakeFailed, - }; - server.start() -} - #[derive(Debug)] pub struct Initiator { pub(crate) context: P2PContext, diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index dcdc666a53b..cd71a01c88e 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -193,7 +193,6 @@ pub async fn request_account_range( } if let Ok((accounts, peer_id, chunk_start_end)) = task_receiver.try_recv() { - // The worker already dropped its permit; peer slot is free here. if let Some((chunk_start, chunk_end)) = chunk_start_end { if chunk_start <= chunk_end { tasks_queue_not_started.push_back((chunk_start, chunk_end)); @@ -262,10 +261,6 @@ pub async fn request_account_range( .expect("Should be able to update pivot") } - // Selection already reserved the slot; permit rides through the - // channel and drops in the receiver (or with the task if dropped - // before sending). - tokio::spawn(request_account_range_worker( peer_id, connection, @@ -394,7 +389,6 @@ pub async fn request_bytecodes( remaining_start, remaining_end, } = result; - // The worker already dropped its permit; peer slot is free here. debug!( "Downloaded {} bytecodes from peer {peer_id} (current count: {downloaded_count})", @@ -454,9 +448,6 @@ pub async fn request_bytecodes( .copied() .collect(); - // The spawned task drops its permit as soon as the wire response is - // in, so the peer's slot is freed before we send the result back. - tokio::spawn(async move { debug!( "Requesting bytecode from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}" @@ -475,7 +466,6 @@ pub async fn request_bytecodes( let (bytecodes, remaining_start) = match response { Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) if !codes.is_empty() => { - // Validate response by hashing bytecodes let validated_codes: Vec = codes .into_iter() .zip(hashes_to_request) @@ -654,7 +644,6 @@ pub async fn request_storage_ranges( remaining_end, remaining_hash_range: (hash_start, hash_end), } = result; - // The worker already dropped its permit; peer slot is free here. completed_tasks += 1; for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() { @@ -992,9 +981,6 @@ pub async fn request_storage_ranges( chunk_storage_roots.first().unwrap_or(&H256::zero()), ); } - // Selection already reserved the slot; permit rides through the - // channel and drops in the receiver (or with the task if dropped - // before sending). tokio::spawn(request_storage_ranges_worker( task, peer_id, @@ -1049,6 +1035,12 @@ pub async fn request_storage_ranges( Ok(chunk_index + 1) } +/// Requests state trie nodes at the given paths from an already-selected peer. +/// Consumes a `RequestPermit` reserved by the caller at peer selection time; +/// the permit drops when this function returns, releasing the slot. +/// Returns `SnapError::InvalidHash` if any returned node's hash does not match +/// the requested path, and `SnapError::InvalidData` on an empty or oversized +/// response. pub async fn request_state_trienodes( mut connection: PeerConnection, _permit: RequestPermit, @@ -1056,9 +1048,6 @@ pub async fn request_state_trienodes( paths: Vec, ) -> Result, SnapError> { let expected_nodes = paths.len(); - // The caller already holds a request reservation for this peer, - // so call outgoing_request directly to avoid a double increment. - // _permit drops at the end of this function's scope. let request_id = rand::random(); let request = RLPxMessage::GetTrieNodes(GetTrieNodes { @@ -1104,19 +1093,18 @@ pub async fn request_state_trienodes( Ok(nodes) } -/// Requests storage trie nodes given the root of the state trie where they are contained and -/// a hashmap mapping the path to the account in the state trie (aka hashed address) to the paths to the nodes in its storage trie (can be full or partial) -/// Returns the nodes or None if: -/// - There are no available peers (the node just started up or was rejected by all other nodes) -/// - No peer returned a valid response in the given time and retry limits +/// Requests storage trie nodes from an already-selected peer. The `GetTrieNodes` +/// payload carries the state root and the per-account paths (hashed address +/// prefix followed by storage-trie paths, which may be full or partial). +/// Consumes a `RequestPermit` reserved by the caller at peer selection time; +/// the permit drops when this function returns, releasing the slot. +/// Errors are returned as `RequestStorageTrieNodesError` carrying the +/// request ID so the caller can reconcile it with its in-flight map. pub async fn request_storage_trienodes( mut connection: PeerConnection, _permit: RequestPermit, get_trie_nodes: GetTrieNodes, ) -> Result { - // The caller already holds a request reservation for this peer, - // so call outgoing_request directly to avoid a double increment. - // _permit drops at the end of this function's scope. let request_id = get_trie_nodes.id; let request = RLPxMessage::GetTrieNodes(get_trie_nodes); match connection @@ -1195,7 +1183,6 @@ async fn request_account_range_worker( &proof, ) { Ok(should_continue) => { - // If the range has more accounts to fetch, we send the new chunk let chunk_left = if should_continue { match account_hashes.last() { Some(last_hash) => { @@ -1297,22 +1284,17 @@ async fn request_storage_ranges_worker( tracing::debug!("Received empty storage range"); break 'outcome retry_outcome(); } - // Check we got some data and no more than the requested amount if slots.len() > chunk_storage_roots.len() || slots.is_empty() { break 'outcome retry_outcome(); } - // Unzip & validate response let proof = encodable_to_proof(&proof); let mut account_storages: Vec> = vec![]; let mut should_continue = false; let mut validation_failed = false; - // Validate each storage range let mut storage_roots = chunk_storage_roots.into_iter(); let last_slot_index = slots.len() - 1; for (i, next_account_slots) in slots.into_iter().enumerate() { - // We won't accept empty storage ranges if next_account_slots.is_empty() { - // This shouldn't happen error!("Received empty storage range, skipping"); validation_failed = true; break; @@ -1327,10 +1309,6 @@ async fn request_storage_ranges_worker( Some(root) => root, None => { error!("No storage root for account {i}"); - // Preserve the original error-log behavior; return the - // retry outcome so the caller can retry (the original - // code also sent empty_task_result here before - // returning Err). break 'outcome retry_outcome(); } }; From 3087449233ce67a4ecb54cc7c8a8acef96178055 Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Fri, 24 Apr 2026 15:16:05 -0300 Subject: [PATCH 6/7] refactor(p2p): address PR review feedback --- crates/networking/p2p/peer_handler.rs | 15 +++++++------- crates/networking/p2p/peer_table.rs | 26 ++++++++++++++++++++----- crates/networking/p2p/snap/client.rs | 13 +++++++------ crates/networking/p2p/sync/snap_sync.rs | 2 +- 4 files changed, 37 insertions(+), 19 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index da5ec241b86..fe589ddba80 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -450,12 +450,12 @@ impl PeerHandler { } /// Given a peer id, a chunk start and a chunk limit, requests the block headers from the peer. - /// Consumes a `RequestPermit` that was reserved at peer selection time; - /// the permit drops when this function returns, releasing the peer's slot. + /// Releases the peer slot as soon as the wire response is in; validation + /// below is pure computation. async fn download_chunk_from_peer( peer_id: H256, connection: &mut PeerConnection, - _permit: RequestPermit, + permit: RequestPermit, startblock: u64, chunk_limit: u64, ) -> Result, PeerHandlerError> { @@ -468,12 +468,14 @@ impl PeerHandler { skip: 0, reverse: false, }); + let response = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await; + drop(permit); if let Ok(RLPxMessage::BlockHeaders(BlockHeaders { id: _, block_headers, - })) = connection - .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await + })) = response { if are_block_headers_chained(&block_headers, &BlockRequestOrder::OldToNew) { Ok(block_headers) @@ -591,7 +593,6 @@ impl PeerHandler { /// time; the permit drops when this function returns, releasing the slot. pub async fn get_block_header( &mut self, - _peer_id: H256, connection: &mut PeerConnection, _permit: RequestPermit, block_number: u64, diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index 91d369b6a18..be38d656c9e 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -478,9 +478,16 @@ impl PeerTableServer { _ctx: &Context, ) { self.peers.entry(msg.node_id).and_modify(|peer_data| { - // Clamp at 0: a stale permit drop firing after a peer - // disconnect+reconnect would otherwise push `requests` - // negative (i64::saturating_sub saturates at i64::MIN). + if peer_data.requests <= 0 { + // Expected under the reconnect race (stale permit fires + // after remove_peer + new_connected_peer), self-heals. + // Otherwise points to a bookkeeping bug worth chasing. + tracing::debug!( + peer_id = ?msg.node_id, + requests = peer_data.requests, + "dec_requests with counter already <= 0", + ); + } peer_data.requests = peer_data.requests.saturating_sub(1).max(0) }); } @@ -791,7 +798,14 @@ impl PeerTableServer { msg: peer_table_server_protocol::HasEligiblePeer, _ctx: &Context, ) -> bool { - self.do_get_best_peer(&msg.capabilities).is_some() + self.peers.values().any(|peer_data| { + peer_data.connection.is_some() + && self.can_try_more_requests(&peer_data.score, &peer_data.requests) + && msg + .capabilities + .iter() + .any(|cap| peer_data.supported_capabilities.contains(cap)) + }) } #[request_handler] @@ -994,7 +1008,9 @@ impl PeerTableServer { /// Returns up to `n` best peers with capability overlap, sorted by weight /// descending. Excludes peers at capacity. Does NOT mutate state — caller - /// is responsible for incrementing `requests` on each returned peer. + /// is responsible for incrementing `requests` on each returned peer. The + /// sort uses a pre-increment snapshot: later picks don't see earlier + /// picks' bumps, which is fine for small `n`. fn do_get_best_n_peers( &self, capabilities: &[Capability], diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index cd71a01c88e..9fede93a267 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -1036,14 +1036,14 @@ pub async fn request_storage_ranges( } /// Requests state trie nodes at the given paths from an already-selected peer. -/// Consumes a `RequestPermit` reserved by the caller at peer selection time; -/// the permit drops when this function returns, releasing the slot. +/// Releases the peer slot as soon as the wire response is in; hash +/// verification below is pure computation. /// Returns `SnapError::InvalidHash` if any returned node's hash does not match /// the requested path, and `SnapError::InvalidData` on an empty or oversized /// response. pub async fn request_state_trienodes( mut connection: PeerConnection, - _permit: RequestPermit, + permit: RequestPermit, state_root: H256, paths: Vec, ) -> Result, SnapError> { @@ -1060,10 +1060,11 @@ pub async fn request_state_trienodes( .collect(), bytes: MAX_RESPONSE_BYTES, }); - let nodes = match connection + let response = connection .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await - { + .await; + drop(permit); + let nodes = match response { Ok(RLPxMessage::TrieNodes(trie_nodes)) => trie_nodes .nodes .iter() diff --git a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs index 165e7d23461..b612e598959 100644 --- a/crates/networking/p2p/sync/snap_sync.rs +++ b/crates/networking/p2p/sync/snap_sync.rs @@ -792,7 +792,7 @@ pub async fn update_pivot( // One attempt per peer per rotation. A peer that fails is excluded for // this rotation and will be retried (with backoff) in the next one. let outcome = peers - .get_block_header(peer_id, &mut connection, permit, new_pivot_block_number) + .get_block_header(&mut connection, permit, new_pivot_block_number) .await; match outcome { From 150a2372ff4ebaaac935a39f12bb1c215073c185 Mon Sep 17 00:00:00 2001 From: Lucas Fiegl Date: Fri, 24 Apr 2026 15:24:49 -0300 Subject: [PATCH 7/7] style(p2p): drop redundant permit-release comments --- crates/networking/p2p/peer_handler.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index fe589ddba80..c3d57af4603 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -411,7 +411,6 @@ impl PeerHandler { match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? { None => Ok(None), Some((peer_id, mut connection, permit)) => { - // Release the peer's slot as soon as the wire response is in. let response = connection .outgoing_request(request, PEER_REPLY_TIMEOUT) .await; @@ -505,7 +504,6 @@ impl PeerHandler { match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? { None => Ok(None), Some((peer_id, mut connection, permit)) => { - // Release the peer's slot as soon as the wire response is in. let response = connection .outgoing_request(request, PEER_REPLY_TIMEOUT) .await;