Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion crates/networking/p2p/snap/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,13 @@ pub async fn request_storage_ranges(
// channel to send the result of dumping storages
let mut disk_joinset: tokio::task::JoinSet<Result<(), DumpError>> = tokio::task::JoinSet::new();

// Track in-flight worker tasks so we can drain them before returning.
// The main loop can break early on staleness. Detached `tokio::spawn`
// tasks would then `tx.send` into a dropped channel, so the main loop's
// `try_recv` handler would never run `dec_requests` for them, leaking
// the peer's reservation slot.
let mut request_set: tokio::task::JoinSet<Result<(), SnapError>> = tokio::task::JoinSet::new();

let mut task_count = tasks_queue_not_started.len();
let mut completed_tasks = 0;

Expand Down Expand Up @@ -1002,7 +1009,7 @@ pub async fn request_storage_ranges(
// Reserve a request slot before spawning (see account range comment).
peers.peer_table.inc_requests(peer_id)?;

tokio::spawn(request_storage_ranges_worker(
request_set.spawn(request_storage_ranges_worker(
task,
peer_id,
connection,
Expand All @@ -1013,6 +1020,17 @@ pub async fn request_storage_ranges(
));
}

// Drain any remaining in-flight tasks and their responses so
// `dec_requests` fires for every worker. Without this, exiting the
// loop on staleness with in-flight workers would drop the receiver,
// causing the workers' `tx.send` to fail and leaking peer slots.
while !request_set.is_empty() {
let _ = request_set.join_next().await;
while let Ok(result) = task_receiver.try_recv() {
peers.peer_table.dec_requests(result.peer_id)?;
}
}

{
let snapshot = current_account_storages.into_values().collect::<Vec<_>>();

Expand Down
5 changes: 5 additions & 0 deletions crates/networking/p2p/sync/healing/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,16 @@ pub async fn heal_storage_trie(
}

if is_done {
// Await in-flight request tasks so `make_request`'s inc/dec
// stays balanced. Dropping the JoinSet aborts them mid-await
// and leaks peer reservation slots.
requests_task_joinset.join_all().await;
db_joinset.join_all().await;
return Ok(true);
}

if is_stale {
requests_task_joinset.join_all().await;
db_joinset.join_all().await;
state.healing_queue = HashMap::new();
return Ok(false);
Expand Down
Loading