diff --git a/crates/networking/p2p/snap/client.rs b/crates/networking/p2p/snap/client.rs index 1106c6c101e..5d230497f0f 100644 --- a/crates/networking/p2p/snap/client.rs +++ b/crates/networking/p2p/snap/client.rs @@ -597,6 +597,13 @@ pub async fn request_storage_ranges( // channel to send the result of dumping storages let mut disk_joinset: tokio::task::JoinSet> = tokio::task::JoinSet::new(); + // Track in-flight worker tasks so we can drain them before returning. + // The main loop can break early on staleness — detached `tokio::spawn` + // tasks would then try to `tx.send` to a dropped channel, skipping the + // `dec_requests` handler in the main loop's try_recv and leaking the + // peer's reservation slot. + let mut request_set: tokio::task::JoinSet> = tokio::task::JoinSet::new(); + let mut task_count = tasks_queue_not_started.len(); let mut completed_tasks = 0; @@ -1002,7 +1009,7 @@ pub async fn request_storage_ranges( // Reserve a request slot before spawning (see account range comment). peers.peer_table.inc_requests(peer_id)?; - tokio::spawn(request_storage_ranges_worker( + request_set.spawn(request_storage_ranges_worker( task, peer_id, connection, @@ -1013,6 +1020,17 @@ pub async fn request_storage_ranges( )); } + // Drain any remaining in-flight tasks and their responses so + // `dec_requests` fires for every worker. Without this, exiting the + // loop on staleness with in-flight workers would drop the receiver, + // causing the workers' `tx.send` to fail and leaking peer slots. + while !request_set.is_empty() { + let _ = request_set.join_next().await; + while let Ok(result) = task_receiver.try_recv() { + peers.peer_table.dec_requests(result.peer_id)?; + } + } + { let snapshot = current_account_storages.into_values().collect::>(); diff --git a/crates/networking/p2p/sync/healing/storage.rs b/crates/networking/p2p/sync/healing/storage.rs index c593b3756ba..a590976072e 100644 --- a/crates/networking/p2p/sync/healing/storage.rs +++ b/crates/networking/p2p/sync/healing/storage.rs @@ -237,11 +237,16 @@ pub async fn heal_storage_trie( } if is_done { + // Await in-flight request tasks so `make_request`'s inc/dec + // stays balanced. Dropping the JoinSet aborts them mid-await + // and leaks peer reservation slots. + requests_task_joinset.join_all().await; db_joinset.join_all().await; return Ok(true); } if is_stale { + requests_task_joinset.join_all().await; db_joinset.join_all().await; state.healing_queue = HashMap::new(); return Ok(false);