Closed

Changes from 1 commit (of 32 commits)

Commits
2bf7115
perf(pm): bump manifests-concurrency-limit 64 → 256 + add fetch break…
elrrrrrrr May 8, 2026
8ac97ae
chore(p1): revert concurrency 256 → 64 + restore manifest-bench
elrrrrrrr May 8, 2026
5690a9b
ci(p1): wire manifest-bench standalone HTTP sweep into bench-phases-l…
elrrrrrrr May 8, 2026
94af458
perf(ruborist): inline JSON parse, drop rayon::spawn dispatch
elrrrrrrr May 8, 2026
ee5f5f4
perf(ruborist): switch JSON parse to tokio spawn_blocking
elrrrrrrr May 8, 2026
16404fc
perf(ruborist): switch extract_core_version to spawn_blocking too
elrrrrrrr May 8, 2026
460a538
revert + instrument(ruborist): post-build phase timing
elrrrrrrr May 8, 2026
58d49aa
instrument(ruborist): preload main loop dispatch + result split
elrrrrrrr May 8, 2026
8114bf4
perf(pm): grow rayon pool to max(num_cpus, 8) to drain p1 extract queue
elrrrrrrr May 8, 2026
394f6c9
perf(pm): skip preload for p1 path; BFS does per-level parallel prefetch
elrrrrrrr May 8, 2026
596cd20
perf(pm): fast_preload bypasses UnifiedRegistry for utoo deps path
elrrrrrrr May 8, 2026
2e74bba
perf(pm): dispatch fast_preload settle to rayon to free tokio runtime
elrrrrrrr May 8, 2026
04c9ec3
perf(pm): bump manifests-concurrency-limit 64 → 96 (manifest-bench best)
elrrrrrrr May 8, 2026
6455852
perf(pm): fast_preload populates (name, spec) cache slot for BFS fast…
elrrrrrrr May 8, 2026
4bbcae8
perf(pm): fuse primary settle into fetch task to drop dispatch RTT
elrrrrrrr May 8, 2026
671ac98
perf(pm): combined-parse fetch path eliminates per-fetch double simd_…
elrrrrrrr May 8, 2026
542d7f1
perf(pm): bump manifests-concurrency-limit 96 → 128
elrrrrrrr May 8, 2026
c8768ac
revert(pm): manifests-concurrency-limit back to 96
elrrrrrrr May 8, 2026
3be7487
perf(pm): mb_resolve experimental fetch path (parallel track to fast_…
elrrrrrrr May 9, 2026
02cc12e
perf(pm): mb_resolve v3 — two-phase pure HTTP + rayon batch parse
elrrrrrrr May 9, 2026
24165fb
fix(pm): mb_resolve v3 — restore spec-level dedup to terminate
elrrrrrrr May 9, 2026
41822b0
perf(pm): preload-bench — self-contained streaming preload baseline
elrrrrrrr May 9, 2026
01d1513
perf(pm): integrate standalone preload into ruborist for lockfile-onl…
elrrrrrrr May 9, 2026
d9fb207
perf(pm): aws-lc-rs TLS for mb_resolve + per-stage breakdown for BFS/…
elrrrrrrr May 9, 2026
c02bb15
perf(pm): fold preload + BFS into mb_fetch_with_graph for utoo deps
elrrrrrrr May 9, 2026
63928a7
fix(pm): mb_fetch_with_graph — drain edge_targets via inline cache hit
elrrrrrrr May 9, 2026
2527137
ci(pcap): add manifest-bench + preload-bench captures for TCP-level diff
elrrrrrrr May 9, 2026
fe26709
ci(pcap): upload small summaries artifact alongside the 2GB pcap dump
elrrrrrrr May 9, 2026
1ff58ae
ci(pcap): upload summary-only artifact + print table to CI logs
elrrrrrrr May 9, 2026
a21f24b
perf(pm): mb_fetch_with_graph — channel-based separation of fetch + g…
elrrrrrrr May 9, 2026
d1cf53e
perf(pm): integrate channel-based mb_fetch into install pipeline
elrrrrrrr May 9, 2026
21d9c7d
fix(pm): mb_fetch_with_graph — normalize npm: alias specs before fetch
elrrrrrrr May 9, 2026
10 changes: 8 additions & 2 deletions crates/pm/src/util/user_config.rs
@@ -132,9 +132,15 @@ pub fn get_install_scope() -> InstallScope {
     INSTALL_SCOPE.get().copied().unwrap_or_default()
 }
 
-// Manifest fetch concurrency configuration
+// Manifest fetch concurrency configuration.
+//
+// 256 to match bun's observed ~260 parallel TCP streams against
+// registry.npmjs.org. Local fetch-breakdown instrumentation showed
+// 88% of preload-phase work is in per-request RTT (TCP+TLS+server),
+// only 11% body, 0.16% parse — so the dominant lever for p1 wall is
+// the cap on concurrent in-flight manifest requests.
 static MANIFESTS_CONCURRENCY_LIMIT: LazyLock<ConfigValue<usize>> =
-    LazyLock::new(|| ConfigValue::new("manifests-concurrency-limit", 64));
+    LazyLock::new(|| ConfigValue::new("manifests-concurrency-limit", 256));
 
 pub fn set_manifests_concurrency_limit(value: Option<usize>) {
     MANIFESTS_CONCURRENCY_LIMIT.set(value);
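The hunk above only raises the default; where the limit actually gates requests is not part of this diff. As a rough illustration of why this cap is the dominant lever when fetches are RTT-bound, here is a minimal sketch (an assumed pattern, not the crate's actual fetch loop) using a tokio::sync::Semaphore to bound in-flight manifest requests:

use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-in for the real manifest fetch; only the gating pattern matters here.
async fn fetch_manifest(name: &str) -> String {
    format!("manifest for {name}")
}

async fn fetch_all(names: Vec<String>, limit: usize) -> Vec<String> {
    // One permit per allowed in-flight request.
    let gate = Arc::new(Semaphore::new(limit));
    let mut tasks = Vec::with_capacity(names.len());
    for name in names {
        let gate = Arc::clone(&gate);
        tasks.push(tokio::spawn(async move {
            // Waits until one of the `limit` slots frees up, so at most
            // `limit` request RTTs ever overlap.
            let _permit = gate.acquire_owned().await.expect("semaphore closed");
            fetch_manifest(&name).await
        }));
    }
    let mut results = Vec::with_capacity(tasks.len());
    for task in tasks {
        results.push(task.await.expect("fetch task panicked"));
    }
    results
}

Under this pattern, n RTT-dominated fetches take roughly ceil(n / limit) RTTs of wall time, which matches the comment's argument: if 88% of the work is per-request RTT, raising the cap from 64 toward bun's ~260 streams shortens the preload wall almost linearly until connections or bandwidth saturate.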
17 changes: 15 additions & 2 deletions crates/ruborist/src/resolver/builder.rs
@@ -756,6 +756,7 @@ async fn run_preload_phase<R: RegistryClient, E: EventReceiver>(
         return;
     }
 
+    crate::util::FETCH_TIMINGS.reset();
Contributor review comment (severity: medium):

The FETCH_TIMINGS reset is currently performed inside run_preload_phase, but it is skipped if config.skip_preload is true (line 755). In a long-running process or library context where multiple resolutions occur, this leads to stale timing data from previous runs being reported in the BFS phase logs. It is recommended to move the reset to the start of the entry point build_deps_with_config to ensure each resolution starts with a clean state.

     let start = tokio::time::Instant::now();
 
     let initial_deps = gather_preload_deps(graph, config.peer_deps);
@@ -794,7 +795,13 @@
             failed: stats.failed_count,
         });
 
-    tracing::debug!("Preload phase: {:?}", start.elapsed());
+    let preload_elapsed = start.elapsed();
+    tracing::debug!("Preload phase: {:?}", preload_elapsed);
+    tracing::info!(
+        "p1-breakdown preload_wall={}ms | {}",
+        preload_elapsed.as_millis(),
+        crate::util::FETCH_TIMINGS.snapshot().summary_line(),
+    );
 }
 
 /// Run the BFS traversal phase to build the dependency tree.
@@ -896,7 +903,13 @@ async fn run_bfs_phase<R: RegistryClient, E: EventReceiver>(
         current_level = next_level;
     }
 
-    tracing::debug!("Build phase: {:?}", start.elapsed());
+    let bfs_elapsed = start.elapsed();
+    tracing::debug!("Build phase: {:?}", bfs_elapsed);
+    tracing::info!(
+        "p1-breakdown bfs_wall={}ms | {}",
+        bfs_elapsed.as_millis(),
+        crate::util::FETCH_TIMINGS.snapshot().summary_line(),
+    );
     Ok(())
 }
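A minimal sketch of the reviewer's suggestion above; build_deps_with_config is named in the comment, but its real signature is not shown in this diff, so the shape below is assumed:

// Hypothetical shape; the real entry point takes the resolver's
// graph/config arguments, elided here.
async fn build_deps_with_config() {
    // Resetting once at the entry point keeps every resolution's
    // counters clean, even when `skip_preload` makes
    // `run_preload_phase` return before reaching its own reset.
    crate::util::FETCH_TIMINGS.reset();

    // run_preload_phase(...).await;
    // run_bfs_phase(...).await;
}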
24 changes: 22 additions & 2 deletions crates/ruborist/src/service/manifest.rs
@@ -12,6 +12,7 @@ use super::fetch::{
 };
 use super::http::get_client;
 use crate::model::manifest::{CoreVersionManifest, FullManifest};
+use crate::util::FETCH_TIMINGS;
 
 /// Parse JSON bytes on rayon's CPU thread pool (native) or inline
 /// (wasm32). Keeps the tokio runtime free of `simd_json` work so other
@@ -91,7 +92,9 @@ pub async fn fetch_full_manifest(opts: FetchManifestOptions<'_>) -> Result<Fetch
         request = request.header("If-None-Match", etag_value);
     }
 
+    let t_request_start = std::time::Instant::now();
     let response = request.send().await.map_err(classify_reqwest_error)?;
+    let request_us = t_request_start.elapsed().as_micros() as u64;
     let status = response.status();
 
     if status == reqwest::StatusCode::NOT_MODIFIED {
@@ -109,19 +112,25 @@
             .and_then(|v| v.to_str().ok())
             .map(|s| s.to_string());
 
+        let t_body_start = std::time::Instant::now();
         let raw_bytes = response
             .bytes()
             .await
             .map_err(|e| FetchError::Permanent(anyhow!("Response read error: {e}")))?
             .to_vec();
+        let body_us = t_body_start.elapsed().as_micros() as u64;
+        let bytes_len = raw_bytes.len() as u64;
         // simd_json mutates the parse buffer; clone so the raw
         // bytes survive for `manifest.raw`.
         let parse_buf = raw_bytes.clone();
+        let t_parse_start = std::time::Instant::now();
         let mut manifest: FullManifest = parse_json_off_runtime(parse_buf)
             .await
             .map_err(FetchError::Permanent)?;
+        let parse_us = t_parse_start.elapsed().as_micros() as u64;
         manifest.raw = std::sync::Arc::from(raw_bytes);
 
+        FETCH_TIMINGS.record(request_us, body_us, parse_us, bytes_len);
         Ok(FetchManifestResult::Ok(manifest, new_etag))
     } else {
         Err(classify_status(status, &url))
@@ -190,23 +199,34 @@ pub async fn fetch_version_manifest(
         || {
             let url = url.clone();
             async move {
+                let t_request_start = std::time::Instant::now();
                 let response = get_client()
                     .map_err(FetchError::Permanent)?
                     .get(&url)
                     .header("Accept", accept)
                     .send()
                     .await
                     .map_err(classify_reqwest_error)?;
+                let request_us = t_request_start.elapsed().as_micros() as u64;
 
                 if response.status().is_success() {
+                    let t_body_start = std::time::Instant::now();
                     let bytes = response
                         .bytes()
                         .await
                         .map_err(|e| FetchError::Permanent(anyhow!("Response read error: {e}")))?
                         .to_vec();
-                    parse_json_off_runtime::<CoreVersionManifest>(bytes)
+                    let body_us = t_body_start.elapsed().as_micros() as u64;
+                    let bytes_len = bytes.len() as u64;
+                    let t_parse_start = std::time::Instant::now();
+                    let parsed = parse_json_off_runtime::<CoreVersionManifest>(bytes)
                         .await
-                        .map_err(FetchError::Permanent)
+                        .map_err(FetchError::Permanent);
+                    let parse_us = t_parse_start.elapsed().as_micros() as u64;
+                    if parsed.is_ok() {
+                        FETCH_TIMINGS.record(request_us, body_us, parse_us, bytes_len);
+                    }
+                    parsed
                 } else {
                     Err(classify_status(response.status(), &url))
                 }
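The same three-way split can be reproduced outside this crate with plain reqwest and serde_json; a self-contained sketch (assumed dependencies, not the PR's own fetch helpers):

use std::time::Instant;

// request_us ends when `reqwest::get` resolves (headers received);
// body_us covers buffering the full body; parse_us covers the JSON parse.
async fn timed_fetch(url: &str) -> Result<serde_json::Value, Box<dyn std::error::Error>> {
    let t_request = Instant::now();
    let response = reqwest::get(url).await?;
    let request_us = t_request.elapsed().as_micros() as u64;

    let t_body = Instant::now();
    let bytes = response.bytes().await?;
    let body_us = t_body.elapsed().as_micros() as u64;

    let t_parse = Instant::now();
    let value: serde_json::Value = serde_json::from_slice(&bytes)?;
    let parse_us = t_parse.elapsed().as_micros() as u64;

    println!(
        "request={request_us}us body={body_us}us parse={parse_us}us bytes={}",
        bytes.len()
    );
    Ok(value)
}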
2 changes: 2 additions & 0 deletions crates/ruborist/src/util/mod.rs
@@ -1,6 +1,8 @@
 //! Shared utility primitives for ruborist and downstream consumers.
 
 pub mod oncemap;
+pub mod timing;
 
 pub use crate::model::util::{PackageNameStr, parse_package_spec, read_package_json};
 pub use oncemap::OnceMap;
+pub use timing::{FETCH_TIMINGS, FetchTimings, FetchTimingsSnapshot};
134 changes: 134 additions & 0 deletions crates/ruborist/src/util/timing.rs
@@ -0,0 +1,134 @@
//! Per-phase manifest fetch timing accumulator for p1 perf investigation.
//!
//! Splits each `fetch_*_manifest` call into three observable pieces:
//! - `request_us`: from `request.send().await` to response headers
//! received. Captures TCP connect (when not pooled), TLS handshake,
//! HTTP request roundtrip, and server-side processing.
//! - `body_us`: from response headers to the entire JSON body buffered.
//! Network-bandwidth bound for large packuments.
//! - `parse_us`: from full body buffered to a typed manifest. CPU bound
//! (simd_json on a spawn_blocking thread).
//!
//! `parse_us` is wall-clock for the await on `parse_json_off_runtime` —
//! since JSON parse runs on `spawn_blocking`, this includes scheduling
//! latency rather than pure CPU time. Together with the per-fetch total
//! already tracked in `preload_manifests`, this lets us answer "where
//! did p1's wall time go?" without external profiling.
//!
//! All counters are `AtomicU64` so the recording path is lock-free.
//! Numbers are reset between resolves via [`reset()`] so successive
//! `utoo deps` invocations report independently.

use std::sync::atomic::{AtomicU64, Ordering};

/// Per-process accumulator for manifest fetch timings.
#[derive(Default, Debug)]
pub struct FetchTimings {
/// Number of fetches recorded (full + version manifest).
pub count: AtomicU64,
/// Sum of microseconds spent in `request.send().await`.
pub request_us: AtomicU64,
/// Sum of microseconds spent in `response.bytes().await`.
pub body_us: AtomicU64,
/// Sum of microseconds spent awaiting `parse_json_off_runtime`.
pub parse_us: AtomicU64,
/// Sum of body bytes received across all fetches.
pub bytes: AtomicU64,
}

impl FetchTimings {
/// Record one fetch's split timings. Call once per successful fetch.
pub fn record(&self, request_us: u64, body_us: u64, parse_us: u64, bytes: u64) {
self.count.fetch_add(1, Ordering::Relaxed);
self.request_us.fetch_add(request_us, Ordering::Relaxed);
self.body_us.fetch_add(body_us, Ordering::Relaxed);
self.parse_us.fetch_add(parse_us, Ordering::Relaxed);
self.bytes.fetch_add(bytes, Ordering::Relaxed);
}

/// Reset all counters to zero.
pub fn reset(&self) {
self.count.store(0, Ordering::Relaxed);
self.request_us.store(0, Ordering::Relaxed);
self.body_us.store(0, Ordering::Relaxed);
self.parse_us.store(0, Ordering::Relaxed);
self.bytes.store(0, Ordering::Relaxed);
}

/// Snapshot of the current accumulator state.
pub fn snapshot(&self) -> FetchTimingsSnapshot {
FetchTimingsSnapshot {
count: self.count.load(Ordering::Relaxed),
request_us: self.request_us.load(Ordering::Relaxed),
body_us: self.body_us.load(Ordering::Relaxed),
parse_us: self.parse_us.load(Ordering::Relaxed),
bytes: self.bytes.load(Ordering::Relaxed),
}
}
}

/// Immutable snapshot suitable for printing.
#[derive(Debug, Clone, Copy)]
pub struct FetchTimingsSnapshot {
pub count: u64,
pub request_us: u64,
pub body_us: u64,
pub parse_us: u64,
pub bytes: u64,
}

impl FetchTimingsSnapshot {
/// One-line summary for tracing logs.
pub fn summary_line(&self) -> String {
if self.count == 0 {
return "fetch-timings: no requests recorded".to_string();
}
let count = self.count;
let avg_req = self.request_us / count;
let avg_body = self.body_us / count;
let avg_parse = self.parse_us / count;
let avg_bytes = self.bytes / count;
format!(
"fetch-timings: n={} sum_request={}ms sum_body={}ms sum_parse={}ms total_bytes={}MB | avg_request={}us avg_body={}us avg_parse={}us avg_bytes={}KB",
count,
self.request_us / 1_000,
self.body_us / 1_000,
self.parse_us / 1_000,
self.bytes / 1_000_000,
avg_req,
avg_body,
avg_parse,
avg_bytes / 1_024,
)
}
}

/// Process-wide manifest fetch timing accumulator.
pub static FETCH_TIMINGS: FetchTimings = FetchTimings {
count: AtomicU64::new(0),
request_us: AtomicU64::new(0),
body_us: AtomicU64::new(0),
parse_us: AtomicU64::new(0),
bytes: AtomicU64::new(0),
};
Comment on lines +107 to +113

Contributor review comment (severity: medium):

Using a global static FETCH_TIMINGS for accumulating fetch metrics makes the library non-thread-safe for concurrent dependency resolutions. If multiple resolutions are executed in parallel within the same process, their metrics will be interleaved and the reset() calls will interfere with each other. For a library like ruborist, it would be better to encapsulate these metrics within a context object (like BuildDepsConfig) or pass them explicitly through the call stack to support concurrent usage.
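A sketch of the reviewer's alternative: per-resolution timings owned by a context object instead of the process-wide static. The name ResolveContext is hypothetical, not from the PR:

use std::sync::Arc;
use crate::util::FetchTimings;

// Each resolution owns its own accumulator; fetch tasks clone the Arc
// and call `record(...)` on it, and the phase logger reads
// `ctx.timings.snapshot().summary_line()`.
struct ResolveContext {
    timings: Arc<FetchTimings>,
}

impl ResolveContext {
    fn new() -> Self {
        // `FetchTimings` derives Default, so a fresh zeroed accumulator
        // needs no dedicated constructor.
        Self { timings: Arc::new(FetchTimings::default()) }
    }
}

With this shape, parallel resolves no longer interleave counters or clobber each other's reset().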


#[cfg(test)]
mod tests {
use super::*;

#[test]
fn record_and_snapshot() {
FETCH_TIMINGS.reset();
FETCH_TIMINGS.record(100, 200, 300, 1024);
FETCH_TIMINGS.record(150, 250, 350, 2048);
let snap = FETCH_TIMINGS.snapshot();
assert_eq!(snap.count, 2);
assert_eq!(snap.request_us, 250);
assert_eq!(snap.body_us, 450);
assert_eq!(snap.parse_us, 650);
assert_eq!(snap.bytes, 3072);
FETCH_TIMINGS.reset();
let snap2 = FETCH_TIMINGS.snapshot();
assert_eq!(snap2.count, 0);
}
}