diff --git a/crates/networking/p2p/peer_handler.rs b/crates/networking/p2p/peer_handler.rs index 61b4349a827..c3d57af4603 100644 --- a/crates/networking/p2p/peer_handler.rs +++ b/crates/networking/p2p/peer_handler.rs @@ -1,7 +1,9 @@ use crate::rlpx::initiator::RLPxInitiator; use crate::{ metrics::{CurrentStepValue, METRICS}, - peer_table::{PeerData, PeerDiagnostics, PeerTable, PeerTableServerProtocol as _}, + peer_table::{ + PeerData, PeerDiagnostics, PeerTable, PeerTableServerProtocol as _, RequestPermit, + }, rlpx::{ connection::server::PeerConnection, error::PeerConnectionError, @@ -48,10 +50,12 @@ pub enum BlockRequestOrder { NewToOld, } +/// Asks a single already-selected peer for the block number at `sync_head`. +/// Consumes a `RequestPermit`; the permit drops on return, releasing the slot. async fn ask_peer_head_number( peer_id: H256, connection: &mut PeerConnection, - peer_table: &PeerTable, + _permit: RequestPermit, sync_head: H256, retries: i32, ) -> Result { @@ -68,7 +72,8 @@ async fn ask_peer_head_number( debug!("(Retry {retries}) Requesting sync head {sync_head:?} to peer {peer_id}"); - match PeerHandler::make_request(peer_table, peer_id, connection, request, PEER_REPLY_TIMEOUT) + match connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) .await { Ok(RLPxMessage::BlockHeaders(BlockHeaders { @@ -104,27 +109,12 @@ impl PeerHandler { } } - pub(crate) async fn make_request( - // TODO: We should receive the PeerHandler (or self) instead, but since it is not yet spawnified it cannot be shared - // Fix this to avoid passing the PeerTable as a parameter - peer_table: &PeerTable, - peer_id: H256, - connection: &mut PeerConnection, - message: RLPxMessage, - timeout: Duration, - ) -> Result { - peer_table.inc_requests(peer_id)?; - let result = connection.outgoing_request(message, timeout).await; - peer_table.dec_requests(peer_id)?; - result - } - /// Returns a random node id and the channel ends to an active peer connection that supports the given capability /// It doesn't guarantee that the selected peer is not currently busy async fn get_random_peer( &mut self, capabilities: &[Capability], - ) -> Result, PeerHandlerError> { + ) -> Result, PeerHandlerError> { Ok(self .peer_table .get_random_peer(capabilities.to_vec()) @@ -165,30 +155,20 @@ impl PeerHandler { // sync_head is unknown to our peers return Ok(None); } - let peer_connection = self + let peers = self .peer_table - .get_peer_connections(SUPPORTED_ETH_CAPABILITIES.to_vec()) + .get_best_n_peers(SUPPORTED_ETH_CAPABILITIES.to_vec(), MAX_PEERS_TO_ASK) .await?; - let selected_peers: Vec<_> = peer_connection - .iter() - .take(MAX_PEERS_TO_ASK) - .map(|(id, _)| *id) - .collect(); + let selected_peers: Vec<_> = peers.iter().map(|(id, _, _)| *id).collect(); debug!( retry = retries, peers_selected = ?selected_peers, "request_block_headers: resolving sync head with peers" ); - for (peer_id, mut connection) in peer_connection.into_iter().take(MAX_PEERS_TO_ASK) { - match ask_peer_head_number( - peer_id, - &mut connection, - &self.peer_table, - sync_head, - retries, - ) - .await + for (peer_id, mut connection, permit) in peers { + match ask_peer_head_number(peer_id, &mut connection, permit, sync_head, retries) + .await { Ok(number) => { sync_head_number = number; @@ -266,9 +246,6 @@ impl PeerHandler { if let Ok((headers, peer_id, _connection, startblock, previous_chunk_limit)) = task_receiver.try_recv() { - // Release the reservation we made before spawning the task. - self.peer_table.dec_requests(peer_id)?; - trace!("We received a download chunk from peer"); if headers.is_empty() { self.peer_table.record_failure(peer_id)?; @@ -316,7 +293,7 @@ impl PeerHandler { self.peer_table.record_success(peer_id)?; debug!("Downloader {peer_id} freed"); } - let Some((peer_id, mut connection)) = self + let Some((peer_id, mut connection, permit)) = self .peer_table .get_best_peer(SUPPORTED_ETH_CAPABILITIES.to_vec()) .await? @@ -344,26 +321,15 @@ impl PeerHandler { current_show += 1; } + // Queue drained but in-flight tasks haven't returned yet. + // Drop the permit we just acquired (end of scope) and yield + // so the result receive path gets a chance to run. + tokio::task::yield_now().await; continue; }; let tx = task_sender.clone(); - // Reserve a request slot before spawning so get_best_peer sees - // this peer as busy immediately, preventing the loop from - // spawning dozens of tasks for the same peer in a single tick. - // Reserve a request slot before spawning so get_best_peer sees - // this peer as busy immediately, preventing the loop from - // spawning dozens of tasks for the same peer in a single tick. - // The reservation is released in the completion handler - // (dec_requests on try_recv). The worker calls - // outgoing_request directly (not make_request) since we - // already hold the reservation. - self.peer_table.inc_requests(peer_id)?; debug!("Downloader {peer_id} is now busy"); - // Run download_chunk_from_peer in a different Tokio task. - // The worker must always send a result so dec_requests fires - // in the completion handler. The unwrap_or_default() ensures - // download errors don't panic. tokio::spawn(async move { trace!( "Sync Log 5: Requesting block headers from peer {peer_id}, chunk_limit: {chunk_limit}" @@ -371,6 +337,7 @@ impl PeerHandler { let headers = Self::download_chunk_from_peer( peer_id, &mut connection, + permit, startblock, chunk_limit, ) @@ -443,18 +410,15 @@ impl PeerHandler { }); match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? { None => Ok(None), - Some((peer_id, mut connection)) => { + Some((peer_id, mut connection, permit)) => { + let response = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await; + drop(permit); if let Ok(RLPxMessage::BlockHeaders(BlockHeaders { id: _, block_headers, - })) = PeerHandler::make_request( - &self.peer_table, - peer_id, - &mut connection, - request, - PEER_REPLY_TIMEOUT, - ) - .await + })) = response { if block_headers.is_empty() { // Empty response is valid per eth spec (peer may not have these blocks) @@ -485,12 +449,12 @@ impl PeerHandler { } /// Given a peer id, a chunk start and a chunk limit, requests the block headers from the peer. - /// The caller must already hold a request reservation for this peer - /// (via `inc_requests` before spawning), so we call `outgoing_request` - /// directly instead of `make_request` to avoid a double increment. + /// Releases the peer slot as soon as the wire response is in; validation + /// below is pure computation. async fn download_chunk_from_peer( peer_id: H256, connection: &mut PeerConnection, + permit: RequestPermit, startblock: u64, chunk_limit: u64, ) -> Result, PeerHandlerError> { @@ -503,12 +467,14 @@ impl PeerHandler { skip: 0, reverse: false, }); + let response = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await; + drop(permit); if let Ok(RLPxMessage::BlockHeaders(BlockHeaders { id: _, block_headers, - })) = connection - .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await + })) = response { if are_block_headers_chained(&block_headers, &BlockRequestOrder::OldToNew) { Ok(block_headers) @@ -537,18 +503,15 @@ impl PeerHandler { }); match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? { None => Ok(None), - Some((peer_id, mut connection)) => { + Some((peer_id, mut connection, permit)) => { + let response = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await; + drop(permit); if let Ok(RLPxMessage::BlockBodies(BlockBodies { id: _, block_bodies, - })) = PeerHandler::make_request( - &self.peer_table, - peer_id, - &mut connection, - request, - PEER_REPLY_TIMEOUT, - ) - .await + })) = response { // Check that the response is not empty and does not contain more bodies than the ones requested if !block_bodies.is_empty() && block_bodies.len() <= block_hashes_len { @@ -623,10 +586,13 @@ impl PeerHandler { Ok(self.peer_table.peer_count().await?) } + /// Requests a single block header by number from an already-selected peer. + /// Consumes a `RequestPermit` reserved by the caller at peer selection + /// time; the permit drops when this function returns, releasing the slot. pub async fn get_block_header( &mut self, - peer_id: H256, connection: &mut PeerConnection, + _permit: RequestPermit, block_number: u64, ) -> Result, PeerHandlerError> { let request_id = rand::random(); @@ -638,14 +604,9 @@ impl PeerHandler { reverse: false, }); debug!("get_block_header: requesting header with number {block_number}"); - match PeerHandler::make_request( - &self.peer_table, - peer_id, - connection, - request, - PEER_REPLY_TIMEOUT, - ) - .await + match connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await { Ok(RLPxMessage::BlockHeaders(BlockHeaders { id: _, diff --git a/crates/networking/p2p/peer_table.rs b/crates/networking/p2p/peer_table.rs index e678893d8cb..be38d656c9e 100644 --- a/crates/networking/p2p/peer_table.rs +++ b/crates/networking/p2p/peer_table.rs @@ -236,6 +236,48 @@ pub enum ContactValidation { IpMismatch, } +/// Reservation handle for a peer request slot. +/// +/// **Contract:** when a `RequestPermit` exists, the `requests` counter for +/// its peer has been incremented by one. Dropping the permit releases the +/// slot via a fire-and-forget `DecRequests` message. The handler that +/// returns the permit also bumps the counter atomically under `&mut self`, +/// so selection and reservation cannot be observed out of order. +/// +/// The permit must travel with whatever code owns the outstanding request — +/// move it into spawned tasks, send it through channels alongside results, +/// etc. Dropping early releases the slot early. +#[must_use = "dropping this permit immediately releases the peer's request slot"] +pub struct RequestPermit { + peer_table: PeerTable, + peer_id: H256, +} + +impl RequestPermit { + pub(crate) fn new(peer_table: PeerTable, peer_id: H256) -> Self { + Self { + peer_table, + peer_id, + } + } +} + +impl std::fmt::Debug for RequestPermit { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RequestPermit") + .field("peer_id", &self.peer_id) + .finish_non_exhaustive() + } +} + +impl Drop for RequestPermit { + fn drop(&mut self) { + // Fire-and-forget. If the actor mailbox is closed, p2p is already + // shutting down — the lost decrement is a non-issue. + let _ = self.peer_table.dec_requests(self.peer_id); + } +} + #[protocol] pub trait PeerTableServerProtocol: Send + Sync { // Send (cast) methods @@ -258,7 +300,6 @@ pub trait PeerTableServerProtocol: Send + Sync { ) -> Result<(), ActorError>; fn set_session_info(&self, node_id: H256, session: Session) -> Result<(), ActorError>; fn remove_peer(&self, node_id: H256) -> Result<(), ActorError>; - fn inc_requests(&self, node_id: H256) -> Result<(), ActorError>; fn dec_requests(&self, node_id: H256) -> Result<(), ActorError>; fn set_unwanted(&self, node_id: H256) -> Result<(), ActorError>; fn set_is_fork_id_valid(&self, node_id: H256, valid: bool) -> Result<(), ActorError>; @@ -299,20 +340,24 @@ pub trait PeerTableServerProtocol: Send + Sync { fn get_best_peer( &self, capabilities: Vec, - ) -> Response>; + ) -> Response>; fn get_best_peer_excluding( &self, capabilities: Vec, excluded: Vec, - ) -> Response>; + ) -> Response>; + fn get_best_n_peers( + &self, + capabilities: Vec, + n: usize, + ) -> Response>; + /// Read-only predicate: is there any eligible peer matching `capabilities`? + /// Does not reserve a slot; use for capacity/rotation probes only. + fn has_eligible_peer(&self, capabilities: Vec) -> Response; fn get_score(&self, node_id: H256) -> Response; fn get_connected_nodes(&self) -> Response>; fn get_peers_with_capabilities(&self) -> Response)>>; - fn get_peer_connections( - &self, - capabilities: Vec, - ) -> Response>; fn insert_if_new(&self, node: Node, protocol: DiscoveryProtocol) -> Response; fn validate_contact(&self, node_id: H256, sender_ip: IpAddr) -> Response; fn get_closest_nodes(&self, node_id: H256) -> Response>; @@ -325,7 +370,7 @@ pub trait PeerTableServerProtocol: Send + Sync { fn get_random_peer( &self, capabilities: Vec, - ) -> Response>; + ) -> Response>; fn get_session_info(&self, node_id: H256) -> Response>; fn get_peer_diagnostics(&self) -> Response>; } @@ -426,26 +471,25 @@ impl PeerTableServer { self.peers.swap_remove(&msg.node_id); } - #[send_handler] - async fn handle_inc_requests( - &mut self, - msg: peer_table_server_protocol::IncRequests, - _ctx: &Context, - ) { - self.peers - .entry(msg.node_id) - .and_modify(|peer_data| peer_data.requests += 1); - } - #[send_handler] async fn handle_dec_requests( &mut self, msg: peer_table_server_protocol::DecRequests, _ctx: &Context, ) { - self.peers - .entry(msg.node_id) - .and_modify(|peer_data| peer_data.requests = peer_data.requests.saturating_sub(1)); + self.peers.entry(msg.node_id).and_modify(|peer_data| { + if peer_data.requests <= 0 { + // Expected under the reconnect race (stale permit fires + // after remove_peer + new_connected_peer), self-heals. + // Otherwise points to a bookkeeping bug worth chasing. + tracing::debug!( + peer_id = ?msg.node_id, + requests = peer_data.requests, + "dec_requests with counter already <= 0", + ); + } + peer_data.requests = peer_data.requests.saturating_sub(1).max(0) + }); } #[send_handler] @@ -706,18 +750,62 @@ impl PeerTableServer { async fn handle_get_best_peer( &mut self, msg: peer_table_server_protocol::GetBestPeer, - _ctx: &Context, - ) -> Option<(H256, PeerConnection)> { - self.do_get_best_peer(&msg.capabilities) + ctx: &Context, + ) -> Option<(H256, PeerConnection, RequestPermit)> { + let (peer_id, conn) = self.do_get_best_peer(&msg.capabilities)?; + self.peers + .get_mut(&peer_id) + .expect("peer returned by do_get_best_peer must be present in self.peers") + .requests += 1; + Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id))) } #[request_handler] async fn handle_get_best_peer_excluding( &mut self, msg: peer_table_server_protocol::GetBestPeerExcluding, + ctx: &Context, + ) -> Option<(H256, PeerConnection, RequestPermit)> { + let (peer_id, conn) = self.do_get_best_peer_excluding(&msg.capabilities, &msg.excluded)?; + self.peers + .get_mut(&peer_id) + .expect("peer returned by do_get_best_peer_excluding must be present in self.peers") + .requests += 1; + Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id))) + } + + #[request_handler] + async fn handle_get_best_n_peers( + &mut self, + msg: peer_table_server_protocol::GetBestNPeers, + ctx: &Context, + ) -> Vec<(H256, PeerConnection, RequestPermit)> { + let picks = self.do_get_best_n_peers(&msg.capabilities, msg.n); + let mut out = Vec::with_capacity(picks.len()); + for (peer_id, conn) in picks { + self.peers + .get_mut(&peer_id) + .expect("peer returned by do_get_best_n_peers must be present in self.peers") + .requests += 1; + out.push((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id))); + } + out + } + + #[request_handler] + async fn handle_has_eligible_peer( + &mut self, + msg: peer_table_server_protocol::HasEligiblePeer, _ctx: &Context, - ) -> Option<(H256, PeerConnection)> { - self.do_get_best_peer_excluding(&msg.capabilities, &msg.excluded) + ) -> bool { + self.peers.values().any(|peer_data| { + peer_data.connection.is_some() + && self.can_try_more_requests(&peer_data.score, &peer_data.requests) + && msg + .capabilities + .iter() + .any(|cap| peer_data.supported_capabilities.contains(cap)) + }) } #[request_handler] @@ -764,15 +852,6 @@ impl PeerTableServer { .collect() } - #[request_handler] - async fn handle_get_peer_connections( - &mut self, - msg: peer_table_server_protocol::GetPeerConnections, - _ctx: &Context, - ) -> Vec<(H256, PeerConnection)> { - self.do_get_peer_connections(msg.capabilities) - } - #[request_handler] async fn handle_insert_if_new( &mut self, @@ -829,9 +908,14 @@ impl PeerTableServer { async fn handle_get_random_peer( &mut self, msg: peer_table_server_protocol::GetRandomPeer, - _ctx: &Context, - ) -> Option<(H256, PeerConnection)> { - self.do_get_random_peer(msg.capabilities) + ctx: &Context, + ) -> Option<(H256, PeerConnection, RequestPermit)> { + let (peer_id, conn) = self.do_get_random_peer(msg.capabilities)?; + self.peers + .get_mut(&peer_id) + .expect("peer returned by do_get_random_peer must be present in self.peers") + .requests += 1; + Some((peer_id, conn, RequestPermit::new(ctx.actor_ref(), peer_id))) } #[request_handler] @@ -922,6 +1006,41 @@ impl PeerTableServer { .map(|(k, _, _, v)| (k, v)) } + /// Returns up to `n` best peers with capability overlap, sorted by weight + /// descending. Excludes peers at capacity. Does NOT mutate state — caller + /// is responsible for incrementing `requests` on each returned peer. The + /// sort uses a pre-increment snapshot: later picks don't see earlier + /// picks' bumps, which is fine for small `n`. + fn do_get_best_n_peers( + &self, + capabilities: &[Capability], + n: usize, + ) -> Vec<(H256, PeerConnection)> { + let mut candidates: Vec<(H256, i64, i64, PeerConnection)> = self + .peers + .iter() + .filter_map(|(id, peer_data)| { + if !self.can_try_more_requests(&peer_data.score, &peer_data.requests) + || !capabilities + .iter() + .any(|cap| peer_data.supported_capabilities.contains(cap)) + { + None + } else { + let connection = peer_data.connection.clone()?; + Some((*id, peer_data.score, peer_data.requests, connection)) + } + }) + .collect(); + + candidates.sort_by_key(|(_, score, reqs, _)| -self.weight_peer(score, reqs)); + candidates + .into_iter() + .take(n) + .map(|(id, _, _, conn)| (id, conn)) + .collect() + } + fn prune(&mut self) { let disposable_contacts = self .contacts @@ -1170,27 +1289,6 @@ impl PeerTableServer { .count() } - fn do_get_peer_connections( - &self, - capabilities: Vec, - ) -> Vec<(H256, PeerConnection)> { - self.peers - .iter() - .filter_map(|(peer_id, peer_data)| { - if !capabilities - .iter() - .any(|cap| peer_data.supported_capabilities.contains(cap)) - { - return None; - } - peer_data - .connection - .clone() - .map(|connection| (*peer_id, connection)) - }) - .collect() - } - fn do_get_random_peer(&self, capabilities: Vec) -> Option<(H256, PeerConnection)> { let peers: Vec<(H256, &PeerConnection, i64)> = self .peers diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index 1106c6c101e..9fede93a267 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -6,7 +6,7 @@ use crate::rlpx::message::Message as RLPxMessage; use crate::{ metrics::{CurrentStepValue, METRICS}, peer_handler::PeerHandler, - peer_table::{PeerTable, PeerTableServerProtocol as _}, + peer_table::{PeerTableServerProtocol as _, RequestPermit}, rlpx::{ connection::server::PeerConnection, error::PeerConnectionError, @@ -62,7 +62,6 @@ pub struct RequestStorageTrieNodesError { pub source: SnapError, } -#[derive(Clone)] struct StorageTaskResult { start_index: usize, account_storages: Vec>, @@ -194,8 +193,6 @@ pub async fn request_account_range( } if let Ok((accounts, peer_id, chunk_start_end)) = task_receiver.try_recv() { - // Release the reservation we made before spawning the task. - peers.peer_table.dec_requests(peer_id)?; if let Some((chunk_start, chunk_end)) = chunk_start_end { if chunk_start <= chunk_end { tasks_queue_not_started.push_back((chunk_start, chunk_end)); @@ -223,7 +220,7 @@ pub async fn request_account_range( all_accounts_state.extend(accounts.iter().map(|unit| unit.account)); } - let Some((peer_id, connection)) = peers + let Some((peer_id, connection, permit)) = peers .peer_table .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec()) .await @@ -264,12 +261,6 @@ pub async fn request_account_range( .expect("Should be able to update pivot") } - // Reserve a request slot before spawning so get_best_peer sees - // this peer as busy immediately, preventing spawn floods. - // Workers call outgoing_request directly (not make_request) to - // avoid a double increment. Released via dec_requests on try_recv. - peers.peer_table.inc_requests(peer_id)?; - tokio::spawn(request_account_range_worker( peer_id, connection, @@ -277,6 +268,7 @@ pub async fn request_account_range( chunk_end, pivot_header.state_root, tx, + permit, )); } @@ -397,8 +389,6 @@ pub async fn request_bytecodes( remaining_start, remaining_end, } = result; - // Release the reservation we made before spawning the task. - peers.peer_table.dec_requests(peer_id)?; debug!( "Downloaded {} bytecodes from peer {peer_id} (current count: {downloaded_count})", @@ -423,7 +413,7 @@ pub async fn request_bytecodes( } } - let Some((peer_id, mut connection)) = peers + let Some((peer_id, mut connection, permit)) = peers .peer_table .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec()) .await @@ -458,17 +448,7 @@ pub async fn request_bytecodes( .copied() .collect(); - // Reserve a request slot before spawning (see account range comment). - peers.peer_table.inc_requests(peer_id)?; - tokio::spawn(async move { - let empty_task_result = TaskResult { - start_index: chunk_start, - bytecodes: vec![], - peer_id, - remaining_start: chunk_start, - remaining_end: chunk_end, - }; debug!( "Requesting bytecode from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}" ); @@ -478,37 +458,41 @@ pub async fn request_bytecodes( hashes: hashes_to_request.clone(), bytes: MAX_RESPONSE_BYTES, }); - // The caller already holds a request reservation for this peer, - // so call outgoing_request directly to avoid a double increment. - if let Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) = connection + + let response = connection .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await - { - if codes.is_empty() { - tx.send(empty_task_result).await.ok(); - // Too spammy - // tracing::error!("Received empty account range"); - return; + .await; + drop(permit); + + let (bytecodes, remaining_start) = match response { + Ok(RLPxMessage::ByteCodes(ByteCodes { id: _, codes })) if !codes.is_empty() => { + let validated_codes: Vec = codes + .into_iter() + .zip(hashes_to_request) + .take_while(|(b, hash)| ethrex_common::utils::keccak(b) == *hash) + .map(|(b, _hash)| b) + .collect(); + let new_remaining_start = chunk_start + validated_codes.len(); + (validated_codes, new_remaining_start) } - // Validate response by hashing bytecodes - let validated_codes: Vec = codes - .into_iter() - .zip(hashes_to_request) - .take_while(|(b, hash)| ethrex_common::utils::keccak(b) == *hash) - .map(|(b, _hash)| b) - .collect(); - let result = TaskResult { - start_index: chunk_start, - remaining_start: chunk_start + validated_codes.len(), - bytecodes: validated_codes, - peer_id, - remaining_end: chunk_end, - }; - tx.send(result).await.ok(); - } else { - tracing::debug!("Failed to get bytecode"); - tx.send(empty_task_result).await.ok(); - } + Ok(RLPxMessage::ByteCodes(_)) => { + // Empty response; retry the full chunk. + (Vec::new(), chunk_start) + } + _ => { + tracing::debug!("Failed to get bytecode"); + (Vec::new(), chunk_start) + } + }; + + let result = TaskResult { + start_index: chunk_start, + bytecodes, + peer_id, + remaining_start, + remaining_end: chunk_end, + }; + tx.send(result).await.ok(); }); } @@ -660,8 +644,6 @@ pub async fn request_storage_ranges( remaining_end, remaining_hash_range: (hash_start, hash_end), } = result; - // Release the reservation we made before spawning the task. - peers.peer_table.dec_requests(peer_id)?; completed_tasks += 1; for (_, accounts) in accounts_by_root_hash[start_index..remaining_start].iter() { @@ -958,7 +940,7 @@ pub async fn request_storage_ranges( break; } - let Some((peer_id, connection)) = peers + let Some((peer_id, connection, permit)) = peers .peer_table .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec()) .await @@ -999,9 +981,6 @@ pub async fn request_storage_ranges( chunk_storage_roots.first().unwrap_or(&H256::zero()), ); } - // Reserve a request slot before spawning (see account range comment). - peers.peer_table.inc_requests(peer_id)?; - tokio::spawn(request_storage_ranges_worker( task, peer_id, @@ -1010,6 +989,7 @@ pub async fn request_storage_ranges( chunk_account_hashes, chunk_storage_roots, tx, + permit, )); } @@ -1055,16 +1035,19 @@ pub async fn request_storage_ranges( Ok(chunk_index + 1) } +/// Requests state trie nodes at the given paths from an already-selected peer. +/// Releases the peer slot as soon as the wire response is in; hash +/// verification below is pure computation. +/// Returns `SnapError::InvalidHash` if any returned node's hash does not match +/// the requested path, and `SnapError::InvalidData` on an empty or oversized +/// response. pub async fn request_state_trienodes( - peer_id: H256, mut connection: PeerConnection, - peer_table: PeerTable, + permit: RequestPermit, state_root: H256, paths: Vec, ) -> Result, SnapError> { let expected_nodes = paths.len(); - // Keep track of peers we requested from so we can penalize unresponsive peers when we get a response - // This is so we avoid penalizing peers due to requesting stale data let request_id = rand::random(); let request = RLPxMessage::GetTrieNodes(GetTrieNodes { @@ -1077,15 +1060,11 @@ pub async fn request_state_trienodes( .collect(), bytes: MAX_RESPONSE_BYTES, }); - let nodes = match PeerHandler::make_request( - &peer_table, - peer_id, - &mut connection, - request, - PEER_REPLY_TIMEOUT, - ) - .await - { + let response = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await; + drop(permit); + let nodes = match response { Ok(RLPxMessage::TrieNodes(trie_nodes)) => trie_nodes .nodes .iter() @@ -1115,29 +1094,23 @@ pub async fn request_state_trienodes( Ok(nodes) } -/// Requests storage trie nodes given the root of the state trie where they are contained and -/// a hashmap mapping the path to the account in the state trie (aka hashed address) to the paths to the nodes in its storage trie (can be full or partial) -/// Returns the nodes or None if: -/// - There are no available peers (the node just started up or was rejected by all other nodes) -/// - No peer returned a valid response in the given time and retry limits +/// Requests storage trie nodes from an already-selected peer. The `GetTrieNodes` +/// payload carries the state root and the per-account paths (hashed address +/// prefix followed by storage-trie paths, which may be full or partial). +/// Consumes a `RequestPermit` reserved by the caller at peer selection time; +/// the permit drops when this function returns, releasing the slot. +/// Errors are returned as `RequestStorageTrieNodesError` carrying the +/// request ID so the caller can reconcile it with its in-flight map. pub async fn request_storage_trienodes( - peer_id: H256, mut connection: PeerConnection, - peer_table: PeerTable, + _permit: RequestPermit, get_trie_nodes: GetTrieNodes, ) -> Result { - // Keep track of peers we requested from so we can penalize unresponsive peers when we get a response - // This is so we avoid penalizing peers due to requesting stale data let request_id = get_trie_nodes.id; let request = RLPxMessage::GetTrieNodes(get_trie_nodes); - match PeerHandler::make_request( - &peer_table, - peer_id, - &mut connection, - request, - PEER_REPLY_TIMEOUT, - ) - .await + match connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await { Ok(RLPxMessage::TrieNodes(trie_nodes)) => Ok(trie_nodes), Ok(other_msg) => Err(RequestStorageTrieNodesError { @@ -1162,6 +1135,7 @@ async fn request_account_range_worker( chunk_end: H256, state_root: H256, tx: tokio::sync::mpsc::Sender<(Vec, H256, Option<(H256, H256)>)>, + permit: RequestPermit, ) -> Result<(), SnapError> { debug!("Requesting account range from peer {peer_id}, chunk: {chunk_start:?} - {chunk_end:?}"); let request_id = rand::random(); @@ -1172,82 +1146,78 @@ async fn request_account_range_worker( limit_hash: chunk_end, response_bytes: MAX_RESPONSE_BYTES, }); - if let Ok(RLPxMessage::AccountRange(AccountRange { + + // Perform the wire request and release the peer slot as soon as the + // response (or error) is in — processing below is pure computation. + let response = connection + .outgoing_request(request, PEER_REPLY_TIMEOUT) + .await; + drop(permit); + + let retry = || { + ( + Vec::::new(), + Some((chunk_start, chunk_end)), + ) + }; + let (accounts_out, chunk_left) = if let Ok(RLPxMessage::AccountRange(AccountRange { id: _, accounts, proof, - // The caller already holds a request reservation for this peer, - // so call outgoing_request directly to avoid a double increment. - })) = connection - .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await + })) = response { if accounts.is_empty() { - tx.send((Vec::new(), peer_id, Some((chunk_start, chunk_end)))) - .await - .ok(); - return Ok(()); - } - // Unzip & validate response - let proof = encodable_to_proof(&proof); - let (account_hashes, account_states): (Vec<_>, Vec<_>) = accounts - .clone() - .into_iter() - .map(|unit| (unit.hash, unit.account)) - .unzip(); - let encoded_accounts = account_states - .iter() - .map(|acc| acc.encode_to_vec()) - .collect::>(); - - let Ok(should_continue) = verify_range( - state_root, - &chunk_start, - &account_hashes, - &encoded_accounts, - &proof, - ) else { - tx.send((Vec::new(), peer_id, Some((chunk_start, chunk_end)))) - .await - .ok(); - tracing::error!("Received invalid account range"); - return Ok(()); - }; - - // If the range has more accounts to fetch, we send the new chunk - let chunk_left = if should_continue { - let last_hash = match account_hashes.last() { - Some(last_hash) => last_hash, - None => { - tx.send((Vec::new(), peer_id, Some((chunk_start, chunk_end)))) - .await - .ok(); - error!("Account hashes last failed, this shouldn't happen"); - return Err(SnapError::NoAccountHashes); - } - }; - let new_start_u256 = U256::from_big_endian(&last_hash.0) + 1; - let new_start = H256::from_uint(&new_start_u256); - Some((new_start, chunk_end)) + retry() } else { - None - }; - tx.send(( - accounts - .into_iter() - .filter(|unit| unit.hash <= chunk_end) - .collect(), - peer_id, - chunk_left, - )) - .await - .ok(); + // Validate response — build the verification inputs by borrowing + // `accounts` so we can still consume it for the filtered output. + let proof = encodable_to_proof(&proof); + let account_hashes: Vec = accounts.iter().map(|u| u.hash).collect(); + let encoded_accounts: Vec<_> = + accounts.iter().map(|u| u.account.encode_to_vec()).collect(); + + match verify_range( + state_root, + &chunk_start, + &account_hashes, + &encoded_accounts, + &proof, + ) { + Ok(should_continue) => { + let chunk_left = if should_continue { + match account_hashes.last() { + Some(last_hash) => { + let new_start_u256 = U256::from_big_endian(&last_hash.0) + 1; + let new_start = H256::from_uint(&new_start_u256); + Some((new_start, chunk_end)) + } + None => { + // Unreachable: accounts is non-empty here. + error!("Account hashes last failed, this shouldn't happen"); + Some((chunk_start, chunk_end)) + } + } + } else { + None + }; + let filtered = accounts + .into_iter() + .filter(|unit| unit.hash <= chunk_end) + .collect::>(); + (filtered, chunk_left) + } + Err(_) => { + tracing::error!("Received invalid account range"); + retry() + } + } + } } else { tracing::debug!("Failed to get account range"); - tx.send((Vec::new(), peer_id, Some((chunk_start, chunk_end)))) - .await - .ok(); - } + retry() + }; + + tx.send((accounts_out, peer_id, chunk_left)).await.ok(); Ok::<(), SnapError>(()) } @@ -1260,19 +1230,23 @@ async fn request_storage_ranges_worker( chunk_account_hashes: Vec, chunk_storage_roots: Vec, tx: tokio::sync::mpsc::Sender, + permit: RequestPermit, ) -> Result<(), SnapError> { let start = task.start_index; let end = task.end_index; let start_hash = task.start_hash; - let empty_task_result = StorageTaskResult { - start_index: task.start_index, - account_storages: Vec::new(), - peer_id, - remaining_start: task.start_index, - remaining_end: task.end_index, - remaining_hash_range: (start_hash, task.end_hash), + // Defaults for the "retry this same range" outcome used by every failure + // branch below. + let retry_outcome = || { + ( + Vec::>::new(), + task.start_index, + task.end_index, + (start_hash, task.end_hash), + ) }; + let request_id = rand::random(); let request = RLPxMessage::GetStorageRanges(GetStorageRanges { id: request_id, @@ -1283,134 +1257,142 @@ async fn request_storage_ranges_worker( response_bytes: MAX_RESPONSE_BYTES, }); tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", "Sending storage range request"); - let Ok(RLPxMessage::StorageRanges(StorageRanges { - id: _, - slots, - proof, - // The caller already holds a request reservation for this peer, - // so call outgoing_request directly to avoid a double increment. - })) = connection + + // Perform the wire request and release the peer slot as soon as the + // response (or error) is in — validation below is pure computation. + let response = connection .outgoing_request(request, PEER_REPLY_TIMEOUT) - .await - else { - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("timeout"); - tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", outcome = "timeout", "Storage range request failed"); - tracing::debug!("Failed to get storage range"); - tx.send(empty_task_result).await.ok(); - return Ok(()); - }; - if slots.is_empty() && proof.is_empty() { - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("empty"); - tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "empty", "Storage range response empty"); - tx.send(empty_task_result).await.ok(); - tracing::debug!("Received empty storage range"); - return Ok(()); - } - // Check we got some data and no more than the requested amount - if slots.len() > chunk_storage_roots.len() || slots.is_empty() { - tx.send(empty_task_result).await.ok(); - return Ok(()); - } - // Unzip & validate response - let proof = encodable_to_proof(&proof); - let mut account_storages: Vec> = vec![]; - let mut should_continue = false; - // Validate each storage range - let mut storage_roots = chunk_storage_roots.into_iter(); - let last_slot_index = slots.len() - 1; - for (i, next_account_slots) in slots.into_iter().enumerate() { - // We won't accept empty storage ranges - if next_account_slots.is_empty() { - // This shouldn't happen - error!("Received empty storage range, skipping"); - tx.send(empty_task_result.clone()).await.ok(); - return Ok(()); + .await; + drop(permit); + + let (account_storages, remaining_start, remaining_end, remaining_hash_range) = 'outcome: { + let Ok(RLPxMessage::StorageRanges(StorageRanges { + id: _, + slots, + proof, + })) = response + else { + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("timeout"); + tracing::trace!(peer_id = %peer_id, msg_type = "GetStorageRanges", outcome = "timeout", "Storage range request failed"); + tracing::debug!("Failed to get storage range"); + break 'outcome retry_outcome(); + }; + if slots.is_empty() && proof.is_empty() { + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("empty"); + tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "empty", "Storage range response empty"); + tracing::debug!("Received empty storage range"); + break 'outcome retry_outcome(); } - let encoded_values = next_account_slots - .iter() - .map(|slot| slot.data.encode_to_vec()) - .collect::>(); - let hashed_keys: Vec<_> = next_account_slots.iter().map(|slot| slot.hash).collect(); - - let storage_root = match storage_roots.next() { - Some(root) => root, - None => { - tx.send(empty_task_result.clone()).await.ok(); - error!("No storage root for account {i}"); - return Err(SnapError::NoStorageRoots); + if slots.len() > chunk_storage_roots.len() || slots.is_empty() { + break 'outcome retry_outcome(); + } + let proof = encodable_to_proof(&proof); + let mut account_storages: Vec> = vec![]; + let mut should_continue = false; + let mut validation_failed = false; + let mut storage_roots = chunk_storage_roots.into_iter(); + let last_slot_index = slots.len() - 1; + for (i, next_account_slots) in slots.into_iter().enumerate() { + if next_account_slots.is_empty() { + error!("Received empty storage range, skipping"); + validation_failed = true; + break; } - }; + let encoded_values = next_account_slots + .iter() + .map(|slot| slot.data.encode_to_vec()) + .collect::>(); + let hashed_keys: Vec<_> = next_account_slots.iter().map(|slot| slot.hash).collect(); + + let storage_root = match storage_roots.next() { + Some(root) => root, + None => { + error!("No storage root for account {i}"); + break 'outcome retry_outcome(); + } + }; - // The proof corresponds to the last slot, for the previous ones the slot must be the full range without edge proofs - if i == last_slot_index && !proof.is_empty() { - let Ok(sc) = verify_range( + // The proof corresponds to the last slot, for the previous ones the slot must be the full range without edge proofs + if i == last_slot_index && !proof.is_empty() { + let Ok(sc) = verify_range( + storage_root, + &start_hash, + &hashed_keys, + &encoded_values, + &proof, + ) else { + validation_failed = true; + break; + }; + should_continue = sc; + } else if verify_range( storage_root, &start_hash, &hashed_keys, &encoded_values, - &proof, - ) else { - tx.send(empty_task_result).await.ok(); - return Ok(()); - }; - should_continue = sc; - } else if verify_range( - storage_root, - &start_hash, - &hashed_keys, - &encoded_values, - &[], - ) - .is_err() - { - tx.send(empty_task_result.clone()).await.ok(); - return Ok(()); + &[], + ) + .is_err() + { + validation_failed = true; + break; + } + + account_storages.push( + next_account_slots + .iter() + .map(|slot| (slot.hash, slot.data)) + .collect(), + ); } - account_storages.push( - next_account_slots - .iter() - .map(|slot| (slot.hash, slot.data)) - .collect(), - ); - } - let (remaining_start, remaining_end, remaining_start_hash) = if should_continue { - let last_account_storage = match account_storages.last() { - Some(storage) => storage, - None => { - tx.send(empty_task_result.clone()).await.ok(); - error!("No account storage found, this shouldn't happen"); - return Err(SnapError::NoAccountStorages); - } - }; - let (last_hash, _) = match last_account_storage.last() { - Some(last_hash) => last_hash, - None => { - tx.send(empty_task_result.clone()).await.ok(); - error!("No last hash found, this shouldn't happen"); - return Err(SnapError::NoAccountStorages); - } + if validation_failed { + break 'outcome retry_outcome(); + } + + let (remaining_start, remaining_end, remaining_start_hash) = if should_continue { + let last_account_storage = match account_storages.last() { + Some(storage) => storage, + None => { + error!("No account storage found, this shouldn't happen"); + break 'outcome retry_outcome(); + } + }; + let (last_hash, _) = match last_account_storage.last() { + Some(last_hash) => last_hash, + None => { + error!("No last hash found, this shouldn't happen"); + break 'outcome retry_outcome(); + } + }; + let next_hash_u256 = U256::from_big_endian(&last_hash.0).saturating_add(1.into()); + let next_hash = H256::from_uint(&next_hash_u256); + (start + account_storages.len() - 1, end, next_hash) + } else { + (start + account_storages.len(), end, H256::zero()) }; - let next_hash_u256 = U256::from_big_endian(&last_hash.0).saturating_add(1.into()); - let next_hash = H256::from_uint(&next_hash_u256); - (start + account_storages.len() - 1, end, next_hash) - } else { - (start + account_storages.len(), end, H256::zero()) + let slot_count: usize = account_storages.iter().map(|s| s.len()).sum(); + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("success"); + tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "success", slots = slot_count, "Storage range response received"); + ( + account_storages, + remaining_start, + remaining_end, + (remaining_start_hash, task.end_hash), + ) }; - let slot_count: usize = account_storages.iter().map(|s| s.len()).sum(); + let task_result = StorageTaskResult { start_index: start, account_storages, peer_id, remaining_start, remaining_end, - remaining_hash_range: (remaining_start_hash, task.end_hash), + remaining_hash_range, }; - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_storage_request("success"); - tracing::trace!(peer_id = %peer_id, msg_type = "StorageRanges", outcome = "success", slots = slot_count, "Storage range response received"); tx.send(task_result).await.ok(); Ok::<(), SnapError>(()) } diff --git a/crates/networking/p2p/sync/healing/state.rs b/crates/networking/p2p/sync/healing/state.rs index 54c46658a8b..e13a74f9ee1 100644 --- a/crates/networking/p2p/sync/healing/state.rs +++ b/crates/networking/p2p/sync/healing/state.rs @@ -216,7 +216,7 @@ async fn heal_state_trie( .unwrap_or_default(), longest_path_seen, ); - let Some((peer_id, connection)) = peers + let Some((peer_id, connection, permit)) = peers .peer_table .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec()) .await @@ -243,17 +243,11 @@ async fn heal_state_trie( let tx = task_sender.clone(); inflight_tasks += 1; - let peer_table = peers.peer_table.clone(); tokio::spawn(async move { // TODO: check errors to determine whether the current block is stale - let response = request_state_trienodes( - peer_id, - connection, - peer_table, - state_root, - batch.clone(), - ) - .await; + let response = + request_state_trienodes(connection, permit, state_root, batch.clone()) + .await; // TODO: add error handling tx.send((peer_id, response, batch)).await.inspect_err( |err| debug!(error=?err, "Failed to send state trie nodes response"), diff --git a/crates/networking/p2p/sync/healing/storage.rs b/crates/networking/p2p/sync/healing/storage.rs index c593b3756ba..ab39e9b8b07 100644 --- a/crates/networking/p2p/sync/healing/storage.rs +++ b/crates/networking/p2p/sync/healing/storage.rs @@ -331,7 +331,7 @@ async fn ask_peers_for_nodes( logged_no_free_peers_count: &mut u32, ) { if (requests.len() as u32) < MAX_IN_FLIGHT_REQUESTS && !download_queue.is_empty() { - let Some((peer_id, connection)) = peers + let Some((peer_id, connection, permit)) = peers .peer_table .get_best_peer(SUPPORTED_SNAP_CAPABILITIES.to_vec()) .await @@ -368,11 +368,9 @@ async fn ask_peers_for_nodes( let tx = task_sender.clone(); - let peer_table = peers.peer_table.clone(); - requests_task_joinset.spawn(async move { let req_id = gtn.id; - let response = request_storage_trienodes(peer_id, connection, peer_table, gtn).await; + let response = request_storage_trienodes(connection, permit, gtn).await; // TODO: add error handling tx.try_send(response).inspect_err( |err| debug!(error=?err, "Failed to send state trie nodes response"), diff --git a/crates/networking/p2p/sync/snap_sync.rs b/crates/networking/p2p/sync/snap_sync.rs index 37a864a0ef9..b612e598959 100644 --- a/crates/networking/p2p/sync/snap_sync.rs +++ b/crates/networking/p2p/sync/snap_sync.rs @@ -686,10 +686,9 @@ pub async fn update_pivot( block_sync_state: &mut SnapBlockSyncState, diagnostics: &Arc>, ) -> Result { - const MAX_RETRIES_PER_PEER: u64 = 3; /// Maximum number of full peer rotations before giving up. With rotation, /// each pass tries every eligible peer once; the budget scales naturally - /// with network size. + /// with network size. Between rotations we back off exponentially. const MAX_ROTATIONS: u64 = 5; const INITIAL_RETRY_DELAY: Duration = Duration::from_secs(1); const MAX_RETRY_DELAY: Duration = Duration::from_secs(30); @@ -739,19 +738,18 @@ pub async fn update_pivot( tokio::time::sleep(delay).await; } - let Some((peer_id, mut connection)) = peers + // One permit per attempt: consumed by `get_block_header` below. + let Some((peer_id, mut connection, permit)) = peers .peer_table .get_best_peer_excluding(SUPPORTED_ETH_CAPABILITIES.to_vec(), excluded_peers.clone()) .await? else { // Distinguish "rotation exhausted" from "no peers currently eligible - // (all at capacity)". Check if any peer is eligible ignoring - // exclusions — if so, we're waiting on capacity, not rotation. + // (all at capacity)". Read-only probe — does not bump `requests`. let any_eligible = peers .peer_table - .get_best_peer(SUPPORTED_ETH_CAPABILITIES.to_vec()) - .await? - .is_some(); + .has_eligible_peer(SUPPORTED_ETH_CAPABILITIES.to_vec()) + .await?; if !any_eligible { debug!("update_pivot: no eligible peers available, waiting"); @@ -791,82 +789,69 @@ pub async fn update_pivot( "Trying to update pivot to {new_pivot_block_number} with peer {peer_id} (score: {peer_score})" ); - // Try up to MAX_RETRIES_PER_PEER times with this specific peer. - // Both Ok(None) and recoverable errors count as a failure and advance - // through retries; on exhaustion, the peer is excluded and we rotate. - let mut peer_failures: u64 = 0; - for attempt in 0..MAX_RETRIES_PER_PEER { - let outcome = peers - .get_block_header(peer_id, &mut connection, new_pivot_block_number) - .await; - - match outcome { - Ok(Some(pivot)) => { - // Success — reward peer and return - peers.peer_table.record_success(peer_id)?; - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("success"); - info!("Successfully updated pivot"); - - { - let mut diag = diagnostics.write().await; - diag.push_pivot_change(super::PivotChangeEvent { - timestamp: current_unix_time(), - old_pivot_number: block_number, - new_pivot_number: pivot.number, - outcome: "success".to_string(), - failure_reason: None, - }); - diag.pivot_block_number = Some(pivot.number); - diag.pivot_timestamp = Some(pivot.timestamp); - let pivot_age = current_unix_time().saturating_sub(pivot.timestamp); - diag.pivot_age_seconds = Some(pivot_age); - METRICS - .pivot_timestamp - .store(pivot.timestamp, std::sync::atomic::Ordering::Relaxed); - } - let block_headers = peers - .request_block_headers(block_number + 1, pivot.hash()) - .await? - .ok_or(SyncError::NoBlockHeaders)?; - block_sync_state - .process_incoming_headers(block_headers.into_iter()) - .await?; - *METRICS.sync_head_hash.lock().await = pivot.hash(); - return Ok(pivot); - } - Ok(None) => { - peers.peer_table.record_failure(peer_id)?; - peer_failures += 1; - let peer_score = peers.peer_table.get_score(peer_id).await?; - warn!( - "update_pivot: peer {peer_id} returned None (attempt {}/{MAX_RETRIES_PER_PEER}, score: {peer_score})", - attempt + 1 - ); - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_none"); - } - Err(e) if e.is_recoverable() => { - peers.peer_table.record_failure(peer_id)?; - peer_failures += 1; - warn!( - "update_pivot: peer {peer_id} failed with {e} (attempt {}/{MAX_RETRIES_PER_PEER})", - attempt + 1 - ); - #[cfg(feature = "metrics")] - ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_error"); - } - Err(e) => { - // Non-recoverable error (e.g., dead peer table actor, - // storage full) — surface it. - return Err(SyncError::PeerHandler(e)); + // One attempt per peer per rotation. A peer that fails is excluded for + // this rotation and will be retried (with backoff) in the next one. + let outcome = peers + .get_block_header(&mut connection, permit, new_pivot_block_number) + .await; + + match outcome { + Ok(Some(pivot)) => { + peers.peer_table.record_success(peer_id)?; + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("success"); + info!("Successfully updated pivot"); + + { + let mut diag = diagnostics.write().await; + diag.push_pivot_change(super::PivotChangeEvent { + timestamp: current_unix_time(), + old_pivot_number: block_number, + new_pivot_number: pivot.number, + outcome: "success".to_string(), + failure_reason: None, + }); + diag.pivot_block_number = Some(pivot.number); + diag.pivot_timestamp = Some(pivot.timestamp); + let pivot_age = current_unix_time().saturating_sub(pivot.timestamp); + diag.pivot_age_seconds = Some(pivot_age); + METRICS + .pivot_timestamp + .store(pivot.timestamp, std::sync::atomic::Ordering::Relaxed); } + let block_headers = peers + .request_block_headers(block_number + 1, pivot.hash()) + .await? + .ok_or(SyncError::NoBlockHeaders)?; + block_sync_state + .process_incoming_headers(block_headers.into_iter()) + .await?; + *METRICS.sync_head_hash.lock().await = pivot.hash(); + return Ok(pivot); + } + Ok(None) => { + peers.peer_table.record_failure(peer_id)?; + let peer_score = peers.peer_table.get_score(peer_id).await?; + warn!( + "update_pivot: peer {peer_id} returned None (score: {peer_score}), excluding for this rotation" + ); + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_none"); + excluded_peers.push(peer_id); + } + Err(e) if e.is_recoverable() => { + peers.peer_table.record_failure(peer_id)?; + warn!("update_pivot: peer {peer_id} failed with {e}, excluding for this rotation"); + #[cfg(feature = "metrics")] + ethrex_metrics::sync::METRICS_SYNC.inc_pivot_update("peer_error"); + excluded_peers.push(peer_id); + } + Err(e) => { + // Non-recoverable error (e.g., dead peer table actor, + // storage full) — surface it. + return Err(SyncError::PeerHandler(e)); } } - - // Peer exhausted its retries — exclude it and try the next one - debug!("update_pivot: excluding peer {peer_id} after {peer_failures} failures"); - excluded_peers.push(peer_id); } }