From 9a1d5c78d19e3ddf0b0a8bb2c1cb67ed92a9c4be Mon Sep 17 00:00:00 2001 From: James Dumay Date: Sat, 18 Apr 2026 14:56:03 +1000 Subject: [PATCH 1/2] Use huggingface_hub_rust for MoE share uploads --- mesh-llm/src/cli/commands/moe/mod.rs | 203 ++++++++++++--------------- 1 file changed, 87 insertions(+), 116 deletions(-) diff --git a/mesh-llm/src/cli/commands/moe/mod.rs b/mesh-llm/src/cli/commands/moe/mod.rs index 364d36bc..efb46faf 100644 --- a/mesh-llm/src/cli/commands/moe/mod.rs +++ b/mesh-llm/src/cli/commands/moe/mod.rs @@ -4,10 +4,7 @@ mod formatters_json; mod hf_jobs; use anyhow::{bail, Context, Result}; -use base64::Engine as _; -use reqwest::StatusCode; -use serde::Deserialize; -use serde_json::json; +use hf_hub::RepoUploadFolderParams; use std::collections::BTreeMap; use std::fmt::Write as _; use std::fs; @@ -422,9 +419,15 @@ async fn run_share_resolved( })?; let log_path = log_path_for(&resolved.path, &ranking.analyzer_id); let bundle = moe_planner::build_submit_bundle(&resolved, &ranking, Some(log_path.as_path()))?; + models::hf_token_override().ok_or_else(|| { + share_error( + "Missing Hugging Face token", + "Set HF_TOKEN or HUGGING_FACE_HUB_TOKEN before running `mesh-llm moe share`.", + ) + })?; let api = models::build_hf_tokio_api(false).context("Build Hugging Face client for MoE share")?; - let (owner, name) = dataset_repo.split_once('/').unwrap_or(("", dataset_repo)); + let (owner, name) = parse_dataset_repo(dataset_repo)?; let dataset = api.dataset(owner, name); let info = dataset .info( @@ -437,14 +440,11 @@ async fn run_share_resolved( let hf_hub::RepoInfo::Dataset(info) = info else { anyhow::bail!("Expected dataset repo info for {}", dataset_repo); }; + let siblings = info.siblings.as_deref().unwrap_or(&[]); let existing = bundle .dataset_paths .iter() - .filter(|path| { - info.siblings - .as_ref() - .is_some_and(|siblings| siblings.iter().any(|entry| &entry.rfilename == *path)) - }) + .filter(|path| siblings.iter().any(|entry| &entry.rfilename == *path)) .cloned() .collect::>(); @@ -472,143 +472,108 @@ async fn run_share_resolved( SharePrefixState::New => {} } - let token = models::hf_token_override().ok_or_else(|| { - share_error( - "Missing Hugging Face token", - "Set HF_TOKEN or HUGGING_FACE_HUB_TOKEN before running `mesh-llm moe share`.", - ) - })?; + let temp_root = std::env::temp_dir().join(format!( + "mesh-llm-moe-share-{}-{}", + std::process::id(), + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + )); + fs::create_dir_all(&temp_root)?; + let _temp_root_guard = TempRootGuard(temp_root.clone()); - let mut operations = vec![ndjson_header( - &bundle.commit_message, - &bundle.commit_description, - )]; - operations.push(ndjson_file_op( + stage_share_file( + &temp_root, &format!("{}/ranking.csv", bundle.dataset_prefix), - &fs::read(&bundle.ranking_path) - .with_context(|| format!("Read {}", bundle.ranking_path.display()))?, - )); - operations.push(ndjson_file_op( + &bundle.ranking_path, + )?; + stage_share_text( + &temp_root, &format!("{}/metadata.json", bundle.dataset_prefix), - bundle.metadata_content.as_bytes(), - )); - operations.push(ndjson_file_op( + &bundle.metadata_content, + )?; + stage_share_text( + &temp_root, &format!("{}/analysis.json", bundle.dataset_prefix), - bundle.analysis_content.as_bytes(), - )); + &bundle.analysis_content, + )?; if let Some(log_path) = bundle.log_path.as_ref() { - operations.push(ndjson_file_op( + stage_share_file( + &temp_root, &format!("{}/run.log", bundle.dataset_prefix), - &fs::read(log_path).with_context(|| format!("Read {}", log_path.display()))?, - )); + log_path, + )?; } - let endpoint = std::env::var("HF_ENDPOINT") - .ok() - .filter(|value| !value.trim().is_empty()) - .unwrap_or_else(|| "https://huggingface.co".to_string()); - let commit_url = format!( - "{}/api/datasets/{}/commit/main", - endpoint.trim_end_matches('/'), - dataset_repo - ); - let body = operations - .into_iter() - .map(|value| serde_json::to_string(&value)) - .collect::, _>>()? - .join("\n") - + "\n"; - println!("⬆️ Opening contribution PR..."); - let response = reqwest::Client::new() - .post(&commit_url) - .bearer_auth(token) - .query(&[("create_pr", "1")]) - .header("Content-Type", "application/x-ndjson") - .body(body) - .send() + let commit = dataset + .upload_folder( + &RepoUploadFolderParams::builder() + .folder_path(temp_root.clone()) + .revision("main".to_string()) + .commit_message(bundle.commit_message.clone()) + .commit_description(bundle.commit_description.clone()) + .create_pr(true) + .build(), + ) .await .map_err(|err| { share_error( - "Dataset contribution request failed", - &format!("POST {}: {}", commit_url, err), + "Dataset contribution failed", + &format!("Upload staged files to {}: {}", dataset_repo, err), ) })?; - if response.status() != StatusCode::OK { - let status = response.status(); - let body = response.text().await.unwrap_or_default(); - return Err(share_error( - "Dataset contribution failed", - &format!("{}: {}", status, body.trim()), - )); - } - let commit: HfCommitResponse = response.json().await.map_err(|err| { - share_error( - "Could not decode Hugging Face response", - &format!("{}", err), - ) - })?; println!("✅ Opened MoE dataset contribution"); - println!(" commit: {}", commit.commit_oid); - println!(" url: {}", commit.commit_url); - if let Some(pr_url) = commit.pull_request_url.as_deref() { + if let Some(commit_oid) = commit.commit_oid.as_deref() { + println!(" commit: {commit_oid}"); + } + if let Some(commit_url) = commit.commit_url.as_deref() { + println!(" url: {commit_url}"); + } + if let Some(pr_url) = commit.pr_url.as_deref() { println!(" pr: {}", pr_url); } Ok(()) } -#[derive(Deserialize)] -struct HfCommitResponse { - #[serde(rename = "commitOid")] - commit_oid: String, - #[serde(rename = "commitUrl")] - commit_url: String, - #[serde(rename = "pullRequestUrl")] - pull_request_url: Option, +fn parse_dataset_repo(dataset_repo: &str) -> Result<(&str, &str)> { + dataset_repo.split_once('/').ok_or_else(|| { + anyhow::anyhow!("Dataset repo must look like `owner/name`, got {dataset_repo}") + }) } -fn ndjson_header(summary: &str, description: &str) -> serde_json::Value { - json!({ - "key": "header", - "value": { - "summary": summary, - "description": description, - } - }) +fn stage_share_text(temp_root: &Path, relative_path: &str, content: &str) -> Result<()> { + let target = temp_root.join(relative_path); + if let Some(parent) = target.parent() { + fs::create_dir_all(parent)?; + } + fs::write(target, content).with_context(|| format!("Write staged {}", relative_path))?; + Ok(()) } -fn ndjson_file_op(path_in_repo: &str, content: &[u8]) -> serde_json::Value { - json!({ - "key": "file", - "value": { - "content": base64::engine::general_purpose::STANDARD.encode(content), - "path": path_in_repo, - "encoding": "base64", +fn stage_share_file(temp_root: &Path, relative_path: &str, source: &Path) -> Result<()> { + let target = temp_root.join(relative_path); + if let Some(parent) = target.parent() { + fs::create_dir_all(parent)?; + } + if target.exists() { + fs::remove_file(&target).with_context(|| format!("Remove staged {}", target.display()))?; + } + match fs::hard_link(source, &target) { + Ok(()) => Ok(()), + Err(_) => { + fs::copy(source, &target) + .with_context(|| format!("Copy {} to {}", source.display(), target.display()))?; + Ok(()) } - }) + } } #[cfg(test)] mod tests { use super::*; - #[test] - fn ndjson_header_uses_expected_shape() { - let value = ndjson_header("summary", "description"); - assert_eq!(value["key"], "header"); - assert_eq!(value["value"]["summary"], "summary"); - assert_eq!(value["value"]["description"], "description"); - } - - #[test] - fn ndjson_file_op_uses_base64_payload() { - let value = ndjson_file_op("path/in/repo.txt", b"hello"); - assert_eq!(value["key"], "file"); - assert_eq!(value["value"]["path"], "path/in/repo.txt"); - assert_eq!(value["value"]["encoding"], "base64"); - assert_eq!(value["value"]["content"], "aGVsbG8="); - } - #[test] fn classify_share_prefix_distinguishes_new_existing_and_partial() { let all = vec![ @@ -627,6 +592,12 @@ mod tests { SharePrefixState::PartiallyPopulated(vec!["a/ranking.csv".to_string()]) ); } + + #[test] + fn parse_dataset_repo_requires_owner_and_name() { + assert!(parse_dataset_repo("meshllm/moe-rankings").is_ok()); + assert!(parse_dataset_repo("invalid").is_err()); + } } fn resolve_analyze_binary() -> Result { From d54099c6777b0513226875fe2864783970b2172a Mon Sep 17 00:00:00 2001 From: James Dumay Date: Sat, 18 Apr 2026 15:17:21 +1000 Subject: [PATCH 2/2] Show progress for MoE share uploads --- mesh-llm/src/cli/commands/moe/mod.rs | 236 ++++++++++++++++++++++++++- 1 file changed, 233 insertions(+), 3 deletions(-) diff --git a/mesh-llm/src/cli/commands/moe/mod.rs b/mesh-llm/src/cli/commands/moe/mod.rs index efb46faf..70a3bcf9 100644 --- a/mesh-llm/src/cli/commands/moe/mod.rs +++ b/mesh-llm/src/cli/commands/moe/mod.rs @@ -4,16 +4,20 @@ mod formatters_json; mod hf_jobs; use anyhow::{bail, Context, Result}; -use hf_hub::RepoUploadFolderParams; +use hf_hub::{ + Progress, ProgressEvent, ProgressHandler, RepoUploadFolderParams, UploadEvent, UploadPhase, +}; use std::collections::BTreeMap; use std::fmt::Write as _; use std::fs; +use std::io::Write as _; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::sync::{Arc, Mutex}; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; use crate::cli::moe::{HfJobArgs, MoeAnalyzeCommand, MoeCommand}; -use crate::cli::terminal_progress::start_spinner; +use crate::cli::terminal_progress::{clear_stderr_line, start_spinner, SpinnerHandle}; use crate::cli::Cli; use crate::inference::moe; use crate::models; @@ -40,6 +44,210 @@ impl Drop for TempRootGuard { } } +struct MeshUploadProgressState { + total_files: usize, + completed_files: usize, + total_bytes: u64, + bytes_completed: u64, + transfer_bytes: u64, + transfer_bytes_completed: u64, + transfer_bytes_per_sec: Option, + phase: Option, + last_draw: Option, +} + +struct MeshUploadProgress { + spinner: Mutex>, + state: Mutex, +} + +impl MeshUploadProgress { + fn new() -> Self { + Self { + spinner: Mutex::new(Some(start_spinner("Preparing upload"))), + state: Mutex::new(MeshUploadProgressState { + total_files: 0, + completed_files: 0, + total_bytes: 0, + bytes_completed: 0, + transfer_bytes: 0, + transfer_bytes_completed: 0, + transfer_bytes_per_sec: None, + phase: None, + last_draw: None, + }), + } + } + + fn phase_label(phase: Option<&UploadPhase>) -> &'static str { + match phase { + Some(UploadPhase::Preparing) => "Preparing upload", + Some(UploadPhase::CheckingUploadMode) => "Checking upload mode", + Some(UploadPhase::Uploading) => "Uploading files", + Some(UploadPhase::Committing) => "Creating commit", + None => "Uploading", + } + } + + fn should_draw(state: &MeshUploadProgressState) -> bool { + state.transfer_bytes_completed > 0 + || state.transfer_bytes > 0 + || state.bytes_completed > 0 + || state.completed_files > 0 + || matches!( + state.phase, + Some(UploadPhase::Uploading) | Some(UploadPhase::Committing) + ) + } + + fn draw(state: &mut MeshUploadProgressState, force: bool) { + let now = Instant::now(); + if !force + && state.last_draw.is_some_and(|last| { + now.duration_since(last) < std::time::Duration::from_millis(150) + }) + { + return; + } + state.last_draw = Some(now); + + let (completed, total) = if state.transfer_bytes > 0 { + (state.transfer_bytes_completed, state.transfer_bytes) + } else { + (state.bytes_completed, state.total_bytes) + }; + let percent = if total > 0 { + ((completed as f64 / total as f64) * 1000.0).round() as usize + } else if state.total_files > 0 { + ((state.completed_files as f64 / state.total_files as f64) * 1000.0).round() as usize + } else { + 0 + }; + let percent_major = (percent.min(1000)) / 10; + let percent_minor = (percent.min(1000)) % 10; + let file_suffix = if state.total_files > 0 { + format!( + ", {}/{} files", + state.completed_files.min(state.total_files), + state.total_files + ) + } else { + String::new() + }; + let speed_suffix = state + .transfer_bytes_per_sec + .filter(|bytes_per_sec| *bytes_per_sec > 0.0) + .map(|bytes_per_sec| format!(" at {}/s", format_upload_bytes(bytes_per_sec as u64))) + .unwrap_or_default(); + let byte_suffix = if total > 0 { + format!( + " ({}/{})", + format_upload_bytes(completed), + format_upload_bytes(total) + ) + } else { + String::new() + }; + eprint!( + "\r\x1b[K ⏫ {} {:>3}.{:01}%{}{}{}", + Self::phase_label(state.phase.as_ref()), + percent_major, + percent_minor, + byte_suffix, + file_suffix, + speed_suffix, + ); + let _ = std::io::stderr().flush(); + if force { + eprintln!(); + } + } + + fn update(state: &mut MeshUploadProgressState, event: &UploadEvent) { + match event { + UploadEvent::Start { + total_files, + total_bytes, + } => { + state.total_files = *total_files; + state.total_bytes = *total_bytes; + } + UploadEvent::Progress { + phase, + bytes_completed, + total_bytes, + transfer_bytes_completed, + transfer_bytes, + transfer_bytes_per_sec, + .. + } => { + state.phase = Some(phase.clone()); + state.bytes_completed = state.bytes_completed.max(*bytes_completed); + state.total_bytes = state.total_bytes.max(*total_bytes); + state.transfer_bytes_completed = state + .transfer_bytes_completed + .max(*transfer_bytes_completed); + state.transfer_bytes = state.transfer_bytes.max(*transfer_bytes); + state.transfer_bytes_per_sec = *transfer_bytes_per_sec; + } + UploadEvent::FileComplete { files, phase } => { + state.phase = Some(phase.clone()); + state.completed_files = + (state.completed_files + files.len()).min(state.total_files.max(files.len())); + } + UploadEvent::Complete => { + state.phase = Some(UploadPhase::Committing); + if state.transfer_bytes > 0 { + state.transfer_bytes_completed = state.transfer_bytes; + } + if state.total_bytes > 0 { + state.bytes_completed = state.total_bytes; + } + if state.total_files > 0 { + state.completed_files = state.total_files; + } + state.transfer_bytes_per_sec = None; + } + } + } +} + +impl ProgressHandler for MeshUploadProgress { + fn on_progress(&self, event: &ProgressEvent) { + let ProgressEvent::Upload(event) = event else { + return; + }; + let Ok(mut state) = self.state.lock() else { + return; + }; + Self::update(&mut state, event); + let should_draw = Self::should_draw(&state); + let force = matches!(event, UploadEvent::Complete) && should_draw; + if should_draw { + if let Ok(mut spinner) = self.spinner.lock() { + spinner.take(); + } + Self::draw(&mut state, force); + } else if let Ok(mut spinner) = self.spinner.lock() { + if let Some(spinner) = spinner.as_ref() { + spinner.set_message(Self::phase_label(state.phase.as_ref())); + } + if matches!(event, UploadEvent::Complete) { + spinner.take(); + let _ = clear_stderr_line(); + } + } + } +} + +impl Drop for MeshUploadProgress { + fn drop(&mut self) { + if let Ok(mut spinner) = self.spinner.lock() { + spinner.take(); + } + } +} + pub(crate) async fn dispatch_moe_command(command: &MoeCommand, cli: &Cli) -> Result<()> { match command { MoeCommand::Plan { @@ -506,6 +714,8 @@ async fn run_share_resolved( )?; } + let progress_tracker = Arc::new(MeshUploadProgress::new()); + let progress_handler: Progress = Some(progress_tracker); println!("⬆️ Opening contribution PR..."); let commit = dataset .upload_folder( @@ -515,6 +725,7 @@ async fn run_share_resolved( .commit_message(bundle.commit_message.clone()) .commit_description(bundle.commit_description.clone()) .create_pr(true) + .progress(progress_handler) .build(), ) .await @@ -570,6 +781,18 @@ fn stage_share_file(temp_root: &Path, relative_path: &str, source: &Path) -> Res } } +fn format_upload_bytes(bytes: u64) -> String { + if bytes >= 1_000_000_000 { + format!("{:.1}GB", bytes as f64 / 1e9) + } else if bytes >= 1_000_000 { + format!("{:.0}MB", bytes as f64 / 1e6) + } else if bytes >= 1_000 { + format!("{:.0}KB", bytes as f64 / 1e3) + } else { + format!("{bytes}B") + } +} + #[cfg(test)] mod tests { use super::*; @@ -598,6 +821,13 @@ mod tests { assert!(parse_dataset_repo("meshllm/moe-rankings").is_ok()); assert!(parse_dataset_repo("invalid").is_err()); } + + #[test] + fn format_upload_bytes_uses_human_units() { + assert_eq!(format_upload_bytes(999), "999B"); + assert_eq!(format_upload_bytes(1_000), "1KB"); + assert_eq!(format_upload_bytes(1_000_000), "1MB"); + } } fn resolve_analyze_binary() -> Result {