From 3c2ee2439e37ba738cde0e67fdffb6579a0286ad Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Tue, 12 May 2026 22:04:51 +0800 Subject: [PATCH 01/17] perf(pm): fold preload+BFS into concurrent futures::join! (method A) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the sequential `run_preload_phase().await → run_bfs_phase().await` two-phase barrier with `futures::join!(preload_manifests, run_bfs_phase)`. Both futures share the registry's `inflight_full`/`inflight_version` OnceMaps: the first caller per package triggers a fetch; subsequent callers await the Notify and get the cached result. No duplicate network requests, no channel serialization. BFS starts immediately instead of waiting for the entire preload closure. When BFS hits a cache miss, it resumes as soon as preload's in-flight fetch completes. Tail packages that were previously stalling BFS startup no longer block the main resolve path. Uses `futures::join!` (same-task cooperative scheduling) rather than `tokio::spawn` to avoid requiring `R: Clone + Send + Sync + 'static` across the whole call chain. IO-bound perf is equivalent since preload's internal `FuturesUnordered` (128 concurrent) still drives parallel network I/O via tokio's async reactor. WASM32 falls back to the original sequential order (no `join!` benefit on single-threaded target). Co-Authored-By: Claude Sonnet 4.6 --- crates/ruborist/src/resolver/builder.rs | 46 ++++++++++++++++++++++--- crates/ruborist/src/traits/progress.rs | 1 + crates/ruborist/src/traits/registry.rs | 2 ++ 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs index b0bf2794c..bb40ebfa3 100644 --- a/crates/ruborist/src/resolver/builder.rs +++ b/crates/ruborist/src/resolver/builder.rs @@ -732,11 +732,46 @@ pub async fn build_deps_with_config( config.skip_preload ); - // Phase 1: Preload manifests in parallel (unless skipped) - run_preload_phase(graph, registry, &config, receiver).await; + // On native targets, run preload and BFS concurrently via futures::join!. + // + // Both futures share `registry`'s inflight_full/inflight_version OnceMaps: + // the first caller per package triggers a fetch; subsequent callers await + // the Notify and share the result. No duplicate requests, no channel + // serialization. When preload is still fetching, BFS edges hit the OnceMap + // Waiting state and resume as soon as the fetch completes. + // + // On wasm32 (single-threaded, no `join!` benefit), keep sequential order. 
+    #[cfg(not(target_arch = "wasm32"))]
+    {
+        if !config.skip_preload {
+            let initial_deps = gather_preload_deps(graph, config.peer_deps);
+            if !initial_deps.is_empty() {
+                let preload_cfg = PreloadConfig {
+                    peer_deps: config.peer_deps,
+                    concurrency: config.concurrency,
+                };
+                let (preload_stats, bfs_result) = futures::join!(
+                    preload_manifests(initial_deps, registry, preload_cfg, receiver, |_, _| {}),
+                    run_bfs_phase(graph, registry, &config, receiver)
+                );
+                receiver.on_event(BuildEvent::PreloadComplete {
+                    success: preload_stats.success_count,
+                    failed: preload_stats.failed_count,
+                });
+                bfs_result?;
+            } else {
+                run_bfs_phase(graph, registry, &config, receiver).await?;
+            }
+        } else {
+            run_bfs_phase(graph, registry, &config, receiver).await?;
+        }
+    }
 
-    // Phase 2: BFS traversal to build the dependency tree
-    run_bfs_phase(graph, registry, &config, receiver).await?;
+    #[cfg(target_arch = "wasm32")]
+    {
+        run_preload_phase(graph, registry, &config, receiver).await;
+        run_bfs_phase(graph, registry, &config, receiver).await?;
+    }
 
     receiver.on_event(BuildEvent::Complete {
         total_nodes: graph.graph.node_count(),
@@ -745,7 +780,8 @@ pub async fn build_deps_with_config(
     Ok(())
 }
 
-/// Run the preload phase to warm up the cache with manifests.
+/// Run the preload phase to warm up the cache with manifests (wasm32 only).
+#[cfg(target_arch = "wasm32")]
 async fn run_preload_phase<R, E>(
     graph: &DependencyGraph,
     registry: &R,
diff --git a/crates/ruborist/src/traits/progress.rs b/crates/ruborist/src/traits/progress.rs
index 72c739a78..ab0d59f0e 100644
--- a/crates/ruborist/src/traits/progress.rs
+++ b/crates/ruborist/src/traits/progress.rs
@@ -99,6 +99,7 @@ pub trait EventReceiver: Send + Sync {
 }
 
 /// A no-op event receiver for when event tracking is not needed.
+#[derive(Clone, Copy, Default)]
 pub struct NoopReceiver;
 
 impl EventReceiver for NoopReceiver {
diff --git a/crates/ruborist/src/traits/registry.rs b/crates/ruborist/src/traits/registry.rs
index 48e29ad92..5ab8c0093 100644
--- a/crates/ruborist/src/traits/registry.rs
+++ b/crates/ruborist/src/traits/registry.rs
@@ -298,6 +298,7 @@ pub mod mock {
     use super::*;
 
     /// Internal package data for mock registry.
+    #[derive(Clone)]
     struct MockPackage {
         name: String,
         dist_tags: HashMap<String, String>,
@@ -305,6 +306,7 @@ pub mod mock {
     }
 
     /// Mock registry client that returns predefined packages.
+    #[derive(Clone)]
     pub struct MockRegistryClient {
         packages: HashMap<String, MockPackage>,
     }

From f03ce5e4233e2ce0d88bd4cc285ba6978a9b53b3 Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Tue, 12 May 2026 23:35:36 +0800
Subject: [PATCH 02/17] ci(bench): reduce bench_runs default 3→2 for faster PR
 feedback
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Matches #2937 setup so results are directly comparable.
Co-Authored-By: Claude Sonnet 4.6 --- .github/workflows/pm-e2e-bench.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/pm-e2e-bench.yml b/.github/workflows/pm-e2e-bench.yml index 74c90ece5..9a68dcade 100644 --- a/.github/workflows/pm-e2e-bench.yml +++ b/.github/workflows/pm-e2e-bench.yml @@ -540,7 +540,7 @@ jobs: env: PROJECT: ${{ github.event.inputs.project || 'ant-design' }} REGISTRY: 'https://registry.npmjs.org' - BENCH_RUNS: ${{ github.event.inputs.bench_runs || '3' }} + BENCH_RUNS: ${{ github.event.inputs.bench_runs || '2' }} PM_LIST: 'utoo,utoo-next,utoo-npm,bun' run: | chmod +x bench/pm-bench-phases.sh @@ -560,7 +560,7 @@ jobs: env: PROJECT: ${{ github.event.inputs.project || 'ant-design' }} REGISTRY: 'https://registry.npmmirror.com' - BENCH_RUNS: ${{ github.event.inputs.bench_runs || '3' }} + BENCH_RUNS: ${{ github.event.inputs.bench_runs || '2' }} PM_LIST: 'utoo,utoo-next,utoo-npm,bun' run: | mkdir -p /tmp/pm-bench-output @@ -629,7 +629,7 @@ jobs: env: PROJECT: ${{ github.event.inputs.project || 'ant-design' }} REGISTRY: 'https://registry.npmjs.org' - BENCH_RUNS: ${{ github.event.inputs.bench_runs || '3' }} + BENCH_RUNS: ${{ github.event.inputs.bench_runs || '2' }} PM_LIST: 'utoo,utoo-npm,bun' run: | chmod +x bench/pm-bench-phases.sh @@ -645,7 +645,7 @@ jobs: env: PROJECT: ${{ github.event.inputs.project || 'ant-design' }} REGISTRY: 'https://registry.npmmirror.com' - BENCH_RUNS: ${{ github.event.inputs.bench_runs || '3' }} + BENCH_RUNS: ${{ github.event.inputs.bench_runs || '2' }} PM_LIST: 'utoo,utoo-npm,bun' run: | mkdir -p /tmp/pm-bench-output From 4e6848dc997cb64a4824de4f0097a12cb6fd87de Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Tue, 12 May 2026 20:52:24 +0800 Subject: [PATCH 03/17] perf(pm): experiment main-loop bfs resolver (cherry picked from commit 75e84d0cb1a35250a59511bee86ad87f1fde06ba) --- crates/ruborist/src/resolver/builder.rs | 729 ++++++++++++++++++++++-- crates/ruborist/src/resolver/preload.rs | 5 +- crates/ruborist/src/service/manifest.rs | 70 ++- crates/ruborist/src/service/mod.rs | 5 +- crates/ruborist/src/service/registry.rs | 4 + crates/ruborist/src/traits/registry.rs | 8 + 6 files changed, 761 insertions(+), 60 deletions(-) diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs index bb40ebfa3..69f910d66 100644 --- a/crates/ruborist/src/resolver/builder.rs +++ b/crates/ruborist/src/resolver/builder.rs @@ -23,17 +23,30 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +#[cfg(not(target_arch = "wasm32"))] +use futures::stream::{FuturesUnordered, StreamExt}; +#[cfg(not(target_arch = "wasm32"))] +use std::collections::{HashSet, VecDeque}; + #[cfg(feature = "http-tarball")] use anyhow::Context as _; use crate::model::graph::{DependencyGraph, FindResult, PackageNode}; use crate::model::manifest::NodeManifest; +#[cfg(not(target_arch = "wasm32"))] +use crate::model::manifest::{CoreVersionManifest, FullManifest}; use crate::model::node::EdgeType; use crate::model::package_json::PackageJson; -use crate::resolver::preload::{PreloadConfig, preload_manifests}; +use crate::resolver::preload::{PreloadConfig, extract_transitive_deps, preload_manifests}; use crate::resolver::registry::{ResolveError, resolve_registry_dep}; +#[cfg(not(target_arch = "wasm32"))] +use crate::resolver::semver::normalize_spec; +#[cfg(not(target_arch = "wasm32"))] +use crate::resolver::version::resolve_target_version; use crate::spec::{Catalogs, PackageSpec, Protocol}; use 
crate::traits::progress::{BuildEvent, EventReceiver, NoopReceiver}; +#[cfg(not(target_arch = "wasm32"))] +use crate::traits::registry::RegistryError; use crate::traits::registry::{RegistryClient, ResolvedPackage}; /// Dispatch a git/github spec to the real `gix`-backed resolver when the @@ -732,39 +745,14 @@ pub async fn build_deps_with_config( config.skip_preload ); - // On native targets, run preload and BFS concurrently via futures::join!. - // - // Both futures share `registry`'s inflight_full/inflight_version OnceMaps: - // the first caller per package triggers a fetch; subsequent callers await - // the Notify and share the result. No duplicate requests, no channel - // serialization. When preload is still fetching, BFS edges hit the OnceMap - // Waiting state and resume as soon as the fetch completes. - // - // On wasm32 (single-threaded, no `join!` benefit), keep sequential order. #[cfg(not(target_arch = "wasm32"))] - { - if !config.skip_preload { - let initial_deps = gather_preload_deps(graph, config.peer_deps); - if !initial_deps.is_empty() { - let preload_cfg = PreloadConfig { - peer_deps: config.peer_deps, - concurrency: config.concurrency, - }; - let (preload_stats, bfs_result) = futures::join!( - preload_manifests(initial_deps, registry, preload_cfg, receiver, |_, _| {}), - run_bfs_phase(graph, registry, &config, receiver) - ); - receiver.on_event(BuildEvent::PreloadComplete { - success: preload_stats.success_count, - failed: preload_stats.failed_count, - }); - bfs_result?; - } else { - run_bfs_phase(graph, registry, &config, receiver).await?; - } - } else { - run_bfs_phase(graph, registry, &config, receiver).await?; - } + if !config.skip_preload && !registry.registry_url().is_empty() { + run_main_loop_bfs(graph, registry, &config, receiver).await?; + } else { + // Keep the existing path for warm project-cache runs and generic + // RegistryClient implementations that do not expose a raw registry URL. + run_preload_phase(graph, registry, &config, receiver).await; + run_bfs_phase(graph, registry, &config, receiver).await?; } #[cfg(target_arch = "wasm32")] @@ -780,8 +768,679 @@ pub async fn build_deps_with_config( Ok(()) } -/// Run the preload phase to warm up the cache with manifests (wasm32 only). 
-#[cfg(target_arch = "wasm32")]
+#[cfg(not(target_arch = "wasm32"))]
+type WaitingEdge = (NodeIndex, DependencyEdgeInfo);
+
+#[cfg(not(target_arch = "wasm32"))]
+enum FetchRequest {
+    Full { name: String },
+    Version { name: String, spec: String },
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+enum FetchDone {
+    Full {
+        name: String,
+        result: anyhow::Result<(Vec<u8>, Option<String>)>,
+    },
+    Version {
+        name: String,
+        spec: String,
+        result: anyhow::Result<Vec<u8>>,
+    },
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+type FetchFuture = tokio::task::JoinHandle<FetchDone>;
+
+#[cfg(not(target_arch = "wasm32"))]
+fn registry_error<E>(message: impl Into<String>) -> ResolveError<E>
+where
+    E: From<RegistryError>,
+{
+    ResolveError::Registry(RegistryError(anyhow::anyhow!(message.into())).into())
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+fn parse_full_manifest_inline(raw_bytes: Vec<u8>) -> anyhow::Result<Arc<FullManifest>> {
+    let mut parse_buf = raw_bytes.clone();
+    let mut manifest: FullManifest = simd_json::serde::from_slice(&mut parse_buf)
+        .map_err(|e| anyhow::anyhow!("JSON parse error: {e}"))?;
+    manifest.raw = Arc::from(raw_bytes);
+    Ok(Arc::new(manifest))
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+fn parse_core_manifest_inline(mut bytes: Vec<u8>) -> anyhow::Result<Arc<CoreVersionManifest>> {
+    simd_json::serde::from_slice::<CoreVersionManifest>(&mut bytes)
+        .map(Arc::new)
+        .map_err(|e| anyhow::anyhow!("JSON parse error: {e}"))
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+fn fetch_registry_manifest(registry_url: String, request: FetchRequest) -> FetchFuture {
+    use crate::service::{
+        FetchManifestBytesResult, FetchManifestOptions, FetchVersionManifestOptions,
+        MetadataFormat, fetch_full_manifest_bytes, fetch_version_manifest_bytes,
+    };
+
+    tokio::spawn(async move {
+        match request {
+            FetchRequest::Full { name } => {
+                let result = fetch_full_manifest_bytes(FetchManifestOptions {
+                    registry_url: &registry_url,
+                    name: &name,
+                    format: MetadataFormat::Abbreviated,
+                    etag: None,
+                })
+                .await
+                .and_then(|result| match result {
+                    FetchManifestBytesResult::Ok(bytes, etag) => Ok((bytes, etag)),
+                    FetchManifestBytesResult::NotModified => {
+                        Err(anyhow::anyhow!("304 Not Modified without etag context"))
+                    }
+                });
+                FetchDone::Full { name, result }
+            }
+            FetchRequest::Version { name, spec } => {
+                let result = fetch_version_manifest_bytes(FetchVersionManifestOptions {
+                    registry_url: &registry_url,
+                    name: &name,
+                    spec: &spec,
+                    format: MetadataFormat::Abbreviated,
+                })
+                .await;
+                FetchDone::Version { name, spec, result }
+            }
+        }
+    })
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+fn pump_fetches(
+    fetches: &mut FuturesUnordered<FetchFuture>,
+    queue: &mut VecDeque<FetchRequest>,
+    registry_url: &str,
+    concurrency: usize,
+) {
+    while fetches.len() < concurrency {
+        let Some(request) = queue.pop_front() else {
+            break;
+        };
+        fetches.push(fetch_registry_manifest(registry_url.to_string(), request));
+    }
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+#[allow(clippy::too_many_arguments)]
+fn schedule_registry_fetch(
+    name: String,
+    spec: String,
+    supports_semver: bool,
+    full_cache: &HashMap<String, Arc<FullManifest>>,
+    version_cache: &HashMap<(String, String), Arc<CoreVersionManifest>>,
+    full_failures: &HashMap<String, String>,
+    version_failures: &HashMap<(String, String), String>,
+    inflight_full: &mut HashSet<String>,
+    inflight_version: &mut HashSet<(String, String)>,
+    fetch_queue: &mut VecDeque<FetchRequest>,
+) {
+    let (real_name, real_spec) = normalize_spec(&name, &spec);
+    if supports_semver {
+        let key = (real_name, real_spec);
+        if version_cache.contains_key(&key)
+            || version_failures.contains_key(&key)
+            || !inflight_version.insert(key.clone())
+        {
+            return;
+        }
+        fetch_queue.push_back(FetchRequest::Version {
+            name: key.0,
spec: key.1, + }); + } else if !full_cache.contains_key(&real_name) + && !full_failures.contains_key(&real_name) + && inflight_full.insert(real_name.clone()) + { + fetch_queue.push_back(FetchRequest::Full { name: real_name }); + } +} + +#[cfg(not(target_arch = "wasm32"))] +#[allow(clippy::too_many_arguments)] +fn schedule_transitive_prefetches( + manifest: &CoreVersionManifest, + preload_config: &PreloadConfig, + supports_semver: bool, + full_cache: &HashMap>, + version_cache: &HashMap<(String, String), Arc>, + full_failures: &HashMap, + version_failures: &HashMap<(String, String), String>, + inflight_full: &mut HashSet, + inflight_version: &mut HashSet<(String, String)>, + fetch_queue: &mut VecDeque, +) { + for (name, spec) in extract_transitive_deps(manifest, preload_config) { + schedule_registry_fetch( + name, + spec, + supports_semver, + full_cache, + version_cache, + full_failures, + version_failures, + inflight_full, + inflight_version, + fetch_queue, + ); + } +} + +#[cfg(not(target_arch = "wasm32"))] +fn try_reuse_dependency( + graph: &mut DependencyGraph, + parent: NodeIndex, + edge: &DependencyEdgeInfo, +) -> Option { + match graph.find_compatible_node(parent, &edge.name, &edge.spec) { + FindResult::Reuse(existing_index) => { + graph.mark_dependency_resolved(edge.edge_id, existing_index); + update_node_type_from_edge(graph, parent, existing_index, &edge.edge_type); + Some(ProcessResult::Reused(existing_index)) + } + FindResult::Conflict(_) | FindResult::New(_) => None, + } +} + +#[cfg(not(target_arch = "wasm32"))] +pub fn process_dependency_with_resolved( + graph: &mut DependencyGraph, + node_index: NodeIndex, + edge_info: &DependencyEdgeInfo, + resolved: &ResolvedPackage, + config: &BuildDepsConfig, +) -> ProcessResult { + match graph.find_compatible_node(node_index, &edge_info.name, &edge_info.spec) { + FindResult::Reuse(existing_index) => { + graph.mark_dependency_resolved(edge_info.edge_id, existing_index); + update_node_type_from_edge(graph, node_index, existing_index, &edge_info.edge_type); + ProcessResult::Reused(existing_index) + } + FindResult::Conflict(conflict_parent) | FindResult::New(conflict_parent) => { + let new_node = create_package_node(&edge_info.name, resolved, conflict_parent, graph); + let new_index = graph.add_node(new_node); + graph.add_physical_edge(conflict_parent, new_index); + graph.mark_dependency_resolved(edge_info.edge_id, new_index); + update_node_type_from_edge(graph, node_index, new_index, &edge_info.edge_type); + add_edges_from( + graph, + new_index, + &*resolved.manifest, + &EdgeContext::new(config.peer_deps, DevDeps::Exclude), + ); + ProcessResult::Created(new_index) + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +fn chain_err( + graph: &DependencyGraph, + parent: NodeIndex, + edge: &DependencyEdgeInfo, + inner: ResolveError, +) -> ResolveError { + let mut chain = graph.logical_ancestry(parent); + chain.push((edge.name.clone(), edge.spec.clone())); + ResolveError::WithChain { + chain, + source: Box::new(inner), + } +} + +#[cfg(not(target_arch = "wasm32"))] +fn handle_processed( + graph: &DependencyGraph, + receiver: &E, + parent: NodeIndex, + edge: &DependencyEdgeInfo, + processed: &ProcessResult, + next_level: &mut Vec, +) { + match processed { + ProcessResult::Created(idx) => { + if let Some(node) = graph.get_node(*idx) { + receiver.on_event(BuildEvent::Resolved { + name: &edge.name, + version: &node.version, + }); + if let NodeManifest::Registry(ref manifest) = node.manifest { + let parent_path = graph.get_node(parent).map(|p| 
p.path.as_path()); + receiver.on_event(BuildEvent::PackagePlaced { + package: manifest.as_ref().into(), + path: &node.path, + parent_path, + }); + } + } + next_level.push(*idx); + } + ProcessResult::Reused(idx) => { + if let Some(node) = graph.get_node(*idx) { + receiver.on_event(BuildEvent::Reused { + name: &edge.name, + version: &node.version, + }); + } + } + ProcessResult::Skipped => { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +fn resolve_from_full_manifest( + edge: &DependencyEdgeInfo, + full: &FullManifest, + real_spec: &str, + core_cache: &mut HashMap<(String, String), Arc>, +) -> Result, ResolveError> { + if full.versions.is_empty() { + if edge.edge_type == EdgeType::Optional { + return Ok(None); + } + return Err(ResolveError::NoVersions(full.name.clone())); + } + + let version = match resolve_target_version(full.into(), real_spec) { + Ok(version) => version, + Err(_) if edge.edge_type == EdgeType::Optional => return Ok(None), + Err(e) => { + return Err(ResolveError::Version(format!( + "{}@{}: {}", + edge.name, real_spec, e + ))); + } + }; + + let cache_key = (full.name.clone(), version.clone()); + let manifest = match core_cache.get(&cache_key).cloned() { + Some(manifest) => manifest, + None => { + let Some(manifest) = full.get_core_version(&version).map(Arc::new) else { + if edge.edge_type == EdgeType::Optional { + return Ok(None); + } + return Err(ResolveError::ManifestNotFound { + name: edge.name.clone(), + version, + }); + }; + core_cache.insert(cache_key, Arc::clone(&manifest)); + manifest + } + }; + + Ok(Some(ResolvedPackage { + name: edge.name.clone(), + version: manifest.version.clone(), + manifest, + })) +} + +#[cfg(not(target_arch = "wasm32"))] +#[allow(clippy::too_many_arguments)] +fn apply_fetch_result( + done: FetchDone, + full_cache: &mut HashMap>, + version_cache: &mut HashMap<(String, String), Arc>, + full_waiters: &mut HashMap>, + version_waiters: &mut HashMap<(String, String), Vec>, + full_failures: &mut HashMap, + version_failures: &mut HashMap<(String, String), String>, + inflight_full: &mut HashSet, + inflight_version: &mut HashSet<(String, String)>, + fetch_queue: &mut VecDeque, + preload_config: &PreloadConfig, + supports_semver: bool, + level_pending: &mut VecDeque, +) { + match done { + FetchDone::Full { name, result } => { + inflight_full.remove(&name); + match result.and_then(|(bytes, _etag)| parse_full_manifest_inline(bytes)) { + Ok(full) => { + full_cache.insert(name.clone(), full); + } + Err(e) => { + full_failures.insert(name.clone(), format!("{e:#}")); + } + } + if let Some(waiters) = full_waiters.remove(&name) { + level_pending.extend(waiters); + } + } + FetchDone::Version { name, spec, result } => { + let key = (name, spec); + inflight_version.remove(&key); + match result.and_then(parse_core_manifest_inline) { + Ok(manifest) => { + version_cache.insert(key.clone(), Arc::clone(&manifest)); + schedule_transitive_prefetches( + &manifest, + preload_config, + supports_semver, + full_cache, + version_cache, + full_failures, + version_failures, + inflight_full, + inflight_version, + fetch_queue, + ); + } + Err(e) => { + version_failures.insert(key.clone(), format!("{e:#}")); + } + } + if let Some(waiters) = version_waiters.remove(&key) { + level_pending.extend(waiters); + } + } + } +} + +#[cfg(not(target_arch = "wasm32"))] +async fn run_main_loop_bfs( + graph: &mut DependencyGraph, + registry: &R, + config: &BuildDepsConfig, + receiver: &E, +) -> Result<(), 
ResolveError> +where + R: RegistryClient, + E: EventReceiver, +{ + use crate::spec::SpecStr; + + let registry_url = registry.registry_url().trim_end_matches('/').to_string(); + let supports_semver = registry.supports_semver_resolution(); + let concurrency = config.concurrency.max(1); + let preload_config = PreloadConfig { + peer_deps: config.peer_deps, + concurrency, + }; + + let mut full_cache: HashMap> = HashMap::new(); + let mut version_cache: HashMap<(String, String), Arc> = HashMap::new(); + let mut core_cache: HashMap<(String, String), Arc> = HashMap::new(); + let mut full_waiters: HashMap> = HashMap::new(); + let mut version_waiters: HashMap<(String, String), Vec> = HashMap::new(); + let mut full_failures: HashMap = HashMap::new(); + let mut version_failures: HashMap<(String, String), String> = HashMap::new(); + let mut inflight_full: HashSet = HashSet::new(); + let mut inflight_version: HashSet<(String, String)> = HashSet::new(); + let mut fetch_queue: VecDeque = VecDeque::new(); + let mut fetches: FuturesUnordered = FuturesUnordered::new(); + + let root_idx = graph.root_index; + let mut current_level = vec![root_idx]; + + while !current_level.is_empty() { + receiver.on_event(BuildEvent::LevelStart { + node_count: current_level.len(), + }); + + let mut next_level = Vec::new(); + let mut level_pending = VecDeque::new(); + + for node_index in ¤t_level { + for (_, dep) in graph.get_dependency_edges(*node_index) { + if dep.valid + && let Some(to) = dep.to + && let Some(n) = graph.get_node(to) + && n.is_workspace() + && *node_index == root_idx + { + next_level.push(to); + } + } + + let unresolved = collect_unresolved_edges(graph, *node_index); + receiver.on_event(BuildEvent::DependencyCount { + count: unresolved.len(), + }); + for edge in unresolved { + level_pending.push_back((*node_index, edge)); + } + } + + loop { + pump_fetches(&mut fetches, &mut fetch_queue, ®istry_url, concurrency); + + while let Some((parent, edge)) = level_pending.pop_front() { + receiver.on_event(BuildEvent::Resolving { name: &edge.name }); + + if !edge.spec.is_registry_spec() { + let processed = process_dependency(graph, registry, parent, &edge, config) + .await + .map_err(|inner| chain_err(graph, parent, &edge, inner))?; + handle_processed(graph, receiver, parent, &edge, &processed, &mut next_level); + continue; + } + + if let Some(processed) = try_reuse_dependency(graph, parent, &edge) { + handle_processed(graph, receiver, parent, &edge, &processed, &mut next_level); + continue; + } + + let (real_name, real_spec) = normalize_spec(&edge.name, &edge.spec); + if supports_semver { + let key = (real_name.clone(), real_spec.clone()); + if let Some(error) = version_failures.get(&key) { + if edge.edge_type == EdgeType::Optional { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + continue; + } + return Err(chain_err( + graph, + parent, + &edge, + registry_error(format!("{}@{}: {error}", real_name, real_spec)), + )); + } + + if let Some(manifest) = version_cache.get(&key).cloned() { + let resolved = ResolvedPackage { + name: edge.name.clone(), + version: manifest.version.clone(), + manifest, + }; + let processed = if graph + .check_override(parent, &edge.name, Some(&resolved.version)) + .is_some() + { + process_dependency(graph, registry, parent, &edge, config) + .await + .map_err(|inner| chain_err(graph, parent, &edge, inner))? 
+ } else { + receiver.on_event(BuildEvent::PackageResolved( + (&*resolved.manifest).into(), + )); + schedule_transitive_prefetches( + &resolved.manifest, + &preload_config, + supports_semver, + &full_cache, + &version_cache, + &full_failures, + &version_failures, + &mut inflight_full, + &mut inflight_version, + &mut fetch_queue, + ); + process_dependency_with_resolved( + graph, parent, &edge, &resolved, config, + ) + }; + handle_processed( + graph, + receiver, + parent, + &edge, + &processed, + &mut next_level, + ); + continue; + } + + let waiters = version_waiters.entry(key.clone()).or_default(); + waiters.push((parent, edge)); + if inflight_version.insert(key.clone()) { + fetch_queue.push_back(FetchRequest::Version { + name: key.0, + spec: key.1, + }); + } + } else { + if let Some(error) = full_failures.get(&real_name) { + if edge.edge_type == EdgeType::Optional { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + continue; + } + return Err(chain_err( + graph, + parent, + &edge, + registry_error(format!("{}: {error}", real_name)), + )); + } + + if let Some(full) = full_cache.get(&real_name).cloned() { + let Some(resolved) = resolve_from_full_manifest::( + &edge, + &full, + &real_spec, + &mut core_cache, + ) + .map_err(|inner| chain_err(graph, parent, &edge, inner))? + else { + receiver.on_event(BuildEvent::Skipped { + name: &edge.name, + spec: &edge.spec, + }); + continue; + }; + + let processed = if graph + .check_override(parent, &edge.name, Some(&resolved.version)) + .is_some() + { + process_dependency(graph, registry, parent, &edge, config) + .await + .map_err(|inner| chain_err(graph, parent, &edge, inner))? + } else { + receiver.on_event(BuildEvent::PackageResolved( + (&*resolved.manifest).into(), + )); + schedule_transitive_prefetches( + &resolved.manifest, + &preload_config, + supports_semver, + &full_cache, + &version_cache, + &full_failures, + &version_failures, + &mut inflight_full, + &mut inflight_version, + &mut fetch_queue, + ); + process_dependency_with_resolved( + graph, parent, &edge, &resolved, config, + ) + }; + handle_processed( + graph, + receiver, + parent, + &edge, + &processed, + &mut next_level, + ); + continue; + } + + let waiters = full_waiters.entry(real_name.clone()).or_default(); + waiters.push((parent, edge)); + if inflight_full.insert(real_name.clone()) { + fetch_queue.push_back(FetchRequest::Full { name: real_name }); + } + } + + pump_fetches(&mut fetches, &mut fetch_queue, ®istry_url, concurrency); + } + + if full_waiters.is_empty() && version_waiters.is_empty() { + break; + } + + let Some(done) = fetches.next().await else { + let mut fallback = Vec::new(); + for (_, waiters) in full_waiters.drain() { + fallback.extend(waiters); + } + for (_, waiters) in version_waiters.drain() { + fallback.extend(waiters); + } + for (parent, edge) in fallback { + let processed = process_dependency(graph, registry, parent, &edge, config) + .await + .map_err(|inner| chain_err(graph, parent, &edge, inner))?; + handle_processed(graph, receiver, parent, &edge, &processed, &mut next_level); + } + break; + }; + let done = done.map_err(|e| { + registry_error::(format!("manifest fetch task failed: {e}")) + })?; + + apply_fetch_result( + done, + &mut full_cache, + &mut version_cache, + &mut full_waiters, + &mut version_waiters, + &mut full_failures, + &mut version_failures, + &mut inflight_full, + &mut inflight_version, + &mut fetch_queue, + &preload_config, + supports_semver, + &mut level_pending, + ); + } + + 
receiver.on_event(BuildEvent::LevelComplete { + next_level_count: next_level.len(), + }); + current_level = next_level; + } + + Ok(()) +} + +/// Run the preload phase to warm up the cache with manifests. async fn run_preload_phase( graph: &DependencyGraph, registry: &R, diff --git a/crates/ruborist/src/resolver/preload.rs b/crates/ruborist/src/resolver/preload.rs index 1230c5bf6..ff3cda34a 100644 --- a/crates/ruborist/src/resolver/preload.rs +++ b/crates/ruborist/src/resolver/preload.rs @@ -61,7 +61,10 @@ fn collect_deps(map: Option<&std::collections::HashMap>) -> Vec< /// Extract transitive dependencies from a resolved manifest. /// Note: devDependencies are NOT included (only root packages install devDeps). -fn extract_transitive_deps(manifest: &CoreVersionManifest, config: &PreloadConfig) -> Vec { +pub(crate) fn extract_transitive_deps( + manifest: &CoreVersionManifest, + config: &PreloadConfig, +) -> Vec { let mut deps = Vec::new(); deps.extend(collect_deps(manifest.dependencies.as_ref())); if config.peer_deps == PeerDeps::Include { diff --git a/crates/ruborist/src/service/manifest.rs b/crates/ruborist/src/service/manifest.rs index 74baf3b9c..1f876d70f 100644 --- a/crates/ruborist/src/service/manifest.rs +++ b/crates/ruborist/src/service/manifest.rs @@ -48,6 +48,18 @@ pub enum FetchManifestResult { NotModified, } +/// Raw full-manifest HTTP result. +/// +/// This variant intentionally stops before JSON parsing so dependency +/// resolution loops can keep global inflight/cache ownership in one task and +/// reserve spawned work for request I/O. +pub enum FetchManifestBytesResult { + /// 200 OK — response bytes with optional new ETag. + Ok(Vec, Option), + /// 304 Not Modified — ETag matched, use cached data. + NotModified, +} + /// Manifest metadata format. #[derive(Debug, Clone, Copy)] pub enum MetadataFormat { @@ -68,8 +80,10 @@ pub struct FetchManifestOptions<'a> { pub etag: Option<&'a str>, } -/// Fetch full manifest with retry and ETag support. -pub async fn fetch_full_manifest(opts: FetchManifestOptions<'_>) -> Result { +/// Fetch full manifest bytes with retry and ETag support, without parsing. +pub async fn fetch_full_manifest_bytes( + opts: FetchManifestOptions<'_>, +) -> Result { let url = format!("{}/{}", opts.registry_url, opts.name); let etag_owned = opts.etag.map(|s| s.to_string()); let accept = match opts.format { @@ -96,9 +110,8 @@ pub async fn fetch_full_manifest(opts: FetchManifestOptions<'_>) -> Result) -> Result) -> Result) -> Result { + match fetch_full_manifest_bytes(opts).await? { + FetchManifestBytesResult::Ok(raw_bytes, etag) => { + // simd_json mutates the parse buffer; clone so the raw bytes + // survive for `manifest.raw`. + let parse_buf = raw_bytes.clone(); + let mut manifest: FullManifest = parse_json_off_runtime(parse_buf).await?; + manifest.raw = std::sync::Arc::from(raw_bytes); + Ok(FetchManifestResult::Ok(manifest, etag)) + } + FetchManifestBytesResult::NotModified => Ok(FetchManifestResult::NotModified), + } +} + /// Fetch full manifest without ETag / 304 support. /// /// Convenience wrapper around [`fetch_full_manifest`] for callers that never @@ -174,10 +195,10 @@ pub struct FetchVersionManifestOptions<'a> { pub format: MetadataFormat, } -/// Fetch version manifest with retry. -pub async fn fetch_version_manifest( +/// Fetch version manifest bytes with retry, without parsing. 
+pub async fn fetch_version_manifest_bytes( opts: FetchVersionManifestOptions<'_>, -) -> Result { +) -> Result> { let url = format!("{}/{}/{}", opts.registry_url, opts.name, opts.spec); let accept = match opts.format { @@ -199,14 +220,11 @@ pub async fn fetch_version_manifest( .map_err(classify_reqwest_error)?; if response.status().is_success() { - let bytes = response + response .bytes() .await - .map_err(|e| FetchError::Permanent(anyhow!("Response read error: {e}")))? - .to_vec(); - parse_json_off_runtime::(bytes) - .await - .map_err(FetchError::Permanent) + .map(|b| b.to_vec()) + .map_err(classify_reqwest_error) } else { Err(classify_status(response.status(), &url)) } @@ -221,3 +239,11 @@ pub async fn fetch_version_manifest( } }) } + +/// Fetch version manifest with retry. +pub async fn fetch_version_manifest( + opts: FetchVersionManifestOptions<'_>, +) -> Result { + let bytes = fetch_version_manifest_bytes(opts).await?; + parse_json_off_runtime::(bytes).await +} diff --git a/crates/ruborist/src/service/mod.rs b/crates/ruborist/src/service/mod.rs index 13109e994..09456acb9 100644 --- a/crates/ruborist/src/service/mod.rs +++ b/crates/ruborist/src/service/mod.rs @@ -60,8 +60,9 @@ pub use cache::{ pub use fs::{Glob, NoopGlob, exists, read_to_string}; pub use http::client_builder; pub use manifest::{ - FetchManifestOptions, FetchManifestResult, FetchVersionManifestOptions, MetadataFormat, - fetch_full_manifest, fetch_full_manifest_fresh, fetch_version_manifest, + FetchManifestBytesResult, FetchManifestOptions, FetchManifestResult, + FetchVersionManifestOptions, MetadataFormat, fetch_full_manifest, fetch_full_manifest_bytes, + fetch_full_manifest_fresh, fetch_version_manifest, fetch_version_manifest_bytes, }; pub use registry::UnifiedRegistry; pub use store::{ManifestStore, NoopStore}; diff --git a/crates/ruborist/src/service/registry.rs b/crates/ruborist/src/service/registry.rs index be2a8b61f..d2faae358 100644 --- a/crates/ruborist/src/service/registry.rs +++ b/crates/ruborist/src/service/registry.rs @@ -509,6 +509,10 @@ impl RegistryClient for UnifiedRegistry { self.supports_semver } + fn registry_url(&self) -> &str { + &self.registry_url + } + fn cache_version_manifest(&self, name: &str, spec: &str, manifest: Arc) { self.cache .set_version_manifest(name.to_string(), spec.to_string(), manifest); diff --git a/crates/ruborist/src/traits/registry.rs b/crates/ruborist/src/traits/registry.rs index 5ab8c0093..93a17d47c 100644 --- a/crates/ruborist/src/traits/registry.rs +++ b/crates/ruborist/src/traits/registry.rs @@ -133,6 +133,14 @@ pub trait RegistryClient { false } + /// The base registry URL used by raw-fetch dependency builders. + /// + /// Implementations that cannot expose a concrete URL can keep the default; + /// callers should fall back to the regular trait methods when this is empty. + fn registry_url(&self) -> &str { + "" + } + /// Fetch full package manifest from registry. 
/// /// Returns the complete package manifest with all versions, wrapped in From 1c2a02ac29992a824d414a98fb2d5ad77a1d2bdd Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Tue, 12 May 2026 22:59:52 +0800 Subject: [PATCH 04/17] perf(pm): prioritize bfs manifest requests (cherry picked from commit 1ac68d509b89244d0ebbbe157f72100b5c9a3f94) --- .github/workflows/pm-e2e-bench.yml | 2 +- crates/ruborist/src/resolver/builder.rs | 225 ++++++++++++++++++------ 2 files changed, 170 insertions(+), 57 deletions(-) diff --git a/.github/workflows/pm-e2e-bench.yml b/.github/workflows/pm-e2e-bench.yml index 9a68dcade..098e2528a 100644 --- a/.github/workflows/pm-e2e-bench.yml +++ b/.github/workflows/pm-e2e-bench.yml @@ -73,7 +73,7 @@ on: bench_runs: description: '[phases] Runs per phase' required: false - default: '3' + default: '2' type: string concurrency: diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs index 69f910d66..62eec487f 100644 --- a/crates/ruborist/src/resolver/builder.rs +++ b/crates/ruborist/src/resolver/builder.rs @@ -26,7 +26,7 @@ use std::sync::Arc; #[cfg(not(target_arch = "wasm32"))] use futures::stream::{FuturesUnordered, StreamExt}; #[cfg(not(target_arch = "wasm32"))] -use std::collections::{HashSet, VecDeque}; +use std::collections::VecDeque; #[cfg(feature = "http-tarball")] use anyhow::Context as _; @@ -772,11 +772,36 @@ pub async fn build_deps_with_config( type WaitingEdge = (NodeIndex, DependencyEdgeInfo); #[cfg(not(target_arch = "wasm32"))] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum FetchPriority { + Demand, + Prefetch, +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +enum FetchKey { + Full(String), + Version(String, String), +} + +#[cfg(not(target_arch = "wasm32"))] +#[derive(Clone)] enum FetchRequest { Full { name: String }, Version { name: String, spec: String }, } +#[cfg(not(target_arch = "wasm32"))] +impl FetchRequest { + fn key(&self) -> FetchKey { + match self { + Self::Full { name } => FetchKey::Full(name.clone()), + Self::Version { name, spec } => FetchKey::Version(name.clone(), spec.clone()), + } + } +} + #[cfg(not(target_arch = "wasm32"))] enum FetchDone { Full { @@ -790,6 +815,16 @@ enum FetchDone { }, } +#[cfg(not(target_arch = "wasm32"))] +impl FetchDone { + fn key(&self) -> FetchKey { + match self { + Self::Full { name, .. } => FetchKey::Full(name.clone()), + Self::Version { name, spec, .. 
} => FetchKey::Version(name.clone(), spec.clone()),
+        }
+    }
+}
+
 #[cfg(not(target_arch = "wasm32"))]
 type FetchFuture = tokio::task::JoinHandle<FetchDone>;
 
@@ -817,6 +852,88 @@ fn parse_core_manifest_inline(mut bytes: Vec<u8>) -> anyhow::Result<Arc<CoreVers
         .map_err(|e| anyhow::anyhow!("JSON parse error: {e}"))
 }
 
+#[cfg(not(target_arch = "wasm32"))]
+#[derive(Default)]
+struct FetchQueues {
+    demand: VecDeque<FetchRequest>,
+    prefetch: VecDeque<FetchRequest>,
+    queued: HashMap<FetchKey, FetchPriority>,
+    active: HashMap<FetchKey, FetchPriority>,
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+impl FetchQueues {
+    fn enqueue(&mut self, request: FetchRequest, priority: FetchPriority) {
+        let key = request.key();
+        if self.active.contains_key(&key) {
+            return;
+        }
+
+        if let Some(queued_priority) = self.queued.get_mut(&key) {
+            if priority == FetchPriority::Demand && *queued_priority == FetchPriority::Prefetch {
+                *queued_priority = FetchPriority::Demand;
+                self.demand.push_back(request);
+            }
+            return;
+        }
+
+        self.queued.insert(key, priority);
+        match priority {
+            FetchPriority::Demand => self.demand.push_back(request),
+            FetchPriority::Prefetch => self.prefetch.push_back(request),
+        }
+    }
+
+    fn complete(&mut self, key: &FetchKey) {
+        self.active.remove(key);
+    }
+
+    fn active_prefetches(&self) -> usize {
+        self.active
+            .values()
+            .filter(|priority| **priority == FetchPriority::Prefetch)
+            .count()
+    }
+
+    fn pop_next(&mut self, prefetch_concurrency: usize) -> Option<FetchRequest> {
+        if let Some(request) = self.pop_priority(FetchPriority::Demand) {
+            return Some(request);
+        }
+
+        if self.active_prefetches() >= prefetch_concurrency {
+            return None;
+        }
+
+        self.pop_priority(FetchPriority::Prefetch)
+    }
+
+    fn pop_priority(&mut self, priority: FetchPriority) -> Option<FetchRequest> {
+        let queue = match priority {
+            FetchPriority::Demand => &mut self.demand,
+            FetchPriority::Prefetch => &mut self.prefetch,
+        };
+
+        while let Some(request) = queue.pop_front() {
+            let key = request.key();
+            match self.queued.get(&key).copied() {
+                Some(queued_priority) if queued_priority == priority => {
+                    self.queued.remove(&key);
+                    self.active.insert(key, priority);
+                    return Some(request);
+                }
+                Some(_) | None => {}
+            }
+        }
+
+        None
+    }
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+fn prefetch_concurrency_limit(concurrency: usize) -> usize {
+    (concurrency / 4).max(1)
+}
+
 #[cfg(not(target_arch = "wasm32"))]
 fn fetch_registry_manifest(registry_url: String, request: FetchRequest) -> FetchFuture {
     use crate::service::{
@@ -859,12 +976,13 @@ fn fetch_registry_manifest(registry_url: String, request: FetchRequest) -> Fetch
 #[cfg(not(target_arch = "wasm32"))]
 fn pump_fetches(
     fetches: &mut FuturesUnordered<FetchFuture>,
-    queue: &mut VecDeque<FetchRequest>,
+    fetch_queues: &mut FetchQueues,
     registry_url: &str,
     concurrency: usize,
 ) {
+    let prefetch_concurrency = prefetch_concurrency_limit(concurrency);
     while fetches.len() < concurrency {
-        let Some(request) = queue.pop_front() else {
+        let Some(request) = fetch_queues.pop_next(prefetch_concurrency) else {
             break;
         };
         fetches.push(fetch_registry_manifest(registry_url.to_string(), request));
@@ -876,33 +994,29 @@ fn pump_fetches(
 fn schedule_registry_fetch(
     name: String,
     spec: String,
+    priority: FetchPriority,
     supports_semver: bool,
     full_cache: &HashMap<String, Arc<FullManifest>>,
     version_cache: &HashMap<(String, String), Arc<CoreVersionManifest>>,
     full_failures: &HashMap<String, String>,
     version_failures: &HashMap<(String, String), String>,
-    inflight_full: &mut HashSet<String>,
-    inflight_version: &mut HashSet<(String, String)>,
-    fetch_queue: &mut VecDeque<FetchRequest>,
+    fetch_queues: &mut FetchQueues,
 ) {
     let (real_name, real_spec) = normalize_spec(&name, &spec);
     if supports_semver {
         let key = (real_name, real_spec);
-        if version_cache.contains_key(&key)
-            || version_failures.contains_key(&key)
-            || !inflight_version.insert(key.clone())
-        {
+        if
version_cache.contains_key(&key) || version_failures.contains_key(&key) { return; } - fetch_queue.push_back(FetchRequest::Version { - name: key.0, - spec: key.1, - }); - } else if !full_cache.contains_key(&real_name) - && !full_failures.contains_key(&real_name) - && inflight_full.insert(real_name.clone()) - { - fetch_queue.push_back(FetchRequest::Full { name: real_name }); + fetch_queues.enqueue( + FetchRequest::Version { + name: key.0, + spec: key.1, + }, + priority, + ); + } else if !full_cache.contains_key(&real_name) && !full_failures.contains_key(&real_name) { + fetch_queues.enqueue(FetchRequest::Full { name: real_name }, priority); } } @@ -916,22 +1030,19 @@ fn schedule_transitive_prefetches( version_cache: &HashMap<(String, String), Arc>, full_failures: &HashMap, version_failures: &HashMap<(String, String), String>, - inflight_full: &mut HashSet, - inflight_version: &mut HashSet<(String, String)>, - fetch_queue: &mut VecDeque, + fetch_queues: &mut FetchQueues, ) { for (name, spec) in extract_transitive_deps(manifest, preload_config) { schedule_registry_fetch( name, spec, + FetchPriority::Prefetch, supports_semver, full_cache, version_cache, full_failures, version_failures, - inflight_full, - inflight_version, - fetch_queue, + fetch_queues, ); } } @@ -1102,16 +1213,16 @@ fn apply_fetch_result( version_waiters: &mut HashMap<(String, String), Vec>, full_failures: &mut HashMap, version_failures: &mut HashMap<(String, String), String>, - inflight_full: &mut HashSet, - inflight_version: &mut HashSet<(String, String)>, - fetch_queue: &mut VecDeque, + fetch_queues: &mut FetchQueues, preload_config: &PreloadConfig, supports_semver: bool, level_pending: &mut VecDeque, ) { + let done_key = done.key(); + fetch_queues.complete(&done_key); + match done { FetchDone::Full { name, result } => { - inflight_full.remove(&name); match result.and_then(|(bytes, _etag)| parse_full_manifest_inline(bytes)) { Ok(full) => { full_cache.insert(name.clone(), full); @@ -1126,7 +1237,6 @@ fn apply_fetch_result( } FetchDone::Version { name, spec, result } => { let key = (name, spec); - inflight_version.remove(&key); match result.and_then(parse_core_manifest_inline) { Ok(manifest) => { version_cache.insert(key.clone(), Arc::clone(&manifest)); @@ -1138,9 +1248,7 @@ fn apply_fetch_result( version_cache, full_failures, version_failures, - inflight_full, - inflight_version, - fetch_queue, + fetch_queues, ); } Err(e) => { @@ -1182,9 +1290,7 @@ where let mut version_waiters: HashMap<(String, String), Vec> = HashMap::new(); let mut full_failures: HashMap = HashMap::new(); let mut version_failures: HashMap<(String, String), String> = HashMap::new(); - let mut inflight_full: HashSet = HashSet::new(); - let mut inflight_version: HashSet<(String, String)> = HashSet::new(); - let mut fetch_queue: VecDeque = VecDeque::new(); + let mut fetch_queues = FetchQueues::default(); let mut fetches: FuturesUnordered = FuturesUnordered::new(); let root_idx = graph.root_index; @@ -1220,7 +1326,7 @@ where } loop { - pump_fetches(&mut fetches, &mut fetch_queue, ®istry_url, concurrency); + pump_fetches(&mut fetches, &mut fetch_queues, ®istry_url, concurrency); while let Some((parent, edge)) = level_pending.pop_front() { receiver.on_event(BuildEvent::Resolving { name: &edge.name }); @@ -1282,9 +1388,7 @@ where &version_cache, &full_failures, &version_failures, - &mut inflight_full, - &mut inflight_version, - &mut fetch_queue, + &mut fetch_queues, ); process_dependency_with_resolved( graph, parent, &edge, &resolved, config, @@ -1303,12 +1407,17 
@@ where let waiters = version_waiters.entry(key.clone()).or_default(); waiters.push((parent, edge)); - if inflight_version.insert(key.clone()) { - fetch_queue.push_back(FetchRequest::Version { - name: key.0, - spec: key.1, - }); - } + schedule_registry_fetch( + key.0, + key.1, + FetchPriority::Demand, + supports_semver, + &full_cache, + &version_cache, + &full_failures, + &version_failures, + &mut fetch_queues, + ); } else { if let Some(error) = full_failures.get(&real_name) { if edge.edge_type == EdgeType::Optional { @@ -1361,9 +1470,7 @@ where &version_cache, &full_failures, &version_failures, - &mut inflight_full, - &mut inflight_version, - &mut fetch_queue, + &mut fetch_queues, ); process_dependency_with_resolved( graph, parent, &edge, &resolved, config, @@ -1382,12 +1489,20 @@ where let waiters = full_waiters.entry(real_name.clone()).or_default(); waiters.push((parent, edge)); - if inflight_full.insert(real_name.clone()) { - fetch_queue.push_back(FetchRequest::Full { name: real_name }); - } + schedule_registry_fetch( + real_name, + real_spec, + FetchPriority::Demand, + supports_semver, + &full_cache, + &version_cache, + &full_failures, + &version_failures, + &mut fetch_queues, + ); } - pump_fetches(&mut fetches, &mut fetch_queue, ®istry_url, concurrency); + pump_fetches(&mut fetches, &mut fetch_queues, ®istry_url, concurrency); } if full_waiters.is_empty() && version_waiters.is_empty() { @@ -1422,9 +1537,7 @@ where &mut version_waiters, &mut full_failures, &mut version_failures, - &mut inflight_full, - &mut inflight_version, - &mut fetch_queue, + &mut fetch_queues, &preload_config, supports_semver, &mut level_pending, From 7c4561e437c7cf98b505230e09b5364e57724c52 Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Wed, 13 May 2026 01:29:19 +0800 Subject: [PATCH 05/17] perf(pm): rayon-offload simd_json in the cherry-picked main-loop resolver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cherry-picked PR #2937 commits in this branch's local history (commit 4e6848dc, which is the cherry-pick of upstream's 75e84d0c "perf(pm): experiment main-loop bfs resolver" on the `analyze-deps-install-flow` branch, and commit 1c2a02ac, which is the cherry-pick of upstream's 1ac68d50 "perf(pm): prioritize bfs manifest requests" on the same upstream branch — both local commits carrying the standard `git cherry-pick -x` attribution footer of the form `(cherry picked from commit <40-hex-SHA>)` in their commit-message bodies for the GitHub-server-side auto-link-back to the upstream commits' diff-views) introduced the main-loop BFS-resolver architecture in the ruborist crate. 
The first cherry-picked commit added the `run_main_loop_bfs` entry point and its
supporting state-machine types: `WaitingEdge = (NodeIndex, DependencyEdgeInfo)`
as the element of the per-key waiter lists, the `FetchRequest` and `FetchDone`
enums for per-package fetch requests and responses, the
`FetchFuture = tokio::task::JoinHandle<FetchDone>` alias for a spawned HTTP
task's handle, `FetchKey` to discriminate full-manifest fetches from
version-specific ones, the two-level `FetchPriority` enum (demand vs.
prefetch), and the `FetchQueues` struct holding one `VecDeque` per priority
plus the `queued: HashMap<FetchKey, FetchPriority>` and
`active: HashMap<FetchKey, FetchPriority>` accounting tables behind the
dispatcher's concurrency control.

Around those types it added the machinery itself. `apply_fetch_result` is the
dispatcher the main loop's drain step calls for each completed JoinHandle's
`FetchDone`: it updates the cache and failure maps and moves that key's parked
waiters onto the BFS resume queue `level_pending: VecDeque<WaitingEdge>`.
`schedule_transitive_prefetches` walks a freshly fetched core manifest's
`dependencies`/`peerDependencies`/`optionalDependencies` maps and enqueues each
transitive child as a prefetch-priority `FetchRequest`. The per-edge lookup
path in the BFS body (backed by the `resolve_from_full_manifest` helper) either
resolves an edge in place on a cache hit or, on a miss, parks it in the per-key
`Vec<WaitingEdge>` of `full_waiters`/`version_waiters` until the drain step
releases it back onto `level_pending` for the next iteration of the outer
level loop. Finally, `pump_fetches` pops requests from the queues in priority
order and spawns HTTP tasks up to the `config.concurrency` cap (16 by default,
per the cherry-pick's `PreloadConfig::default()`); each task is a plain
`tokio::spawn` around the reqwest-backed bytes fetch (plus the ETag for full
manifests) that builds a `FetchDone` and hands it back through its JoinHandle,
which the main loop collects via a `FuturesUnordered`.

The second cherry-picked commit refined the demand-vs-prefetch discipline in
`FetchQueues::pop_next`: fetches demanded by the BFS frontier (pushed at demand
priority when an edge misses the cache) dispatch strictly ahead of the
speculative prefetches pushed by `schedule_transitive_prefetches`, and
prefetches are additionally capped at a quarter of the overall concurrency by
`prefetch_concurrency_limit`, while `pump_fetches` keeps the total number in
flight under the configured cap.
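Condensed from the diff above (not verbatim), the pop discipline at the heart
of that second commit:

    // Demand always wins; prefetch is additionally capped, so speculative
    // fetches can never occupy the whole fetch budget.
    fn pop_next(&mut self, prefetch_concurrency: usize) -> Option<FetchRequest> {
        if let Some(request) = self.pop_priority(FetchPriority::Demand) {
            return Some(request);
        }
        if self.active_prefetches() >= prefetch_concurrency {
            return None; // leave prefetches queued; more demand may arrive
        }
        self.pop_priority(FetchPriority::Prefetch)
    }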
Two properties combine into the architectural win. First, the
`current_level → next_level` swap at a BFS level boundary is soft aggregation,
not a fetch barrier: fetches for level-N+1 packages can already be in flight
while level-N edges are still being processed by the inner loop. Second,
demand beats prefetch at every pop, so the long tail of speculative fetches
never gates BFS progress on keys that have already arrived. Together they give
the "frontier's tail fetch doesn't gate the entire resolve phase" property
whose fingerprint is the σ collapse on bench-phases-linux's `p1_resolve`
hyperfine metric: PR #2937's own bench output showed σ dropping from roughly
1.0s on the legacy two-phase preload-then-BFS baseline to roughly 0.08s on the
main-loop variant, about a 13× reduction in standard deviation. That is the
standard signature of the slowest fetch in the closure no longer bottlenecking
the whole resolve phase.

What the cherry-pick *also* introduced, and what this commit fixes, is a
regression in where the simd_json parse work runs. The
`parse_full_manifest_inline` and `parse_core_manifest_inline` functions
(defined in `crates/ruborist/src/resolver/builder.rs` next to the
`apply_fetch_result` dispatcher and called from its two
`FetchDone::{Full, Version}` arms through the synchronous `Result::and_then`
combinator) parse on the tokio worker thread that runs the main loop's task.
During each parse that worker cannot poll any other in-flight future,
including the response-arrival events of the other concurrent HTTP fetches on
the tokio reactor. The pm crate sets up the default multi-threaded runtime, so
one blocked worker does not fully starve the IO event loop, but the pool's
effective parallelism drops for the duration of every parse. This is exactly
the anti-pattern that the existing helper
`crate::service::manifest::parse_json_off_runtime` was introduced to eliminate
on the legacy resolver path, via a cross-pool handoff to rayon's dedicated CPU
thread pool.

That helper's history is the perf-validation backdrop for this commit. Commit
7e7455ca "perf(pm): offload simd_json parse to rayon (IO/CPU separation)"
introduced it, using the standard
`rayon::spawn(move || simd_json::serde::from_slice::<T>(&mut bytes))` plus
`tokio::sync::oneshot` handoff. Commit 04452992 "perf(pm): revert
parse_json_off_runtime to rayon — fix legacy install p3" records the
counter-experiment: moving the parse back inline on the tokio worker regressed
the p3 (warm-link install) metric in the bench-phases data, and that commit's
message names the regression as the reason for reverting back to the rayon
form.
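In sketch form, with assumed trait bounds (the in-tree helper also carries a
wasm32 inline-parse arm and may differ in error wrapping), the handoff pattern
looks like:

    // Minimal sketch of the rayon + oneshot cross-pool handoff. The bounds
    // are assumptions needed to move the work onto rayon's pool.
    async fn parse_json_off_runtime<T>(mut bytes: Vec<u8>) -> anyhow::Result<T>
    where
        T: serde::de::DeserializeOwned + Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();
        rayon::spawn(move || {
            // simd_json parses in place and mutates the buffer.
            let result = simd_json::serde::from_slice::<T>(&mut bytes)
                .map_err(|e| anyhow::anyhow!("JSON parse error: {e}"));
            let _ = tx.send(result); // receiver may already be gone
        });
        rx.await
            .map_err(|_| anyhow::anyhow!("parse worker dropped the result"))?
    }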
Since 04452992 the on-disk state of the legacy resolver path has been this
rayon-offload form, and the pattern is the load-bearing perf equilibrium for
simd_json work in the existing fetch-and-parse pipeline. In introducing the
new main-loop resolver, PR #2937's authors wrote two new parse helpers (the
`parse_*_inline` pair in `builder.rs`) that do not reuse the existing
rayon-offload helper; that oversight is what this commit closes.

The fix is mechanical and follows the established cross-pool handoff pattern.
The existing helper at `crates/ruborist/src/service/manifest.rs:20` is
`async fn parse_json_off_runtime<T>(mut bytes: Vec<u8>) -> Result<T>`, whose
non-wasm32 arm is the standard form: `let (tx, rx) = oneshot::channel();`,
then `rayon::spawn(move || { let result = from_slice::<T>(&mut bytes)
.map_err(|e| anyhow!("JSON parse error: {e}")); let _ = tx.send(result); });`,
then `rx.await.map_err(..)?`, with an inline-parse fallback on the wasm32 arm.
Its visibility is bumped from module-private to `pub(crate)` (a single keyword
prefix on the `async fn` declaration line) so the resolver layer's
`builder.rs` can reach it, and a matching re-export
`pub(crate) use manifest::parse_json_off_runtime;` is added to
`crates/ruborist/src/service/mod.rs` so the symbol resolves at the canonical
`crate::service::*` path. rustfmt places the new line between the existing
`pub use http::client_builder;` line and the multi-import
`pub use manifest::{FetchManifestBytesResult, FetchManifestOptions,
FetchManifestResult, FetchVersionManifestOptions, MetadataFormat,
fetch_full_manifest, fetch_full_manifest_bytes, fetch_full_manifest_fresh,
fetch_version_manifest, fetch_version_manifest_bytes};` brace block (which the
cherry-pick had already extended with the `*_bytes` and `FetchManifestBytes*`
symbols for its fetch-bytes-now, parse-later machinery), the single-symbol
`pub(crate)` form sorting before the multi-symbol block per rustfmt's
convention for sibling imports of the same module.

The two synchronous inline-parse helpers in `builder.rs` are renamed to
reflect the handoff: `parse_full_manifest_inline` becomes
`parse_full_manifest_off_runtime` and `parse_core_manifest_inline` becomes
`parse_core_manifest_off_runtime`. Both are converted from
`fn ... -> anyhow::Result<Arc<_>>` to `async fn` so they can await the
helper's oneshot-receiving future, and their bodies replace the direct
`simd_json::serde::from_slice(&mut parse_buf).map_err(|e|
anyhow::anyhow!("JSON parse error: {e}"))?` call with a
`crate::service::parse_json_off_runtime(..).await?` delegation that hops the
parse onto rayon and picks the result up through the oneshot channel.
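Reconstructed from the description above rather than copied from the tree, the
converted pair comes out roughly as:

    // Sketch of the renamed async wrappers; exact in-tree bodies may differ.
    async fn parse_full_manifest_off_runtime(
        raw_bytes: Vec<u8>,
    ) -> anyhow::Result<Arc<FullManifest>> {
        // simd_json consumes its parse buffer, so keep the original bytes.
        let parse_buf = raw_bytes.clone();
        let mut manifest: FullManifest =
            crate::service::parse_json_off_runtime(parse_buf).await?;
        manifest.raw = Arc::from(raw_bytes); // kept for cache persistence
        Ok(Arc::new(manifest))
    }

    async fn parse_core_manifest_off_runtime(
        bytes: Vec<u8>,
    ) -> anyhow::Result<Arc<CoreVersionManifest>> {
        crate::service::parse_json_off_runtime::<CoreVersionManifest>(bytes)
            .await
            .map(Arc::new)
    }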
The full-manifest variant keeps the post-parse attachment line
`manifest.raw = Arc::from(raw_bytes);` from the cherry-pick: the original
HTTP response bytes (which the parse step no longer needs once done, since
simd_json's in-place parse consumes its buffer as `&mut`) are stored on the
parsed manifest's `raw: Arc<[u8]>` field for warm-cache persistence, where
the ProjectCache writer serializes manifests together with their original
JSON bytes into the project-level disk cache at the end of the resolve
phase. This matches the legacy `service::manifest::fetch_full_manifest`
body at lines 117-123 of manifest.rs, which attaches the raw bytes the same
way after the helper-side parse. The core-manifest variant (the slim
`CoreVersionManifest`, the main loop's per-version cache value type, which
strips the fields the resolve pass doesn't need) has no `raw` field, so its
body is simply
`crate::service::parse_json_off_runtime::<CoreVersionManifest>(bytes).await.map(Arc::new)`.

The dispatcher `apply_fetch_result` in `builder.rs`, annotated
`#[allow(clippy::too_many_arguments)]` since it takes 12 mutable-reference
arguments into the main loop's state machine (the full and version cache
HashMaps, the full and version waiter-list HashMaps, the full and version
failure-record HashMaps, the FetchQueues priority queue, the PreloadConfig
reference, the supports_semver bool, and the level_pending VecDeque), is
converted from `fn` to `async fn` so the two `FetchDone` match arms can
await the new parse wrappers. Only the `async` qualifier and the `.await`s
inside the body change; the 12-argument list and unit return type are
untouched.

The arms' synchronous `match result.and_then(|...| parse_*_inline(...))
{ Ok(...) => { ... } Err(e) => { ... } }` chains cannot survive the
conversion as written: `Result::and_then` is fundamentally synchronous (its
mapping function returns a `Result`, not a `Future` of one), so it cannot
compose with an `async fn`. The canonical async-aware rewrite is explicit:
destructure the fetch result, await the rayon-offloaded parse on Ok,
propagate the fetch error verbatim on Err, then match the unified
`anyhow::Result<Arc<...>>` with the same Ok and Err arm bodies as the
cherry-pick's original code. The Full arm destructures the tuple form
`Ok((bytes, _etag))`, where `_etag` is the upstream registry's response
ETag header, captured by the fetch task into
`FetchDone::Full { result: anyhow::Result<(Vec<u8>, Option<String>)>, ...
}`. The underscore prefix discards it because the main loop's in-process
cache-dedup logic makes no use of the ETag: the persistent ManifestStore on
the `UnifiedRegistry` side handles ETag-driven conditional-GET semantics
for the cross-process warm cache, separately from the within-process
inflight-dedup HashMaps. The Version arm destructures plain `Ok(bytes)`,
because `FetchDone::Version { result: anyhow::Result<Vec<u8>>, ... }`
carries raw bytes only: per npm registry API conventions, the per-version
endpoint (the `fetch_version_manifest_bytes` path) returns no ETag for the
version sub-resource; only the full-manifest endpoint returns an etag for
the top-level versions document.

The post-arm waiter drains are unchanged from the cherry-pick's shape:
`if let Some(waiters) = full_waiters.remove(&name)
{ level_pending.extend(waiters); }` and the analogous
`version_waiters.remove(&key)` drain, where `key = (name, spec)` is the
composite key the version waiter list is keyed on, since two BFS edges
naming the same package under different specs are independent in-flight
slots. Likewise unchanged is the Version arm's
`schedule_transitive_prefetches(&manifest, preload_config, supports_semver,
full_cache, version_cache, full_failures, version_failures, fetch_queues)`
call on the successfully parsed core manifest, added by the second
cherry-picked commit `1ac68d50`: it walks the manifest's dependency maps
and pushes each transitive child as a prefetch-priority FetchRequest into
the FetchQueues' `prefetch` VecDeque, with inflight-dedup checks against
the existing HashMap entries and a prefetch-to-demand priority upgrade when
a BFS-demanded edge touches an existing prefetch key. The walker's argument
types are unaffected by the parse step's sync-to-async conversion
(immutable cache references for the dedup check, mutable FetchQueues for
the push).
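In miniature, the sync-to-async rewrite of each arm looks like this
(schematic stand-in types and names, not the crate's own):

    // `Result::and_then` cannot compose with an async mapping function,
    // so the chain becomes two explicit steps.
    async fn parse_off_runtime(bytes: Vec<u8>) -> anyhow::Result<String> {
        Ok(String::from_utf8(bytes)?) // stand-in for the rayon-offloaded parse
    }

    async fn handle(result: anyhow::Result<Vec<u8>>) -> anyhow::Result<()> {
        let parsed: anyhow::Result<String> = match result {
            Ok(bytes) => parse_off_runtime(bytes).await, // await the parse on Ok
            Err(e) => Err(e), // fetch error passes through verbatim
        };
        match parsed {
            Ok(value) => println!("cache insert: {value}"), // same Ok-arm body as before
            Err(e) => eprintln!("record failure: {e}"),     // same Err-arm body as before
        }
        Ok(())
    }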
The dispatcher's sole call site, inside `run_main_loop_bfs`, sits in the
`if let Some(handle_result) = fetches.next().await { let done =
handle_result.map_err(|e| registry_error::(format!("manifest fetch task
failed: {e}")))?; apply_fetch_result(...); }` block at the tail of the
outer per-level loop body, just before the per-level `LevelComplete` event
and the `current_level = next_level;` transition. It gets a `.await`
appended in rustfmt's canonical form for a multi-line async call: the
closing-paren line drops its semicolon, `.await;` follows on its own line
at the same indent as the function name, and the enclosing scope's `}`
stays at its original 8-space indent. After the format apply, those lines
land at 1554 (`&mut level_pending,`, the last argument with rustfmt's
trailing comma), 1555 (the `)` without semicolon), and 1556 (`.await;`).
The gauntlet's pre-commit `grep -nE '\bapply_fetch_result\s*\('` over
`builder.rs` finds exactly two matches: the definition at 1211 (the
`async fn apply_fetch_result(` signature opener) and the call at 1543 (the
function name with its opening paren). The new `.await` sits on the line
after the call's closing paren, which the pattern, anchored on the
name-and-paren tokens, deliberately does not match.

The wasm32 cfg fallback path is untouched by this commit. The wasm target's
full resolver path remains the legacy two-phase
`run_preload_phase(graph, registry, &config, receiver).await;
run_bfs_phase(graph, registry, &config, receiver).await?;` sequence inside
the `#[cfg(target_arch = "wasm32")]` block of `build_deps_with_config`, the
entry point that the public `service::api::build_deps` API (which the pm
crate's CLI resolve-and-install flow calls) hands off to. The wasm arm
calls the legacy `run_preload_phase`, whose preload-and-walk loop is
unchanged from its pre-cherry-pick state.
The legacy `async fn run_preload_phase(graph: &mut DependencyGraph,
registry: &R, config: &BuildDepsConfig, receiver: &E) -> Result<(),
ResolveError>` declaration at post-format file line 1567 carries no
`#[cfg(...)]` attribute on the lines above it (line 1566 is the doc comment
`/// Run the preload phase to warm up the cache with manifests.`, line 1565
is blank), so the function is callable from both target families. That is
the cherry-pick's intent: it is the shared fallback resolver, called
directly by the wasm arm of the dispatcher as the wasm target's main path,
and by the native arm's else-branch in two corner cases. First,
`registry.registry_url().is_empty()`: the MockRegistryClient in the
unit-test fixture returns the empty string from the new
`fn registry_url(&self) -> &str` trait default method that the cherry-pick
added to the RegistryClient trait. Second, the warm-project-cache scenario:
a `Some` in the caller's `BuildDepsOptions.warm_project_cache` field
pre-populates the in-memory cache before the resolver runs, setting
`config.skip_preload` so the preload walk is bypassed entirely in favor of
the BFS-only path.

The helper's own cfg arms in `service/manifest.rs:20-39` partition the
parse implementation. The non-wasm32 arm (lines 24-34) does the cross-pool
handoff: `tokio::sync::oneshot::channel()`, then
`rayon::spawn(move || simd_json::serde::from_slice::<T>(&mut bytes)
.map_err(|e| anyhow!("JSON parse error: {e}")))`, then the
`rx.await.map_err(|e| anyhow!("rayon parse channel closed: {e}"))` oneshot
pickup. The wasm32 arm (lines 35-38) parses inline with
`simd_json::serde::from_slice::<T>(&mut bytes)`, since the single-threaded
wasm runtime has no separate CPU thread pool to hand off to. The wasm arm
is exactly the inline form the cherry-pick's `parse_*_inline` functions
were doing unconditionally on both targets; the non-wasm32 arm is the rayon
form the legacy call sites (`service::manifest::fetch_full_manifest` at
lines 117-123 and `fetch_version_manifest` at the analogous position) have
used since 7e7455ca. Because the new `parse_*_off_runtime` wrappers sit
inside the `#[cfg(not(target_arch = "wasm32"))]` main-loop scaffolding (the
`tokio::task::JoinHandle`, `FuturesUnordered`, and `rayon::spawn` machinery
does not exist on the single-threaded wasm32 model, so the entire
new-resolver chunk is gated), the helper's wasm arm is reached only through
the legacy `fetch_full_manifest` / `fetch_version_manifest` bodies, which
the wasm arm of `build_deps_with_config` invokes indirectly via
`run_preload_phase` as it fetches each full manifest and resolves the
version against the spec at each edge, per the legacy two-phase walk.
The per-target placement of the parse step is therefore: native reaches the
rayon arm of the helper through the new `parse_*_off_runtime` wrappers in
the main-loop dispatcher's match arms; wasm32 reaches the wasm arm of the
helper through the legacy `fetch_*_manifest` bodies. Both targets get the
parse implementation appropriate to their runtime's threading model, from a
single point of definition that shares the `JSON parse error: {e}` error
prefix across call sites.

Verification per CLAUDE.md's "Post-Edit Verification" section:

* `cargo check -p utoo-ruborist --all-targets`: exits 0 with no compile
  errors or warnings under the workspace's nightly-2026-04-02 toolchain.
  The check covers the library target, the integration-test target, the
  doc-tests (trivially, since the parse helpers' doc comments contain no
  executable code fences), and the workspace targets that depend on
  ruborist transitively.

* `cargo fmt -p utoo-ruborist`: corrected two cells of drift between the
  hand-written edits and rustfmt's canonical form. First, the hand-wrapped
  three-line signature of `parse_full_manifest_off_runtime` (wrapped after
  miscounting its width as over 100 characters) was un-wrapped to the
  single line `async fn parse_full_manifest_off_runtime(raw_bytes: Vec<u8>)
  -> anyhow::Result<Arc<FullManifest>> {`, which at 99 characters sits one
  below rustfmt's default `max_width = 100` (the workspace has no root
  `rustfmt.toml`, so defaults apply). Second, the new
  `pub(crate) use manifest::parse_json_off_runtime;` re-export, initially
  placed after the `pub use manifest::{...};` brace block, was moved to
  line 62, between `pub use http::client_builder;` at 61 and the
  brace-block opener `pub use manifest::{` now at 63: rustfmt orders a
  single-symbol lowercase import before a multi-symbol brace block of the
  same module namespace. The three-line wrap of
  `parse_core_manifest_off_runtime` was left alone, correctly: its
  unwrapped signature is 102 characters (`CoreVersionManifest` is 7
  characters longer than `FullManifest`), past the 100-column limit, so the
  wrap is required and the hand-written form already matched canon.
* `cargo fmt -p utoo-ruborist -- --check` after the apply: exits 0 with no
  remaining diff, confirming the three modified files (manifest.rs, mod.rs,
  builder.rs) are on disk in rustfmt-canonical form.

* `cargo clippy -p utoo-ruborist --all-targets --no-deps -- -D warnings`:
  exits 0 under the warnings-as-errors gate that CLAUDE.md's "Post-Edit
  Verification" section mandates (`--no-deps` scopes the lints to the
  workspace's own crates; `--all-targets` covers the lib, test, example,
  bench, and bin targets). No lints fire on the new `async fn` bodies:
  `redundant_async_block` flags `async` items whose bodies never `.await`,
  but the wrappers `.await?` the rayon helper's oneshot future, so the
  qualifier is earned. No `unused_must_use` fires at the now-async
  dispatcher's call site, because `.await;` discards the unit return
  explicitly, and discarding a unit value is clippy's no-warning default.
  The `FuturesUnordered<JoinHandle<FetchDone>>` field used for in-flight
  fetch tracking triggers no perf lints either; driving a `FuturesUnordered`
  via `.next().await` in a loop is the canonical idiom for polling a
  collection of concurrent futures.

* `cargo test -p utoo-ruborist --lib --no-fail-fast`: passes the
  pre-existing baseline: "test result: ok. 163 passed; 0 failed; 0 ignored;
  0 measured; 0 filtered out; finished in 0.06s" per the libtest summary.
  The 163 tests cover the spec parser's npm-alias and workspace-protocol
  handling, `util::oncemap`'s concurrent single-flight cache behavior, the
  model types' JSON serde roundtrips, the graph builder and edge resolver,
  and integration-style tests that drive the `build_deps` entry point
  against the MockRegistryClient fixture. There the new `registry_url()`
  default returns the empty string, so `build_deps_with_config` takes the
  `registry.registry_url().is_empty()` branch and routes to the legacy
  `run_preload_phase + run_bfs_phase` pair, preserving the pre-cherry-pick
  behavior on the mock fixture's call graph. No unit test exercises
  `run_main_loop_bfs` directly; that path needs a real HTTP registry (or a
  non-trivial mock returning real bytes from spawned fetch tasks), which
  the unit tests don't set up. Its end-to-end behavior is covered by the
  GHA `utoopm-e2e-*` jobs, which run the ant-design fixture's install
  against the real npmjs registry.
* The wasm32 target's compile verification is the GHA
  `utooweb-ci-build-wasm` job, which fires automatically on this push: the
  workflow's `on: pull_request: types: [synchronize]` trigger catches the
  branch-tip move on PR #2938's head, and the wasm build runs on every
  synchronize event regardless of the PR's labels. The local host has no
  `wasm32-unknown-unknown` target in its rustup state: the repo-root
  `rust-toolchain.toml` specifies `channel = "nightly-2026-04-02"`,
  `components = ["rustfmt", "clippy", "rust-analyzer"]`, and
  `profile = "minimal"`, with no `targets` list, so rustup installs only
  the host's native target. Adding the wasm target would take a manual
  `rustup target add wasm32-unknown-unknown`, which we skip locally since
  the CI side sets it up in the workflow's toolchain action step via its
  `targets: wasm32-unknown-unknown` input.

Structural verification of the wasm cfg landmarks in
`crates/ruborist/src/resolver/builder.rs` came from the gauntlet Bash
prologue's awk-and-grep pass. The file has 25 attribute lines matching the
`#[cfg((not()?target_arch = "wasm32")?)]` pattern. Inside
`build_deps_with_config`, the `#[cfg(target_arch = "wasm32")]` block (file
line 758 after the format apply) routes the wasm resolver to the legacy
pair; the awk extract of the function body showed the expected
`run_preload_phase(graph, registry, &config, receiver).await;
run_bfs_phase(graph, registry, &config, receiver).await?;` sequence inside
the wasm arm's block. The `#[cfg(not(target_arch = "wasm32"))]` block just
above it carries the if-else dispatch

    if !config.skip_preload && !registry.registry_url().is_empty() {
        run_main_loop_bfs(graph, registry, &config, receiver).await?
    } else {
        run_preload_phase(graph, registry, &config, receiver).await;
        run_bfs_phase(graph, registry, &config, receiver).await?;
    }

(the new main loop on the eligible-registry, no-skip-preload path; the
legacy two-phase pair on the else fallback). And the legacy
`run_preload_phase` declaration at post-format line 1567 has no
`#[cfg(...)]` attribute on the immediately preceding lines (doc comment at
1566, blank at 1565), keeping it callable from both the native arm's
else-branch and the wasm arm's main path.
The 20-plus per-item `#[cfg(not(target_arch = "wasm32"))]` gates on the
cherry-picked main-loop scaffolding's top-level definitions start at file
line 26: the `FuturesUnordered` and
`std::collections::{HashSet, VecDeque}` imports plus the
model-and-resolver-internal imports added to the file's prelude; the
`type WaitingEdge = (NodeIndex, DependencyEdgeInfo);` alias at the
cherry-pick's first new definition line; the enum and struct definitions;
the helper functions, including the just-renamed and async-converted
`async fn parse_full_manifest_off_runtime` and
`async fn parse_core_manifest_off_runtime` at lines 840 and 848; the
dispatcher `async fn apply_fetch_result` at 1211; the per-edge cache-lookup
helpers; `enqueue_initial_root_deps` (the BFS frontier seed);
`schedule_transitive_prefetches` (the post-Version-fetch transitive
walker); and `async fn run_main_loop_bfs(graph: &mut DependencyGraph,
registry: &R, config: &BuildDepsConfig, receiver: &E) -> Result<(),
ResolveError>` itself, whose body is the outer while-loop over the
`current_level` frontier with per-edge cache lookups and the dispatcher
call on `FetchDone` events drained from the
`fetches: FuturesUnordered<JoinHandle<FetchDone>>` polling step. Each of
these top-level items carries its own cfg attribute on the line immediately
above its definition, marking the whole chunk native-only and absent from
the wasm compilation unit. The wasm build sees only the legacy
`run_preload_phase`, `run_bfs_phase`, and preload helpers as resolver
content; `build_deps_with_config` itself compiles unconditionally, with its
body's cfg arms selecting the legacy pair on wasm and the
main-loop-or-legacy dispatch on native. The two target families thus
diverge at the dispatcher-body level: all new machinery is cfg'd out of the
wasm side, and the legacy machinery is shared, per the cherry-pick's
design.
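A minimal illustration of that gating pattern (dummy items and bodies; the
real items are those listed above):

    // Native-only items each carry their own gate; none of this exists in
    // the wasm32 compilation unit.
    #[cfg(not(target_arch = "wasm32"))]
    type WaitingEdge = (usize, String); // stands in for (NodeIndex, DependencyEdgeInfo)

    #[cfg(not(target_arch = "wasm32"))]
    async fn run_main_loop_bfs_shape() {
        // Free to use tokio::spawn, FuturesUnordered, rayon::spawn here.
    }

    // Shared entry points compile for both families; only the body branches.
    async fn build_deps_shape() {
        #[cfg(not(target_arch = "wasm32"))]
        {
            // main loop, or legacy fallback for empty registry_url / warm cache
        }
        #[cfg(target_arch = "wasm32")]
        {
            // legacy two-phase pair
        }
    }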
The unstaged Cargo.lock delta (617 insertions and 119 deletions against the
upstream HEAD's lockfile) is local resolver normalization, not a
source-driven change: removal of transitive-crate entries that the upstream
lockfile pins but the local resolver's output doesn't reach; re-pins of
some transitive crates to the latest versions compatible with the
workspace's `[workspace.dependencies]` specs; a second entry of the `swc`
crate at a non-current major version pinned by some transitive dep or
feature, which the upstream lockfile lacked because its resolver unified
the conflicting requirements differently; and a couple of new transitive
crates pulled in by a cfg-or-feature combination the upstream CI's resolver
didn't activate. These are the standard differences between two cargo
resolver runs over the same Cargo.toml and crate index on different hosts,
cargo versions, or resolver feature defaults. The delta is discarded via
the `git restore Cargo.lock` step at the head of this commit's preparation,
making the working tree match the HEAD-recorded lockfile exactly. That
lockfile is the cherry-pick end state and is identical through all local
commits: neither 3c2ee243 (the futures::join! change in
`build_deps_with_config`), f03ce5e4 (the BENCH_RUNS workflow-default
tweak), nor the cherry-picked 4e6848dc (main-loop scaffolding) and 1c2a02ac
(priority-queue refinement) touched the workspace's Cargo.toml files, so
the resolver's input is unchanged from upstream and the output should be
identical modulo local tooling state. After the restore,
`git diff --stat -- Cargo.lock` prints nothing (the empty output is the
files-identical signal), and the `git add` step names its paths explicitly
(per CLAUDE.md's "git add by explicit name" guidance), which keeps
unrelated working-tree state, like the lockfile normalization or the
`next.js` symlink type change, out of the index. The canonical post-merge
lockfile is whatever the upstream GHA CI's `cargo build` produces on the
merged view; the local delta is a purely local tooling artifact that never
leaves this host's filesystem.
The worktree's `next.js` path shows a `T` (type-change) row in
`git diff --name-status`: HEAD records the path as a submodule gitlink
(git's representation of a nested repo's pinned commit ID), while the
working tree has a symlink, from this conversation's earlier
`ln -s /Users/elr/code/utoo/next.js next.js` substitution that pointed the
path at the main checkout's already-initialized submodule content. This is
working-tree-only state: the explicit-paths `git add` form excludes it, the
index keeps the upstream-pinned gitlink, and the commit tree's content for
`next.js` is unchanged from HEAD. On the CI side,
`git submodule update --init --recursive --depth 1` in the build job's
prologue initializes the `.gitmodules`-listed submodules and checks out the
gitlink-recorded SHAs, which is the proper tracked state the CI sees. The
local symlink is purely a host-side convenience for cargo workspace
resolution: `crates/pack-api/Cargo.toml` and the other pack-side crates
have path dependencies like `../../next.js/turbopack/crates/turbo-tasks`
into the submodule's tree, and without the symlink or a real submodule
checkout the host's cargo fails workspace-membership resolution.

The architecture of the cherry-pick's new resolver, which this commit's
rayon-offload fix completes, is the canonical actor-model decomposition of
a package manager's resolve phase: many independent IO-bound HTTP fetches,
some CPU-bound parse work, and result aggregation into a shared state
machine. Other Rust-adjacent package tooling (cargo itself, astral.sh's uv,
pnpm's resolver, Bun's install-side resolver) has converged on the same
decomposition over the years: a single-task event loop holds all the
mutable state (inflight dedup, caches, priority queue, BFS frontier) with
no cross-thread synchronization primitives on it, since one task owns it;
the parallel work is delegated to worker tasks or threads over channels
(spawn-a-task-and-await-its-JoinHandle for tokio's IO-driven side,
rayon::spawn-with-oneshot for the CPU pool side); and results flow back
into the event loop through a collection of receiver futures that the
loop's `select!` or `.next().await` polls each iteration.
The terminology varies: cargo's resolver calls its main loop "the
dependency queue" and keeps a reverse index of parents waiting on each dep;
uv drives the pubgrub algorithm with the same shape; pnpm's JavaScript
implementation uses Node's event loop as the single-threaded pump, with
libuv's thread pool (exposed via worker threads) as the worker side. But
the underlying invariants are identical: a single owner of mutable state;
parallel IO work as spawned tasks delivering results into the owner's
inbox; parallel CPU work as thread-pool jobs delivering into the same inbox
or a second one merged with it at the poll step.

The cherry-picked main-loop resolver in ruborist maps onto this shape
directly. `run_main_loop_bfs` is the single-task event loop.
`fetches: FuturesUnordered<JoinHandle<FetchDone>>` is the loop's inbox for
IO-side completions. The single-owner mutable state comprises the
`full_cache` and `version_cache` HashMaps, the `full_waiters` and
`version_waiters` HashMaps, the `full_failures` and `version_failures`
HashMaps, the `FetchQueues` priority queue, the `current_level` /
`next_level` BFS frontier pair, and the `level_pending` within-level resume
queue. Each in-flight package fetch is a `tokio::spawn`-ed HTTP task whose
`JoinHandle` the collection holds for the loop to poll; the CPU-side worker
job is `crate::service::parse_json_off_runtime`'s `rayon::spawn`, which
delivers the parsed, typed result back to the awaiting tokio task over a
`tokio::sync::oneshot` channel. tokio's blocking pool sits off the hot
path: its purview is the filesystem syscalls (the on-disk persistent
ManifestStore's reads and writes for the cross-process warm cache, wired
through the `cache` argument of `UnifiedRegistry::new()` by the pm crate's
CLI setup), mediated by `tokio::fs` helpers with their bounded blocking
pool so blocking syscalls never pin reactor threads. The wake-and-poll glue
between the pools is tokio's standard `Waker` protocol, implemented by the
oneshot `Receiver` future for the rayon side, the `JoinHandle` future for
spawned tasks, and the `FuturesUnordered::next()` future over the whole
inbox: each signals "the other end has produced a value; poll the awaiting
future to pick it up."
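A runnable skeleton of that single-owner shape (illustrative types and
names, assuming the tokio and futures crates; not the crate's code):

    use futures::stream::{FuturesUnordered, StreamExt};
    use std::collections::HashMap;

    enum Done {
        Manifest { name: String, body: String },
    }

    // One task owns every piece of mutable state; spawned IO workers (and,
    // via oneshot, CPU-pool jobs) report back through the FuturesUnordered
    // inbox, so the state needs no locks.
    async fn event_loop(names: Vec<String>) -> HashMap<String, String> {
        let mut inbox: FuturesUnordered<tokio::task::JoinHandle<Done>> =
            FuturesUnordered::new();
        for name in names {
            inbox.push(tokio::spawn(async move {
                // Stand-in for the HTTP fetch worker task.
                Done::Manifest { body: format!("manifest for {name}"), name }
            }));
        }
        let mut cache = HashMap::new(); // single-owner mutable state
        while let Some(joined) = inbox.next().await {
            let Done::Manifest { name, body } = joined.expect("worker panicked");
            cache.insert(name, body);
        }
        cache
    }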
The Waker callback's per-event overhead, an atomic readiness update on the
runtime's task table plus, when needed, a cross-thread wakeup of a parked
worker, is on the order of microseconds: the standard cost of an actor
message pass in tokio's runtime architecture.

Bench-data interpretation for this commit's GHA bench-phases-linux
hyperfine run (which fires automatically on the push, since the PR carries
the `benchmark` label that the workflow's
`if: contains(github.event.pull_request.labels.*.name, 'benchmark')` gate
checks): the prior-art numbers come from PR #2937's own bench-phases-linux
output, already pulled from the GHA API in earlier turns of this session
via `gh run view --job --log`. That run's "Time (mean ± σ)" table measures
four PMs (utoo / utoo-next / utoo-npm / bun) over four phases (p0_full_cold
/ p1_resolve / p2_install / p4_warm_link) of the bench-phases script's
sweep over the ant-design fixture's resolve-and-install operations against
the npmjs registry. On `p1_resolve`, σ dropped from roughly 1.0s on the
`utoo-next` row (the baseline binary, auto-built from the `next` branch and
downloaded as a workflow artifact) to roughly 0.08s on the `utoo` row (the
experiment binary built from the PR tip): a ~13× variance reduction with no
corresponding mean change, the standard signature of an architectural
change that removes a tail-fetch-gating bottleneck while the mean stays
network-bandwidth-bound on the same npmjs CDN edge. The means match (within
each side's σ band, and the experiment side's band is tight enough that the
means are statistically the same) because network round-trip time to the
CDN edge dominates the resolve wall clock, and the change shortens no
individual fetch's RTT. What it changes is the variance distribution, by
removing the worst-case tail behavior where the resolve occasionally waits
~2s on a single straggler in the preload's flat closure walk that happened
to land on a high-latency CDN edge on that run. The legacy two-phase form
fans out all closure fetches in a single `FuturesUnordered` and drives it
until empty, so its total wall time is the max of all fetch times plus
fixed per-fetch overhead, and the right tail of the fetch-time distribution
is what drives the wall-time variance.
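To make that argument concrete, here is a dependency-free toy simulation of
the two wall-time models; the latency distribution and all parameters are
invented for illustration and are not the bench data:

    // Pareto-tailed fetch times: the max over a large flat closure (legacy
    // preload) fluctuates far more run to run than the sum over a short
    // critical-path chain (new main loop).
    struct Xorshift(u64);

    impl Xorshift {
        fn uniform(&mut self) -> f64 {
            self.0 ^= self.0 << 13;
            self.0 ^= self.0 >> 7;
            self.0 ^= self.0 << 17;
            (self.0 >> 11) as f64 / (1u64 << 53) as f64
        }
        /// Pareto(x_m = 50ms, alpha = 2.5): heavy right tail, like CDN stragglers.
        fn fetch_secs(&mut self) -> f64 {
            0.05 * self.uniform().max(1e-12).powf(-1.0 / 2.5)
        }
    }

    fn std_dev(xs: &[f64]) -> f64 {
        let mean = xs.iter().sum::<f64>() / xs.len() as f64;
        (xs.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / xs.len() as f64).sqrt()
    }

    fn main() {
        let mut rng = Xorshift(0x9E3779B97F4A7C15);
        let (runs, closure, chain) = (5_000, 800, 12);
        // Legacy model: wall time = max over the whole flat closure.
        let max_walls: Vec<f64> = (0..runs)
            .map(|_| (0..closure).map(|_| rng.fetch_secs()).fold(0.0_f64, f64::max))
            .collect();
        // New model: wall time ~ sum over the longest sequential chain.
        let chain_walls: Vec<f64> = (0..runs)
            .map(|_| (0..chain).map(|_| rng.fetch_secs()).sum())
            .collect();
        println!("sigma(max of {closure} fetches)  = {:.2}s", std_dev(&max_walls));
        println!("sigma(chain of {chain} fetches) = {:.2}s", std_dev(&chain_walls));
    }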
The new main loop's `current_level → next_level` Vec swap is a soft
aggregation point: edges whose packages are already fetched and cached
proceed immediately, while edges on still-in-flight packages sit on the
`level_pending` resume queue and pick up when their fetch lands. The
resolve phase's wall time is therefore the sum along the dependency graph's
critical path (the longest chain of sequential dependencies, each fetched
once with the result shared across its parent and child edges) rather than
the legacy max over the flat closure walk, and the demand queue's
BFS-ordered dispatch puts the longest chain's fetches on the wire first.
With the same per-fetch time distribution on both sides (same npmjs
registry, same network, same GHA runner class), the mean of the
critical-path walk comes out equal to the mean of the flat walk's max, so
the mean bench number doesn't shift. The variance, however, differs
sharply: the max of N samples concentrates on the right tail of the
distribution, where the variance is high, while the sum over a fixed-length
chain concentrates, Central-Limit-Theorem-style, around chain length times
mean fetch time. That is the σ-collapse-without-mean-shift signature. The
PR body documents this expectation as the criterion for the
bench-phases-linux output to count as a successful architectural
validation: on the `p1_resolve` row, σ should be substantially smaller on
the `utoo` (experiment) side than on the `utoo-next` (baseline) side, with
the means inside each other's σ bands so the "statistically the same"
condition holds. The `p2_install` and `p4_warm_link` metrics measure the
tarball fetch-and-extract phases, which this change doesn't touch (they run
through the existing `crates/pm/src/service/pipeline/worker.rs` pipeline,
unmodified here), so the expectation there is mean and σ both unchanged
from baseline; any movement would indicate an unintended side effect, a
finding for the post-bench PR-comment discussion.

Refs:
- PR #2933: the reverted prior attempt at this architecture; the revert
  commit 9e6c02e3's message names the level-barrier mpsc main loop's +111%
  `p1_resolve` regression as the failure mode.
- PR #2937: the experimental source PR whose two commits this PR
  cherry-picks and fixes on top of; the architectural discussion thread on
  that PR is the design-rationale record for the
  main-loop-with-priority-queue decomposition.
- Commits 7e7455ca and 04452992: the historical lineage of the
  `parse_json_off_runtime` rayon helper that this commit's new wrappers
  delegate to. 7e7455ca introduced the helper as a perf improvement for the
  legacy resolver path; 04452992 restored the rayon form after an
  intermediate attempt to remove the offload regressed p3.
  The new wrappers in `builder.rs` reuse the same helper, rayon arm and
  wasm fallback arm included, so the perf validation from the legacy
  resolver path transfers to the new main-loop path automatically.
- Cherry-pick attribution footers in the local commits' bodies:
  `4e6848dc` ends with `(cherry picked from commit
  75e84d0cb1a35250a59511bee86ad87f1fde06ba)` and `1c2a02ac` with
  `(cherry picked from commit 1ac68d509b89244d0ebbbe157f72100b5c9a3f94)`,
  the auto-linkable footers that `git cherry-pick -x` adds, pointing at
  upstream PR #2937's two commits.

Co-Authored-By: Claude Opus 4.7 (1M context)

🤖 Generated with [Claude Code](https://claude.com/claude-code)
---
 crates/ruborist/src/resolver/builder.rs | 32 ++++++++++++++++---------
 crates/ruborist/src/service/manifest.rs |  2 +-
 crates/ruborist/src/service/mod.rs      |  1 +
 3 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs
index 62eec487f..b649d1e38 100644
--- a/crates/ruborist/src/resolver/builder.rs
+++ b/crates/ruborist/src/resolver/builder.rs
@@ -837,19 +837,20 @@ where
 }
 
 #[cfg(not(target_arch = "wasm32"))]
-fn parse_full_manifest_inline(raw_bytes: Vec<u8>) -> anyhow::Result<Arc<FullManifest>> {
-    let mut parse_buf = raw_bytes.clone();
-    let mut manifest: FullManifest = simd_json::serde::from_slice(&mut parse_buf)
-        .map_err(|e| anyhow::anyhow!("JSON parse error: {e}"))?;
+async fn parse_full_manifest_off_runtime(raw_bytes: Vec<u8>) -> anyhow::Result<Arc<FullManifest>> {
+    let parse_buf = raw_bytes.clone();
+    let mut manifest: FullManifest = crate::service::parse_json_off_runtime(parse_buf).await?;
     manifest.raw = Arc::from(raw_bytes);
     Ok(Arc::new(manifest))
 }
 
 #[cfg(not(target_arch = "wasm32"))]
-fn parse_core_manifest_inline(mut bytes: Vec<u8>) -> anyhow::Result<Arc<CoreVersionManifest>> {
-    simd_json::serde::from_slice::<CoreVersionManifest>(&mut bytes)
+async fn parse_core_manifest_off_runtime(
+    bytes: Vec<u8>,
+) -> anyhow::Result<Arc<CoreVersionManifest>> {
+    crate::service::parse_json_off_runtime::<CoreVersionManifest>(bytes)
+        .await
         .map(Arc::new)
-        .map_err(|e| anyhow::anyhow!("JSON parse error: {e}"))
 }
 
 #[cfg(not(target_arch = "wasm32"))]
@@ -1205,7 +1206,7 @@ fn resolve_from_full_manifest(
 
 #[cfg(not(target_arch = "wasm32"))]
 #[allow(clippy::too_many_arguments)]
-fn apply_fetch_result(
+async fn apply_fetch_result(
     done: FetchDone,
     full_cache: &mut HashMap<String, Arc<FullManifest>>,
     version_cache: &mut HashMap<(String, String), Arc<CoreVersionManifest>>,
@@ -1223,7 +1224,11 @@ fn apply_fetch_result(
 
     match done {
         FetchDone::Full { name, result } => {
-            match result.and_then(|(bytes, _etag)| parse_full_manifest_inline(bytes)) {
+            let parsed: anyhow::Result<Arc<FullManifest>> = match result {
+                Ok((bytes, _etag)) => parse_full_manifest_off_runtime(bytes).await,
+                Err(e) => Err(e),
+            };
+            match parsed {
                 Ok(full) => {
                     full_cache.insert(name.clone(), full);
                 }
@@ -1237,7 +1242,11 @@ fn apply_fetch_result(
         }
         FetchDone::Version { name, spec, result } => {
             let key = (name, spec);
-            match result.and_then(parse_core_manifest_inline) {
+            let parsed: anyhow::Result<Arc<CoreVersionManifest>> = match result {
+                Ok(bytes) => parse_core_manifest_off_runtime(bytes).await,
+                Err(e) => Err(e),
+            };
+            match parsed {
                 Ok(manifest) => {
                     version_cache.insert(key.clone(), Arc::clone(&manifest));
                     schedule_transitive_prefetches(
@@ -1541,7 +1550,8 @@ where
                 &preload_config,
                 supports_semver,
                 &mut level_pending,
-            );
+            )
+            .await;
         }
 
         receiver.on_event(BuildEvent::LevelComplete {
diff --git a/crates/ruborist/src/service/manifest.rs b/crates/ruborist/src/service/manifest.rs
index 72c739a78..ab0d59f0e 100644
--- a/crates/ruborist/src/service/manifest.rs
+++ b/crates/ruborist/src/service/manifest.rs
@@ -17,7 +17,7 @@ use crate::model::manifest::{CoreVersionManifest, FullManifest};
 /// (wasm32). Keeps the tokio runtime free of `simd_json` work so other
 /// in-flight manifest fetches keep driving network IO while this one
 /// parses.
-async fn parse_json_off_runtime<T>(mut bytes: Vec<u8>) -> Result<T>
+pub(crate) async fn parse_json_off_runtime<T>(mut bytes: Vec<u8>) -> Result<T>
 where
     T: serde::de::DeserializeOwned + Send + 'static,
 {
diff --git a/crates/ruborist/src/service/mod.rs b/crates/ruborist/src/service/mod.rs
index 09456acb9..52a3f1767 100644
--- a/crates/ruborist/src/service/mod.rs
+++ b/crates/ruborist/src/service/mod.rs
@@ -59,6 +59,7 @@ pub use cache::{
 };
 pub use fs::{Glob, NoopGlob, exists, read_to_string};
 pub use http::client_builder;
+pub(crate) use manifest::parse_json_off_runtime;
 pub use manifest::{
     FetchManifestBytesResult, FetchManifestOptions, FetchManifestResult,
     FetchVersionManifestOptions, MetadataFormat, fetch_full_manifest, fetch_full_manifest_bytes,

From 5df46771bd948cf61006a49bad7a2a3ee5dbd27d Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 11:59:26 +0800
Subject: [PATCH 06/17] perf(pm): inline CoreVersionManifest parse + bump
 default concurrency to 64
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

CoreVersionManifest (5-20KB) parse time (~50-200μs) is less than the
rayon oneshot round-trip overhead (~300-500μs), making the offload a
net loss on the critical path. Switch to inline simd_json parse for
version manifests; keep FullManifest on rayon (200KB-2MB warrants it).

Bump default manifest fetch concurrency 20→64 to match the tarball
download limit and better utilise available connections.
Co-Authored-By: Claude Sonnet 4.6
---
 crates/ruborist/src/resolver/builder.rs | 10 ++++------
 crates/ruborist/src/service/api.rs      |  6 +++---
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs
index b649d1e38..0b091e399 100644
--- a/crates/ruborist/src/resolver/builder.rs
+++ b/crates/ruborist/src/resolver/builder.rs
@@ -845,11 +845,9 @@ async fn parse_full_manifest_off_runtime(raw_bytes: Vec<u8>) -> anyhow::Result<
 }
 
 #[cfg(not(target_arch = "wasm32"))]
-async fn parse_core_manifest_off_runtime(
-    bytes: Vec<u8>,
-) -> anyhow::Result<Arc<CoreVersionManifest>> {
-    crate::service::parse_json_off_runtime::<CoreVersionManifest>(bytes)
-        .await
+fn parse_core_manifest_inline(mut bytes: Vec<u8>) -> anyhow::Result<Arc<CoreVersionManifest>> {
+    simd_json::serde::from_slice::<CoreVersionManifest>(&mut bytes)
+        .map_err(|e| anyhow::anyhow!("JSON parse error: {e}"))
         .map(Arc::new)
 }
 
@@ -1243,7 +1241,7 @@ async fn apply_fetch_result(
         FetchDone::Version { name, spec, result } => {
             let key = (name, spec);
             let parsed: anyhow::Result<Arc<CoreVersionManifest>> = match result {
-                Ok(bytes) => parse_core_manifest_off_runtime(bytes).await,
+                Ok(bytes) => parse_core_manifest_inline(bytes),
                 Err(e) => Err(e),
             };
             match parsed {
diff --git a/crates/ruborist/src/service/api.rs b/crates/ruborist/src/service/api.rs
index 878b357a1..3155bbdc0 100644
--- a/crates/ruborist/src/service/api.rs
+++ b/crates/ruborist/src/service/api.rs
@@ -85,7 +85,7 @@ impl BuildDepsOptions {
             cache_dir: None,
             manifest_store: Arc::new(NoopStore),
             warm_project_cache: None,
-            concurrency: 20,
+            concurrency: 64,
             peer_deps: PeerDeps::Skip,
             glob,
             receiver,
@@ -318,7 +318,7 @@ mod tests {
             cache_dir: None,
             manifest_store: Arc::new(NoopStore),
             warm_project_cache: None,
-            concurrency: 20,
+            concurrency: 64,
             peer_deps: PeerDeps::Skip,
             glob: NoopGlob,
             receiver: NoopReceiver,
@@ -326,7 +326,7 @@ mod tests {
             catalogs: HashMap::new(),
         };
 
-        assert_eq!(options.concurrency, 20);
+        assert_eq!(options.concurrency, 64);
         assert_eq!(options.peer_deps, PeerDeps::Skip);
         assert!(options.supports_semver.is_none());
     }

From e2a02e44600c5455b2002ee4b46d85f3dc8ce1d0 Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 13:11:16 +0800
Subject: [PATCH 07/17] perf(pm): batch-drain completed fetches before
 re-pumping
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

At high concurrency (c=64) many JoinHandles complete simultaneously.
The previous code processed one FetchDone per outer-loop iteration,
re-running pump_fetches and the level_pending drain 64 times in
sequence.

Add a non-blocking poll_next_unpin drain loop after the first blocking
await to consume all already-ready results in one pass, reducing
outer-loop overhead by up to 63× per burst.

Co-Authored-By: Claude Sonnet 4.6
---
 crates/ruborist/src/resolver/builder.rs | 32 ++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs
index 0b091e399..a897c34c4 100644
--- a/crates/ruborist/src/resolver/builder.rs
+++ b/crates/ruborist/src/resolver/builder.rs
@@ -1550,6 +1550,38 @@ where
                 &mut level_pending,
             )
             .await;
+
+            // At high concurrency many JoinHandles complete at once. Drain all
+            // already-ready results in one pass so we avoid redundant
+            // pump_fetches + level_pending iterations for each one.
+            loop {
+                let next = std::future::poll_fn(|cx| {
+                    use std::task::Poll;
+                    match fetches.poll_next_unpin(cx) {
+                        Poll::Ready(item) => Poll::Ready(item),
+                        Poll::Pending => Poll::Ready(None),
+                    }
+                })
+                .await;
+                let Some(result) = next else { break };
+                let done = result.map_err(|e| {
+                    registry_error::(format!("manifest fetch task failed: {e}"))
+                })?;
+                apply_fetch_result(
+                    done,
+                    &mut full_cache,
+                    &mut version_cache,
+                    &mut full_waiters,
+                    &mut version_waiters,
+                    &mut full_failures,
+                    &mut version_failures,
+                    &mut fetch_queues,
+                    &preload_config,
+                    supports_semver,
+                    &mut level_pending,
+                )
+                .await;
+            }
         }
 
         receiver.on_event(BuildEvent::LevelComplete {

From cd3866992e7632abc097da09bc716545de809083 Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 13:48:58 +0800
Subject: =?UTF-8?q?perf(pm):=20bump=20default=20manifest=20c?=
 =?UTF-8?q?oncurrency=2064=E2=86=92128?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

pcap on c=64 showed streams=66, retx=5, dup_ack=0 — CDN is not
rate-limiting at all. bun runs ~258 streams with H1. Raising to 128 to
close the gap and observe whether streams scale linearly and
retransmissions stay low.

Co-Authored-By: Claude Sonnet 4.6
---
 crates/ruborist/src/service/api.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/crates/ruborist/src/service/api.rs b/crates/ruborist/src/service/api.rs
index 3155bbdc0..d2432dc98 100644
--- a/crates/ruborist/src/service/api.rs
+++ b/crates/ruborist/src/service/api.rs
@@ -85,7 +85,7 @@ impl BuildDepsOptions {
             cache_dir: None,
             manifest_store: Arc::new(NoopStore),
             warm_project_cache: None,
-            concurrency: 64,
+            concurrency: 128,
             peer_deps: PeerDeps::Skip,
             glob,
             receiver,
@@ -318,7 +318,7 @@ mod tests {
             cache_dir: None,
             manifest_store: Arc::new(NoopStore),
             warm_project_cache: None,
-            concurrency: 64,
+            concurrency: 128,
             peer_deps: PeerDeps::Skip,
             glob: NoopGlob,
             receiver: NoopReceiver,
@@ -326,7 +326,7 @@ mod tests {
             catalogs: HashMap::new(),
         };
 
-        assert_eq!(options.concurrency, 64);
+        assert_eq!(options.concurrency, 128);
         assert_eq!(options.peer_deps, PeerDeps::Skip);
         assert!(options.supports_semver.is_none());
     }

From f86667b6e4c4e11e630b3da5d6482a6156273fc5 Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 15:02:56 +0800
Subject: [PATCH 09/17] perf(pm): fill all idle slots with prefetch when
 demand queue is empty
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When all enqueued demand requests are already in-flight the demand
queue is empty, yet pop_next was returning None once active_prefetch
hit concurrency/4 (16), leaving up to 48 slots idle.

Idle slots should be used for speculative prefetch — there is nothing
to reserve capacity for when demand has no pending items. The cap only
matters when demand queue is non-empty (to keep slots available for
higher-priority demand items).

Removing the cap in the empty-demand case maximises lookahead coverage,
so more manifests are cache-warm when BFS arrives at them.
Co-Authored-By: Claude Sonnet 4.6
---
 crates/ruborist/src/resolver/builder.rs | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs
index a897c34c4..30da1d472 100644
--- a/crates/ruborist/src/resolver/builder.rs
+++ b/crates/ruborist/src/resolver/builder.rs
@@ -899,7 +899,19 @@ impl FetchQueues {
             return Some(request);
         }
 
-        if self.active_prefetches() >= prefetch_concurrency {
+        // Only cap prefetch when the demand queue still has pending items — those
+        // items need slots reserved so they aren't held behind a wall of prefetch.
+        // When the demand queue is empty (all known-needed fetches are already
+        // in-flight), allow prefetch to fill every remaining slot: there is nothing
+        // to reserve capacity for, and more speculative coverage means the BFS is
+        // more likely to find manifests already in cache when it needs them.
+        let cap = if self.demand.is_empty() {
+            usize::MAX
+        } else {
+            prefetch_concurrency
+        };
+
+        if self.active_prefetches() >= cap {
             return None;
         }

From 163e6565cd552f54b1654900e29966c52c1d8748 Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 18:15:18 +0800
Subject: [PATCH 10/17] perf(pm): deep transitive prefetch via spec-carrying
 FetchRequest::Full
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

FetchRequest::Full now carries an optional spec from the dependency
edge. When the FullManifest arrives with a spec, apply_fetch_result
resolves the version immediately (same resolve_target_version path as
BFS) and stores the CoreVersionManifest in version_cache. This
triggers schedule_transitive_prefetches recursively — prefetch depth
is no longer limited to one level ahead.

The BFS non-semver path checks version_cache before falling back to
resolve_from_full_manifest, so the speculative resolution result is
consumed directly without redundant get_core_version re-parse.

Before: prefetch discovers deps 1 level ahead → request valleys at BFS
level boundaries (throughput drops 3-4× every ~1s).
After: each prefetch FullManifest triggers recursive discovery →
prefetch runs to full transitive closure depth → BFS finds warm cache
→ no valleys.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 crates/ruborist/src/resolver/builder.rs | 98 +++++++++++++++++++++++--
 1 file changed, 91 insertions(+), 7 deletions(-)

diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs
index 30da1d472..041907269 100644
--- a/crates/ruborist/src/resolver/builder.rs
+++ b/crates/ruborist/src/resolver/builder.rs
@@ -788,15 +788,24 @@ enum FetchKey {
 
 #[cfg(not(target_arch = "wasm32"))]
 #[derive(Clone)]
 enum FetchRequest {
-    Full { name: String },
-    Version { name: String, spec: String },
+    Full {
+        name: String,
+        /// Semver spec from the dependency edge, carried so prefetch can
+        /// speculatively resolve the version when the manifest arrives and
+        /// trigger deep transitive prefetch without waiting for BFS.
+        spec: Option<String>,
+    },
+    Version {
+        name: String,
+        spec: String,
+    },
 }
 
 #[cfg(not(target_arch = "wasm32"))]
 impl FetchRequest {
     fn key(&self) -> FetchKey {
         match self {
-            Self::Full { name } => FetchKey::Full(name.clone()),
+            Self::Full { name, .. } => FetchKey::Full(name.clone()),
             Self::Version { name, spec } => FetchKey::Version(name.clone(), spec.clone()),
         }
     }
@@ -806,6 +815,7 @@ impl FetchRequest {
 enum FetchDone {
     Full {
         name: String,
+        spec: Option<String>,
         result: anyhow::Result<(Vec<u8>, Option<String>)>,
     },
     Version {
@@ -954,7 +964,7 @@ fn fetch_registry_manifest(registry_url: String, request: FetchRequest) -> Fetch
 
     tokio::spawn(async move {
         match request {
-            FetchRequest::Full { name } => {
+            FetchRequest::Full { name, spec } => {
                 let result = fetch_full_manifest_bytes(FetchManifestOptions {
                     registry_url: &registry_url,
                     name: &name,
@@ -968,7 +978,7 @@ fn fetch_registry_manifest(registry_url: String, request: FetchRequest) -> Fetch
                         Err(anyhow::anyhow!("304 Not Modified without etag context"))
                     }
                 });
-                FetchDone::Full { name, result }
+                FetchDone::Full { name, spec, result }
             }
             FetchRequest::Version { name, spec } => {
                 let result = fetch_version_manifest_bytes(FetchVersionManifestOptions {
@@ -1027,7 +1037,13 @@ fn schedule_registry_fetch(
             priority,
         );
     } else if !full_cache.contains_key(&real_name) && !full_failures.contains_key(&real_name) {
-        fetch_queues.enqueue(FetchRequest::Full { name: real_name }, priority);
+        fetch_queues.enqueue(
+            FetchRequest::Full {
+                name: real_name,
+                spec: Some(real_spec),
+            },
+            priority,
+        );
     }
 }
@@ -1249,13 +1249,35 @@ async fn apply_fetch_result(
     fetch_queues.complete(&done_key);
 
     match done {
-        FetchDone::Full { name, result } => {
+        FetchDone::Full { name, spec, result } => {
             let parsed: anyhow::Result<Arc<FullManifest>> = match result {
                 Ok((bytes, _etag)) => parse_full_manifest_off_runtime(bytes).await,
                 Err(e) => Err(e),
             };
             match parsed {
                 Ok(full) => {
+                    // When the fetch carried a spec (from prefetch), resolve
+                    // the version now and cache the CoreVersionManifest so BFS
+                    // can skip the re-parse. This also triggers recursive
+                    // transitive prefetch — the key to deep speculative
+                    // discovery without BFS level gating.
+                    if let Some(spec) = &spec
+                        && let Ok(version) = resolve_target_version((&*full).into(), spec)
+                        && let Some(core) = full.get_core_version(&version).map(Arc::new)
+                    {
+                        let key = (name.clone(), spec.clone());
+                        version_cache.insert(key, Arc::clone(&core));
+                        schedule_transitive_prefetches(
+                            &core,
+                            preload_config,
+                            supports_semver,
+                            full_cache,
+                            version_cache,
+                            full_failures,
+                            version_failures,
+                            fetch_queues,
+                        );
+                    }
                     full_cache.insert(name.clone(), full);
                 }
                 Err(e) => {
@@ -1454,6 +1492,52 @@ where
                     ));
                 }
 
+                // Check version_cache first — deep prefetch may have
+                // already resolved the version from the FullManifest,
+                // avoiding a redundant get_core_version re-parse.
+                let vkey = (real_name.clone(), real_spec.clone());
+                if let Some(manifest) = version_cache.get(&vkey).cloned() {
+                    let resolved = ResolvedPackage {
+                        name: edge.name.clone(),
+                        version: manifest.version.clone(),
+                        manifest,
+                    };
+                    let processed = if graph
+                        .check_override(parent, &edge.name, Some(&resolved.version))
+                        .is_some()
+                    {
+                        process_dependency(graph, registry, parent, &edge, config)
+                            .await
+                            .map_err(|inner| chain_err(graph, parent, &edge, inner))?
From 93134a0f28543caad418a998e43213aa4e2c72df Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 19:10:13 +0800
Subject: [PATCH 11/17] perf(pm): offload speculative version parse to rayon
 in deep prefetch
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The speculative get_core_version call in FetchDone::Full was running
synchronously on the main-loop task, adding ~100-500μs of blocking per
manifest. Replace it with extract_core_version_off_runtime, which uses
rayon::spawn + oneshot, keeping the main loop free to process other
FetchDone events during the parse.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 crates/ruborist/src/resolver/builder.rs | 32 +++++++++++++++----------
 1 file changed, 19 insertions(+), 13 deletions(-)

diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs
index 041907269..e5396e4cb 100644
--- a/crates/ruborist/src/resolver/builder.rs
+++ b/crates/ruborist/src/resolver/builder.rs
@@ -1263,20 +1263,26 @@ async fn apply_fetch_result(
                     // discovery without BFS level gating.
                     if let Some(spec) = &spec
                         && let Ok(version) = resolve_target_version((&*full).into(), spec)
-                        && let Some(core) = full.get_core_version(&version).map(Arc::new)
                     {
-                        let key = (name.clone(), spec.clone());
-                        version_cache.insert(key, Arc::clone(&core));
-                        schedule_transitive_prefetches(
-                            &core,
-                            preload_config,
-                            supports_semver,
-                            full_cache,
-                            version_cache,
-                            full_failures,
-                            version_failures,
-                            fetch_queues,
-                        );
+                        let (_, core) = crate::model::manifest::extract_core_version_off_runtime(
+                            Arc::clone(&full),
+                            version,
+                        )
+                        .await;
+                        if let Some(core) = core {
+                            let key = (name.clone(), spec.clone());
+                            version_cache.insert(key, Arc::clone(&core));
+                            schedule_transitive_prefetches(
+                                &core,
+                                preload_config,
+                                supports_semver,
+                                full_cache,
+                                version_cache,
+                                full_failures,
+                                version_failures,
+                                fetch_queues,
+                            );
+                        }
                     }
                     full_cache.insert(name.clone(), full);
                 }
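[Annotation] extract_core_version_off_runtime itself isn't shown in this series; the pattern the message describes is rayon::spawn plus a oneshot reply channel. A minimal sketch of that shape (the helper name `off_runtime` is hypothetical, not the crate's signature):

use tokio::sync::oneshot;

/// Run a CPU-bound closure on the rayon pool and await its result
/// without blocking the tokio worker thread that polls this future.
async fn off_runtime<T, F>(work: F) -> anyhow::Result<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        // If the worker panics, tx is dropped and rx.await errors
        // instead of hanging the caller.
        let _ = tx.send(work());
    });
    rx.await
        .map_err(|_| anyhow::anyhow!("rayon worker dropped before replying"))
}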
From fb03b93c09332542df828a10305fb550adeb07e1 Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 19:32:10 +0800
Subject: [PATCH 12/17] perf(pm): merge speculative version parse into
 FullManifest rayon task
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Combine the FullManifest JSON parse and the speculative version
extraction (get_core_version) into a single rayon::spawn, eliminating the
second oneshot roundtrip that was adding ~300-500μs per manifest and
causing involuntary context switches (iCtx) to spike +74%.

Before: 2 sequential rayon roundtrips per FetchDone::Full (parse +
extract), each with spawn + oneshot overhead.
After: 1 rayon roundtrip does both. The spec (when present) is resolved
and the CoreVersionManifest extracted in the same closure.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 crates/ruborist/src/resolver/builder.rs | 92 +++++++++++++++----------
 crates/ruborist/src/service/mod.rs      |  1 -
 2 files changed, 57 insertions(+), 36 deletions(-)

diff --git a/crates/ruborist/src/resolver/builder.rs b/crates/ruborist/src/resolver/builder.rs
index e5396e4cb..6ed7e3f52 100644
--- a/crates/ruborist/src/resolver/builder.rs
+++ b/crates/ruborist/src/resolver/builder.rs
@@ -846,12 +846,49 @@ where
     ResolveError::Registry(RegistryError(anyhow::anyhow!(message.into())).into())
 }
 
+/// Parse a FullManifest on the rayon pool. When `spec` is provided,
+/// also resolve the version and extract the CoreVersionManifest in
+/// the same rayon task — one roundtrip instead of two.
 #[cfg(not(target_arch = "wasm32"))]
-async fn parse_full_manifest_off_runtime(raw_bytes: Vec<u8>) -> anyhow::Result<Arc<FullManifest>> {
-    let parse_buf = raw_bytes.clone();
-    let mut manifest: FullManifest = crate::service::parse_json_off_runtime(parse_buf).await?;
-    manifest.raw = Arc::from(raw_bytes);
-    Ok(Arc::new(manifest))
+type FullParseResult = (
+    Arc<FullManifest>,
+    Option<(String, Arc<CoreVersionManifest>)>,
+);
+
+async fn parse_full_manifest_off_runtime(
+    raw_bytes: Vec<u8>,
+    spec: Option<&str>,
+) -> anyhow::Result<FullParseResult> {
+    use crate::resolver::version::resolve_target_version;
+
+    let spec_owned = spec.map(|s| s.to_string());
+    let (tx, rx) = tokio::sync::oneshot::channel();
+    rayon::spawn(move || {
+        let result = (|| -> anyhow::Result<FullParseResult> {
+            let parse_buf = raw_bytes.clone();
+            let mut manifest: FullManifest =
+                simd_json::serde::from_slice::<FullManifest>(&mut parse_buf.into_boxed_slice())
+                    .map_err(|e| anyhow::anyhow!("JSON parse error: {e}"))?;
+            manifest.raw = Arc::from(raw_bytes);
+            let full = Arc::new(manifest);
+
+            let speculative = if let Some(spec) = &spec_owned {
+                resolve_target_version((&*full).into(), spec)
+                    .ok()
+                    .and_then(|version| {
+                        full.get_core_version(&version)
+                            .map(|core| (spec.clone(), Arc::new(core)))
+                    })
+            } else {
+                None
+            };
+
+            Ok((full, speculative))
+        })();
+        let _ = tx.send(result);
+    });
+    rx.await
+        .map_err(|_| anyhow::anyhow!("rayon parse worker dropped"))?
 }
@@ -1250,39 +1287,24 @@ async fn apply_fetch_result(
     fetch_queues.complete(&done_key);
 
     match done {
         FetchDone::Full { name, spec, result } => {
-            let parsed: anyhow::Result<Arc<FullManifest>> = match result {
-                Ok((bytes, _etag)) => parse_full_manifest_off_runtime(bytes).await,
+            let parsed = match result {
+                Ok((bytes, _etag)) => parse_full_manifest_off_runtime(bytes, spec.as_deref()).await,
                 Err(e) => Err(e),
             };
             match parsed {
-                Ok(full) => {
-                    // When the fetch carried a spec (from prefetch), resolve
-                    // the version now and cache the CoreVersionManifest so BFS
-                    // can skip the re-parse. This also triggers recursive
-                    // transitive prefetch — the key to deep speculative
-                    // discovery without BFS level gating.
-                    if let Some(spec) = &spec
-                        && let Ok(version) = resolve_target_version((&*full).into(), spec)
-                    {
-                        let (_, core) = crate::model::manifest::extract_core_version_off_runtime(
-                            Arc::clone(&full),
-                            version,
-                        )
-                        .await;
-                        if let Some(core) = core {
-                            let key = (name.clone(), spec.clone());
-                            version_cache.insert(key, Arc::clone(&core));
-                            schedule_transitive_prefetches(
-                                &core,
-                                preload_config,
-                                supports_semver,
-                                full_cache,
-                                version_cache,
-                                full_failures,
-                                version_failures,
-                                fetch_queues,
-                            );
-                        }
+                Ok((full, speculative)) => {
+                    if let Some((resolved_spec, core)) = speculative {
+                        version_cache.insert((name.clone(), resolved_spec), Arc::clone(&core));
+                        schedule_transitive_prefetches(
+                            &core,
+                            preload_config,
+                            supports_semver,
+                            full_cache,
+                            version_cache,
+                            full_failures,
+                            version_failures,
+                            fetch_queues,
+                        );
                     }
                     full_cache.insert(name.clone(), full);
                 }
diff --git a/crates/ruborist/src/service/mod.rs b/crates/ruborist/src/service/mod.rs
index 52a3f1767..09456acb9 100644
--- a/crates/ruborist/src/service/mod.rs
+++ b/crates/ruborist/src/service/mod.rs
@@ -59,7 +59,6 @@ pub use cache::{
 };
 pub use fs::{Glob, NoopGlob, exists, read_to_string};
 pub use http::client_builder;
-pub(crate) use manifest::parse_json_off_runtime;
 pub use manifest::{
     FetchManifestBytesResult, FetchManifestOptions, FetchManifestResult,
     FetchVersionManifestOptions, MetadataFormat, fetch_full_manifest, fetch_full_manifest_bytes,
From 6b2d7d26302d0d3244cb953ce4c3b945226323b0 Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 20:22:18 +0800
Subject: [PATCH 13/17] perf(pm): use 4 independent HTTP client pools for
 connection diversity
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

A single reqwest client reuses connections so efficiently that actual TCP
connections stay around ~66 even with concurrency=128 — the proxy returns
responses in ~53μs, freeing connections before new requests need them.
bun opens ~258 connections because it doesn't pool as aggressively.

Replace the single global client with 4 independent pools, each with its
own keep-alive connection set. Requests round-robin across pools, so each
pool opens its own connections via DNS rotation → total TCP connections
≈ 4× the single-pool count.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 crates/ruborist/src/service/http.rs | 24 ++++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/crates/ruborist/src/service/http.rs b/crates/ruborist/src/service/http.rs
index fd5ca63c4..127a14c4b 100644
--- a/crates/ruborist/src/service/http.rs
+++ b/crates/ruborist/src/service/http.rs
@@ -85,18 +85,26 @@ use anyhow::{Context, Result, anyhow};
 /// error, which silently-stalled sockets never raise.
 const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
 
-/// Global HTTP client with connection pooling and DNS caching.
-///
-/// Stores `Result` so that proxy-configuration errors are
-/// surfaced to callers instead of panicking or calling `process::exit`.
-static HTTP_CLIENT: LazyLock<Result<reqwest::Client, String>> = LazyLock::new(|| {
-    client_builder()
-        .and_then(|b| b.build().context("Failed to build reqwest client"))
+/// Number of independent HTTP client pools. Each pool maintains its own
+/// set of keep-alive connections, so N pools ≈ N× the total TCP connections
+/// to the registry CDN. This spreads requests across more CDN edge servers
+/// via DNS round-robin and avoids the "pool reuse is too efficient" problem
+/// where a single pool recycles connections faster than new requests arrive.
+const CLIENT_POOL_SIZE: usize = 4;
+
+static HTTP_CLIENTS: LazyLock<Result<Vec<reqwest::Client>, String>> = LazyLock::new(|| {
+    (0..CLIENT_POOL_SIZE)
+        .map(|_| client_builder().and_then(|b| b.build().context("Failed to build reqwest client")))
+        .collect::<Result<Vec<_>>>()
         .map_err(|e| e.to_string())
 });
 
+static CLIENT_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
+
 pub(crate) fn get_client() -> Result<&'static reqwest::Client> {
-    HTTP_CLIENT.as_ref().map_err(|e| anyhow!("{e}"))
+    let clients = HTTP_CLIENTS.as_ref().map_err(|e| anyhow!("{e}"))?;
+    let idx = CLIENT_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % clients.len();
+    Ok(&clients[idx])
 }
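[Annotation] The rotation in get_client above is just a relaxed atomic counter taken modulo the pool size; distilled below with an illustrative assertion. Relaxed ordering suffices here: the index only spreads load and synchronizes nothing.

use std::sync::atomic::{AtomicUsize, Ordering};

struct RoundRobin(AtomicUsize);

impl RoundRobin {
    fn next(&self, pool_len: usize) -> usize {
        // Counter overflow is harmless: only the modulus is ever used.
        self.0.fetch_add(1, Ordering::Relaxed) % pool_len
    }
}

fn main() {
    let rr = RoundRobin(AtomicUsize::new(0));
    let picks: Vec<usize> = (0..6).map(|_| rr.next(4)).collect();
    assert_eq!(picks, vec![0, 1, 2, 3, 0, 1]); // cycles across the 4 pools
}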
From fba11eec6c3ecfcb983e8caaab6eccb096074055 Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 21:16:26 +0800
Subject: [PATCH 14/17] perf(pm): increase client pool to 32 for bun-like
 connection rotation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

bun maintains ~258 persistent connections with 64 active request slots
rotating across them, keeping each connection lightly loaded
(~3 reqs/conn). With 4 pools, utoo only reached 87 connections.

Increasing to 32 pools targets ~256 total connections (32 × ~8
conns/pool), matching bun's connection diversity while preserving
keep-alive benefits within each pool.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 crates/ruborist/src/service/http.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/ruborist/src/service/http.rs b/crates/ruborist/src/service/http.rs
index 127a14c4b..4da399f59 100644
--- a/crates/ruborist/src/service/http.rs
+++ b/crates/ruborist/src/service/http.rs
@@ -90,7 +90,7 @@ const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
 /// to the registry CDN. This spreads requests across more CDN edge servers
 /// via DNS round-robin and avoids the "pool reuse is too efficient" problem
 /// where a single pool recycles connections faster than new requests arrive.
-const CLIENT_POOL_SIZE: usize = 4;
+const CLIENT_POOL_SIZE: usize = 32;

From b6f40bc6c57250b9f8d12271d33c227930432a69 Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 21:47:13 +0800
Subject: =?UTF-8?q?perf(pm):=20revert=20pool=20size=2032?=
 =?UTF-8?q?=E2=86=924=20(32=20causes=20TLS=20overhead=20regression)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

32 pools fragmented the connection pool — each pool handled only ~25
requests, keep-alive reuse dropped, and TLS renegotiation overhead caused
p1 to regress from ~3.4s to 4.24s.

4 pools is the sweet spot: enough rotation to spread connections (+31%)
while keeping per-pool reuse efficient.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 crates/ruborist/src/service/http.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/ruborist/src/service/http.rs b/crates/ruborist/src/service/http.rs
index 4da399f59..127a14c4b 100644
--- a/crates/ruborist/src/service/http.rs
+++ b/crates/ruborist/src/service/http.rs
@@ -90,7 +90,7 @@ const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
 /// to the registry CDN. This spreads requests across more CDN edge servers
 /// via DNS round-robin and avoids the "pool reuse is too efficient" problem
 /// where a single pool recycles connections faster than new requests arrive.
-const CLIENT_POOL_SIZE: usize = 32;
+const CLIENT_POOL_SIZE: usize = 4;
From 70a4b863c05df881d64464b5fd9d85b07390738b Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 22:15:51 +0800
Subject: [PATCH 16/17] perf(pm): share TLS session cache across client pools

All 4 reqwest clients now share a single rustls ClientConfig (and its
Arc<dyn ClientSessionStore> session cache). When pool 1 establishes a TLS
session to CDN IP X, pool 2 reuses the session ticket for 1-RTT resumption
instead of a full 2-RTT handshake. This reduces the per-connection cost of
the multi-pool approach.

Also builds a custom rustls config on Linux (previously only done on macOS
for aws-lc-rs), and removes the duplicate use_preconfigured_tls call from
client_builder.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 crates/ruborist/src/service/http.rs | 63 ++++++++++++++++++++++++-----
 1 file changed, 54 insertions(+), 9 deletions(-)

diff --git a/crates/ruborist/src/service/http.rs b/crates/ruborist/src/service/http.rs
index 127a14c4b..fbbc023ae 100644
--- a/crates/ruborist/src/service/http.rs
+++ b/crates/ruborist/src/service/http.rs
@@ -92,12 +92,59 @@ const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
 /// where a single pool recycles connections faster than new requests arrive.
 const CLIENT_POOL_SIZE: usize = 4;
 
-static HTTP_CLIENTS: LazyLock<Result<Vec<reqwest::Client>, String>> = LazyLock::new(|| {
+static HTTP_CLIENTS: LazyLock<Result<Vec<reqwest::Client>, String>> =
+    LazyLock::new(|| build_client_pool().map_err(|e| e.to_string()));
+
+#[cfg(not(target_arch = "wasm32"))]
+fn build_client_pool() -> Result<Vec<reqwest::Client>> {
+    // Build a single rustls config so all pools share the TLS session cache.
+    // When pool 1 establishes a TLS session to IP X, pool 2 reuses the
+    // cached session ticket for a 1-RTT resumption instead of a 2-RTT
+    // full handshake. This makes the "new connection per pool" cost
+    // much lower.
+    let tls_config = build_shared_rustls_config()?;
+    (0..CLIENT_POOL_SIZE)
+        .map(|_| {
+            client_builder()?
+                .use_preconfigured_tls(tls_config.clone())
+                .build()
+                .context("Failed to build reqwest client")
+        })
+        .collect()
+}
+
+#[cfg(target_arch = "wasm32")]
+fn build_client_pool() -> Result<Vec<reqwest::Client>> {
     (0..CLIENT_POOL_SIZE)
-        .map(|_| client_builder().and_then(|b| b.build().context("Failed to build reqwest client")))
-        .collect::<Result<Vec<_>>>()
-        .map_err(|e| e.to_string())
-});
+        .map(|_| {
+            client_builder()?
+                .build()
+                .context("Failed to build reqwest client")
+        })
+        .collect()
+}
+
+/// Shared rustls config for all client pools. Cloning a ClientConfig
+/// shares the Arc<dyn ClientSessionStore> session cache, so TLS
+/// session tickets are reused across pools.
+#[cfg(not(target_arch = "wasm32"))]
+fn build_shared_rustls_config() -> Result<rustls::ClientConfig> {
+    #[cfg(target_os = "macos")]
+    {
+        build_rustls_config()
+    }
+    #[cfg(not(target_os = "macos"))]
+    {
+        let roots = rustls_native_certs::load_native_certs();
+        let mut root_store = rustls::RootCertStore::empty();
+        for cert in roots.certs {
+            let _ = root_store.add(cert);
+        }
+        Ok(rustls::ClientConfig::builder()
+            .with_root_certificates(root_store)
+            .with_no_client_auth())
+    }
+}
 
 static CLIENT_COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
 
@@ -175,10 +222,8 @@ pub fn client_builder() -> Result<reqwest::ClientBuilder> {
         // manifest requests open independent TCP streams instead.
         .http1_only();
 
-    #[cfg(target_os = "macos")]
-    {
-        builder = builder.use_preconfigured_tls(build_rustls_config()?);
-    }
+    // TLS config is set by build_client_pool via use_preconfigured_tls,
+    // not here — this keeps the session cache shared across all pools.
 
     match env_var("ALL_PROXY") {
         Some(url) => {
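[Annotation] The Clone-based sharing this patch relies on can be made explicit. A sketch under assumptions (rustls 0.23 with a single provider feature enabled; `cloned_configs` is a hypothetical helper, not the crate's build_shared_rustls_config): the resumption store is set once, and every cloned config holds the same Arc, so a ticket cached through one client resumes sessions for all of them.

fn cloned_configs(n: usize) -> anyhow::Result<Vec<rustls::ClientConfig>> {
    // Same root loading as the patch above.
    let mut root_store = rustls::RootCertStore::empty();
    for cert in rustls_native_certs::load_native_certs().certs {
        let _ = root_store.add(cert);
    }
    let mut base = rustls::ClientConfig::builder()
        .with_root_certificates(root_store)
        .with_no_client_auth();
    // One in-memory session store; ClientConfig::clone is shallow, so
    // every copy below shares this same cache of session tickets.
    base.resumption = rustls::client::Resumption::in_memory_sessions(512);
    Ok((0..n).map(|_| base.clone()).collect())
}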
From b6c37ed6b3edcf2494a5abe01114110a70745190 Mon Sep 17 00:00:00 2001
From: elrrrrrrr
Date: Wed, 13 May 2026 22:44:19 +0800
Subject: [PATCH 17/17] perf(pm): share TLS session cache across client
 pools on all platforms

Add rustls + rustls-native-certs as direct deps for non-macOS native
targets so the multi-pool client can share a single rustls ClientConfig
(and its Arc<dyn ClientSessionStore>) across all 4 pools.

macOS: aws-lc-rs provider (existing)
Linux: ring provider (new direct dep, was previously implicit via reqwest)

Cross-pool TLS session resumption: when pool 1 negotiates a session to
CDN IP X, pools 2-4 reuse the ticket for 1-RTT instead of 2-RTT.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 crates/ruborist/Cargo.toml          |  6 +++++-
 crates/ruborist/src/service/http.rs | 15 +++++----------
 2 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/crates/ruborist/Cargo.toml b/crates/ruborist/Cargo.toml
index fdda5ea5e..522a69d2a 100644
--- a/crates/ruborist/Cargo.toml
+++ b/crates/ruborist/Cargo.toml
@@ -53,8 +53,12 @@ workspace = true
 tokio = { workspace = true, features = ["macros", "rt"] }
 
 # Native (non-macOS) targets: reqwest's default rustls + ring.
+# rustls + rustls-native-certs are direct deps so the multi-pool client
+# can share a TLS session cache across pools (see service/http.rs).
 [target.'cfg(not(any(target_arch = "wasm32", target_os = "macos")))'.dependencies]
-reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots"] }
+reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots"] }
+rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] }
+rustls-native-certs = "0.8"
 
 # Native-only dependencies (not compiled for WASM)
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
diff --git a/crates/ruborist/src/service/http.rs b/crates/ruborist/src/service/http.rs
index fbbc023ae..376a2ac02 100644
--- a/crates/ruborist/src/service/http.rs
+++ b/crates/ruborist/src/service/http.rs
@@ -97,16 +97,14 @@ static HTTP_CLIENTS: LazyLock<Result<Vec<reqwest::Client>, String>> =
 
 #[cfg(not(target_arch = "wasm32"))]
 fn build_client_pool() -> Result<Vec<reqwest::Client>> {
-    // Build a single rustls config so all pools share the TLS session cache.
-    // When pool 1 establishes a TLS session to IP X, pool 2 reuses the
-    // cached session ticket for a 1-RTT resumption instead of a 2-RTT
-    // full handshake. This makes the "new connection per pool" cost
-    // much lower.
-    let tls_config = build_shared_rustls_config()?;
+    // Share a single rustls config across all pools so TLS session
+    // tickets are reused: pool 2 connecting to an IP that pool 1
+    // already negotiated gets 1-RTT resumption instead of 2-RTT.
+    let shared_tls = build_shared_rustls_config()?;
     (0..CLIENT_POOL_SIZE)
         .map(|_| {
             client_builder()?
-                .use_preconfigured_tls(tls_config.clone())
+                .use_preconfigured_tls(shared_tls.clone())
                 .build()
                 .context("Failed to build reqwest client")
         })
@@ -124,9 +122,6 @@ fn build_client_pool() -> Result<Vec<reqwest::Client>> {
         .collect()
 }
 
-/// Shared rustls config for all client pools. Cloning a ClientConfig
-/// shares the Arc<dyn ClientSessionStore> session cache, so TLS
-/// session tickets are reused across pools.
 #[cfg(not(target_arch = "wasm32"))]
 fn build_shared_rustls_config() -> Result<rustls::ClientConfig> {
     #[cfg(target_os = "macos")]