Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 46 additions & 84 deletions crates/networking/p2p/peer_handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::rlpx::initiator::RLPxInitiator;
use crate::{
metrics::{CurrentStepValue, METRICS},
peer_table::{PeerData, PeerDiagnostics, PeerTable, PeerTableServerProtocol as _},
peer_table::{
PeerData, PeerDiagnostics, PeerTable, PeerTableServerProtocol as _, RequestPermit,
},
rlpx::{
connection::server::PeerConnection,
error::PeerConnectionError,
Expand Down Expand Up @@ -48,10 +50,12 @@ pub enum BlockRequestOrder {
NewToOld,
}

/// Asks a single already-selected peer for the block number at `sync_head`.
/// Consumes a `RequestPermit`; the permit drops on return, releasing the slot.
async fn ask_peer_head_number(
peer_id: H256,
connection: &mut PeerConnection,
peer_table: &PeerTable,
_permit: RequestPermit,
sync_head: H256,
retries: i32,
) -> Result<u64, PeerHandlerError> {
Expand All @@ -68,7 +72,8 @@ async fn ask_peer_head_number(

debug!("(Retry {retries}) Requesting sync head {sync_head:?} to peer {peer_id}");

match PeerHandler::make_request(peer_table, peer_id, connection, request, PEER_REPLY_TIMEOUT)
match connection
.outgoing_request(request, PEER_REPLY_TIMEOUT)
.await
{
Ok(RLPxMessage::BlockHeaders(BlockHeaders {
Expand Down Expand Up @@ -104,27 +109,12 @@ impl PeerHandler {
}
}

pub(crate) async fn make_request(
// TODO: We should receive the PeerHandler (or self) instead, but since it is not yet spawnified it cannot be shared
// Fix this to avoid passing the PeerTable as a parameter
peer_table: &PeerTable,
peer_id: H256,
connection: &mut PeerConnection,
message: RLPxMessage,
timeout: Duration,
) -> Result<RLPxMessage, PeerConnectionError> {
peer_table.inc_requests(peer_id)?;
let result = connection.outgoing_request(message, timeout).await;
peer_table.dec_requests(peer_id)?;
result
}

/// Returns a random node id and the channel ends to an active peer connection that supports the given capability
/// It doesn't guarantee that the selected peer is not currently busy
async fn get_random_peer(
&mut self,
capabilities: &[Capability],
) -> Result<Option<(H256, PeerConnection)>, PeerHandlerError> {
) -> Result<Option<(H256, PeerConnection, RequestPermit)>, PeerHandlerError> {
Ok(self
.peer_table
.get_random_peer(capabilities.to_vec())
Expand Down Expand Up @@ -165,30 +155,20 @@ impl PeerHandler {
// sync_head is unknown to our peers
return Ok(None);
}
let peer_connection = self
let peers = self
.peer_table
.get_peer_connections(SUPPORTED_ETH_CAPABILITIES.to_vec())
.get_best_n_peers(SUPPORTED_ETH_CAPABILITIES.to_vec(), MAX_PEERS_TO_ASK)
.await?;

let selected_peers: Vec<_> = peer_connection
.iter()
.take(MAX_PEERS_TO_ASK)
.map(|(id, _)| *id)
.collect();
let selected_peers: Vec<_> = peers.iter().map(|(id, _, _)| *id).collect();
debug!(
retry = retries,
peers_selected = ?selected_peers,
"request_block_headers: resolving sync head with peers"
);
for (peer_id, mut connection) in peer_connection.into_iter().take(MAX_PEERS_TO_ASK) {
match ask_peer_head_number(
peer_id,
&mut connection,
&self.peer_table,
sync_head,
retries,
)
.await
for (peer_id, mut connection, permit) in peers {
match ask_peer_head_number(peer_id, &mut connection, permit, sync_head, retries)
.await
{
Ok(number) => {
sync_head_number = number;
Expand Down Expand Up @@ -266,9 +246,6 @@ impl PeerHandler {
if let Ok((headers, peer_id, _connection, startblock, previous_chunk_limit)) =
task_receiver.try_recv()
{
// Release the reservation we made before spawning the task.
self.peer_table.dec_requests(peer_id)?;

trace!("We received a download chunk from peer");
if headers.is_empty() {
self.peer_table.record_failure(peer_id)?;
Expand Down Expand Up @@ -316,7 +293,7 @@ impl PeerHandler {
self.peer_table.record_success(peer_id)?;
debug!("Downloader {peer_id} freed");
}
let Some((peer_id, mut connection)) = self
let Some((peer_id, mut connection, permit)) = self
.peer_table
.get_best_peer(SUPPORTED_ETH_CAPABILITIES.to_vec())
.await?
Expand Down Expand Up @@ -344,33 +321,23 @@ impl PeerHandler {
current_show += 1;
}

// Queue drained but in-flight tasks haven't returned yet.
// Drop the permit we just acquired (end of scope) and yield
// so the result receive path gets a chance to run.
tokio::task::yield_now().await;
continue;
};
let tx = task_sender.clone();
// Reserve a request slot before spawning so get_best_peer sees
// this peer as busy immediately, preventing the loop from
// spawning dozens of tasks for the same peer in a single tick.
// Reserve a request slot before spawning so get_best_peer sees
// this peer as busy immediately, preventing the loop from
// spawning dozens of tasks for the same peer in a single tick.
// The reservation is released in the completion handler
// (dec_requests on try_recv). The worker calls
// outgoing_request directly (not make_request) since we
// already hold the reservation.
self.peer_table.inc_requests(peer_id)?;
debug!("Downloader {peer_id} is now busy");

// Run download_chunk_from_peer in a different Tokio task.
// The worker must always send a result so dec_requests fires
// in the completion handler. The unwrap_or_default() ensures
// download errors don't panic.
tokio::spawn(async move {
trace!(
"Sync Log 5: Requesting block headers from peer {peer_id}, chunk_limit: {chunk_limit}"
);
let headers = Self::download_chunk_from_peer(
peer_id,
&mut connection,
permit,
startblock,
chunk_limit,
)
Expand Down Expand Up @@ -443,18 +410,16 @@ impl PeerHandler {
});
match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? {
None => Ok(None),
Some((peer_id, mut connection)) => {
Some((peer_id, mut connection, permit)) => {
// Release the peer's slot as soon as the wire response is in.
let response = connection
.outgoing_request(request, PEER_REPLY_TIMEOUT)
.await;
drop(permit);
if let Ok(RLPxMessage::BlockHeaders(BlockHeaders {
id: _,
block_headers,
})) = PeerHandler::make_request(
&self.peer_table,
peer_id,
&mut connection,
request,
PEER_REPLY_TIMEOUT,
)
.await
})) = response
{
if block_headers.is_empty() {
// Empty response is valid per eth spec (peer may not have these blocks)
Expand Down Expand Up @@ -485,12 +450,12 @@ impl PeerHandler {
}

/// Given a peer id, a chunk start and a chunk limit, requests the block headers from the peer.
/// The caller must already hold a request reservation for this peer
/// (via `inc_requests` before spawning), so we call `outgoing_request`
/// directly instead of `make_request` to avoid a double increment.
/// Consumes a `RequestPermit` that was reserved at peer selection time;
/// the permit drops when this function returns, releasing the peer's slot.
async fn download_chunk_from_peer(
peer_id: H256,
connection: &mut PeerConnection,
_permit: RequestPermit,
startblock: u64,
chunk_limit: u64,
) -> Result<Vec<BlockHeader>, PeerHandlerError> {
Expand Down Expand Up @@ -537,18 +502,16 @@ impl PeerHandler {
});
match self.get_random_peer(&SUPPORTED_ETH_CAPABILITIES).await? {
None => Ok(None),
Some((peer_id, mut connection)) => {
Some((peer_id, mut connection, permit)) => {
// Release the peer's slot as soon as the wire response is in.
let response = connection
.outgoing_request(request, PEER_REPLY_TIMEOUT)
.await;
drop(permit);
if let Ok(RLPxMessage::BlockBodies(BlockBodies {
id: _,
block_bodies,
})) = PeerHandler::make_request(
&self.peer_table,
peer_id,
&mut connection,
request,
PEER_REPLY_TIMEOUT,
)
.await
})) = response
{
// Check that the response is not empty and does not contain more bodies than the ones requested
if !block_bodies.is_empty() && block_bodies.len() <= block_hashes_len {
Expand Down Expand Up @@ -623,10 +586,14 @@ impl PeerHandler {
Ok(self.peer_table.peer_count().await?)
}

/// Requests a single block header by number from an already-selected peer.
/// Consumes a `RequestPermit` reserved by the caller at peer selection
/// time; the permit drops when this function returns, releasing the slot.
Comment on lines 586 to +591
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Unused _peer_id parameter in public API

_peer_id: H256 is accepted by get_block_header but never used inside the function — it was only consumed by the removed make_request call. All call sites still pass peer_id explicitly (e.g. in snap_sync.rs). The parameter can be dropped from the signature; callers already hold peer_id for the record_success/record_failure calls they make after the function returns.

Suggested change
Ok(self.peer_table.peer_count().await?)
}
/// Requests a single block header by number from an already-selected peer.
/// Consumes a `RequestPermit` reserved by the caller at peer selection
/// time; the permit drops when this function returns, releasing the slot.
pub async fn get_block_header(
&mut self,
connection: &mut PeerConnection,
_permit: RequestPermit,
block_number: u64,
) -> Result<Option<BlockHeader>, PeerHandlerError> {
Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/networking/p2p/peer_handler.rs
Line: 586-591

Comment:
**Unused `_peer_id` parameter in public API**

`_peer_id: H256` is accepted by `get_block_header` but never used inside the function — it was only consumed by the removed `make_request` call. All call sites still pass `peer_id` explicitly (e.g. in `snap_sync.rs`). The parameter can be dropped from the signature; callers already hold `peer_id` for the `record_success`/`record_failure` calls they make after the function returns.

```suggestion
    pub async fn get_block_header(
        &mut self,
        connection: &mut PeerConnection,
        _permit: RequestPermit,
        block_number: u64,
    ) -> Result<Option<BlockHeader>, PeerHandlerError> {
```

How can I resolve this? If you propose a fix, please make it concise.

pub async fn get_block_header(
&mut self,
peer_id: H256,
_peer_id: H256,
connection: &mut PeerConnection,
_permit: RequestPermit,
block_number: u64,
) -> Result<Option<BlockHeader>, PeerHandlerError> {
let request_id = rand::random();
Expand All @@ -638,14 +605,9 @@ impl PeerHandler {
reverse: false,
});
debug!("get_block_header: requesting header with number {block_number}");
match PeerHandler::make_request(
&self.peer_table,
peer_id,
connection,
request,
PEER_REPLY_TIMEOUT,
)
.await
match connection
.outgoing_request(request, PEER_REPLY_TIMEOUT)
.await
{
Ok(RLPxMessage::BlockHeaders(BlockHeaders {
id: _,
Expand Down
Loading
Loading