diff --git a/crates/noxa-rag/src/chunker.rs b/crates/noxa-rag/src/chunker.rs
index 9aa191b..bea2b48 100644
--- a/crates/noxa-rag/src/chunker.rs
+++ b/crates/noxa-rag/src/chunker.rs
@@ -6,7 +6,7 @@ use crate::config::ChunkerConfig;
 use crate::types::Chunk;

 /// Count whitespace-separated words in a string.
-fn word_count(s: &str) -> usize {
+pub(crate) fn word_count(s: &str) -> usize {
     s.split_whitespace().count()
 }

@@ -42,14 +42,6 @@ fn extract_domain(url: &str) -> String {
         .unwrap_or_default()
 }

-/// Approximate token count — use the tokenizer when possible, fall back to word count.
-fn token_estimate(text: &str, tokenizer: &Tokenizer) -> usize {
-    tokenizer
-        .encode(text, false)
-        .map(|enc| enc.len())
-        .unwrap_or_else(|_| text.split_whitespace().count())
-}
-
 /// Chunk an `ExtractionResult` into a `Vec<Chunk>`.
 ///
 /// - Uses `content.markdown` if non-empty, otherwise `content.plain_text`.
@@ -57,6 +49,12 @@ fn token_estimate(text: &str, tokenizer: &Tokenizer) -> usize {
 /// - Uses `ChunkConfig::with_overlap()` for sliding-window overlap (built into text-splitter ≥0.25).
 /// - Filters chunks below `config.min_words`.
 /// - Caps output at `config.max_chunks_per_page`.
+///
+/// # Token estimate
+/// The `Chunk::token_estimate` field is populated with a word-count approximation.
+/// The tokenizer is still used exclusively by the splitter for accurate boundary placement
+/// via `ChunkConfig::with_sizer`; re-encoding every emitted chunk would halve throughput
+/// on the `spawn_blocking` hot path for no practical gain (the field is diagnostic only).
 pub fn chunk(
     result: &ExtractionResult,
     config: &ChunkerConfig,
@@ -112,7 +110,7 @@ pub fn chunk(
         .into_iter()
         .enumerate()
         .map(|(chunk_index, (char_offset, text))| {
-            let t_est = token_estimate(&text, tokenizer);
+            let t_est = word_count(&text);
             let section_header = nearest_heading(&headings, char_offset).map(|s| s.to_string());
             Chunk {
                 text,
@@ -132,6 +130,57 @@ pub fn chunk(
 mod tests {
     use super::*;

+    fn make_extraction_result(markdown: &str) -> ExtractionResult {
+        crate::pipeline::parse::make_text_result(
+            markdown.to_string(),
+            String::new(),
+            "https://example.com/test".to_string(),
+            None,
+            "test",
+            crate::chunker::word_count(markdown),
+        )
+    }
+
+    /// Build a minimal whitespace-pretokenized WordLevel tokenizer suitable for
+    /// unit tests. Every word becomes a distinct token (unk token used for anything
+    /// not in the small vocab), which is sufficient for splitter boundary logic.
+    fn make_test_tokenizer() -> Tokenizer {
+        // A minimal valid tokenizer JSON: WordLevel model with whitespace pre-tokenizer.
+        // Using from_str avoids the ahash::AHashMap type constraint on WordLevelBuilder::vocab
+        // and the TokenizerImpl→Tokenizer conversion from TokenizerBuilder.
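+        // The only structural requirements here are a model with an unk_token and
+        // the Whitespace pre_tokenizer; the vocab contents are irrelevant, since
+        // only word boundaries (not token ids) feed the splitter's sizing.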
+        let json = serde_json::json!({
+            "version": "1.0",
+            "truncation": null,
+            "padding": null,
+            "added_tokens": [],
+            "normalizer": null,
+            "pre_tokenizer": {
+                "type": "Whitespace"
+            },
+            "post_processor": null,
+            "decoder": null,
+            "model": {
+                "type": "WordLevel",
+                "vocab": {
+                    "[UNK]": 0,
+                    "the": 1,
+                    "and": 2,
+                    "of": 3,
+                    "a": 4,
+                    "to": 5,
+                    "in": 6,
+                    "is": 7,
+                    "it": 8,
+                    "that": 9
+                },
+                "unk_token": "[UNK]"
+            }
+        });
+        json.to_string()
+            .parse::<Tokenizer>()
+            .expect("test tokenizer from JSON")
+    }
+
     #[test]
     fn domain_extraction() {
         assert_eq!(
@@ -148,4 +197,56 @@ mod tests {
         assert_eq!(word_count(" "), 0);
         assert_eq!(word_count(""), 0);
     }
+
+    /// Verify that no double-tokenization occurs: token_estimate is populated with
+    /// word_count (not from the tokenizer), and chunks are produced for non-trivial input.
+    #[test]
+    fn chunk_token_estimate_uses_word_count_not_tokenizer() {
+        let tokenizer = make_test_tokenizer();
+        let config = crate::config::ChunkerConfig {
+            target_tokens: 50,
+            overlap_tokens: 0,
+            min_words: 1,
+            max_chunks_per_page: 200,
+        };
+
+        // A body of text long enough to produce at least one chunk.
+        let body = (0..200)
+            .map(|i| format!("word{i}"))
+            .collect::<Vec<_>>()
+            .join(" ");
+        let result = make_extraction_result(&body);
+        let chunks = chunk(&result, &config, &tokenizer);
+
+        assert!(
+            !chunks.is_empty(),
+            "expected at least one chunk for a 200-word body"
+        );
+
+        for c in &chunks {
+            // token_estimate must be populated (> 0 for any non-empty chunk).
+            assert!(
+                c.token_estimate > 0,
+                "token_estimate must be > 0, got {}",
+                c.token_estimate
+            );
+            // token_estimate == word_count of the chunk text (the word_count approximation).
+            let expected = word_count(&c.text);
+            assert_eq!(
+                c.token_estimate, expected,
+                "token_estimate should equal word_count for chunk at index {}",
+                c.chunk_index
+            );
+        }
+    }
+
+    /// Confirm empty content returns no chunks.
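+    /// `make_extraction_result("")` leaves both markdown and plain_text empty, so
+    /// the chunker has no source text to split and must return an empty Vec.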
+    #[test]
+    fn chunk_empty_content_returns_empty() {
+        let tokenizer = make_test_tokenizer();
+        let config = crate::config::ChunkerConfig::default();
+        let result = make_extraction_result("");
+        let chunks = chunk(&result, &config, &tokenizer);
+        assert!(chunks.is_empty());
+    }
 }
diff --git a/crates/noxa-rag/src/embed/tei.rs b/crates/noxa-rag/src/embed/tei.rs
index 7fd9d30..c6b2d2c 100644
--- a/crates/noxa-rag/src/embed/tei.rs
+++ b/crates/noxa-rag/src/embed/tei.rs
@@ -236,17 +236,28 @@ impl TeiProvider {
         }

         if should_retry(status_u16, attempt) {
-            let body = resp.text().await.unwrap_or_default();
-            let preview: String = body.chars().take(512).collect();
-            tracing::warn!(
-                batch = batch_idx + 1,
-                attempt = attempt + 1,
-                max_attempts = MAX_RETRIES + 1,
-                status = status_u16,
-                delay_ms,
-                body = preview,
-                "TEI retry"
-            );
+            if self.auth_token.is_some() {
+                tracing::warn!(
+                    batch = batch_idx + 1,
+                    attempt = attempt + 1,
+                    max_attempts = MAX_RETRIES + 1,
+                    status = status_u16,
+                    delay_ms,
+                    "TEI retry (body omitted: auth token configured)"
+                );
+            } else {
+                let body = resp.text().await.unwrap_or_default();
+                let preview: String = body.chars().take(512).collect();
+                tracing::warn!(
+                    batch = batch_idx + 1,
+                    attempt = attempt + 1,
+                    max_attempts = MAX_RETRIES + 1,
+                    status = status_u16,
+                    delay_ms,
+                    body = preview,
+                    "TEI retry"
+                );
+            }
             tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
             delay_ms = (delay_ms * 2).min(2_000);
             continue;
@@ -318,15 +329,20 @@ impl EmbedProvider for TeiProvider {
         // Keep EMBED_PIPELINE_DEPTH batches in-flight concurrently so HTTP
         // round-trip latency overlaps with GPU compute on the TEI server.
         // buffered() preserves batch ordering.
-        let batches: Vec<(usize, Vec<String>)> = texts
-            .chunks(BATCH_SIZE)
-            .enumerate()
-            .map(|(i, chunk)| (i, chunk.to_vec()))
+        //
+        // We stream (batch_idx, start, end) index tuples and slice `texts` inside
+        // the async block to avoid cloning the text data into owned Vec<String> batches.
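+        //
+        // Worked example (illustrative values): texts.len() == 10 with
+        // BATCH_SIZE == 4 yields ranges (0, 0, 4), (1, 4, 8), (2, 8, 10).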
+        let batch_ranges: Vec<(usize, usize, usize)> = (0..total_batches)
+            .map(|i| {
+                let start = i * BATCH_SIZE;
+                let end = (start + BATCH_SIZE).min(texts.len());
+                (i, start, end)
+            })
             .collect();

-        let results: Vec>> = futures::stream::iter(batches)
-            .map(|(batch_idx, batch)| async move {
-                self.embed_batch_adaptive(&batch, batch_idx, total_batches)
+        let results: Vec>> = futures::stream::iter(batch_ranges)
+            .map(|(batch_idx, start, end)| async move {
+                self.embed_batch_adaptive(&texts[start..end], batch_idx, total_batches)
                     .await
             })
             .buffered(EMBED_PIPELINE_DEPTH)
diff --git a/crates/noxa-rag/src/error.rs b/crates/noxa-rag/src/error.rs
index eab5bf5..91b1788 100644
--- a/crates/noxa-rag/src/error.rs
+++ b/crates/noxa-rag/src/error.rs
@@ -32,6 +32,10 @@ pub enum RagError {
     Json(#[from] serde_json::Error),
     #[error("parse error: {0}")]
     Parse(String),
+    #[error("url validation failed: {0}")]
+    UrlValidation(String),
+    #[error("worker panicked: {0}")]
+    WorkerPanic(String),
     #[error("error: {0}")]
     Generic(String),
 }
diff --git a/crates/noxa-rag/src/mcp_bridge/bytestash.rs b/crates/noxa-rag/src/mcp_bridge/bytestash.rs
new file mode 100644
index 0000000..084d211
--- /dev/null
+++ b/crates/noxa-rag/src/mcp_bridge/bytestash.rs
@@ -0,0 +1,105 @@
+use serde_json::Value;
+
+use crate::RagError;
+
+use super::{
+    BridgeDocument, McpBridge, McpSource, McporterExecutor, SyncReport, WriteStatus, array_field,
+    io::{build_extraction, write_bridge_document},
+    join_base_url, join_non_empty, optional_string, required_base_url, required_string,
+    string_array,
+};
+
+impl<E> McpBridge<E>
+where
+    E: McporterExecutor,
+{
+    pub(super) async fn sync_bytestash(&self) -> Result<SyncReport, RagError> {
+        let base_url = required_base_url(&self.config, McpSource::Bytestash)?;
+        let data = self
+            .call_data(McpSource::Bytestash, "snippets.list", serde_json::json!({}))
+            .await?;
+        let records = if let Some(array) = data.as_array() {
+            array.iter().collect::<Vec<_>>()
+        } else {
+            array_field(&data, "snippets")?
+        };
+
+        let mut report = SyncReport::default();
+        for record in records {
+            let document = normalize_bytestash_record(record, base_url)?;
+            report.fetched += 1;
+            match write_bridge_document(&self.config.watch_dir, &document).await? {
+                WriteStatus::Written => report.written += 1,
+                WriteStatus::Unchanged => report.skipped += 1,
+            }
+        }
+
+        Ok(report)
+    }
+}
+
+pub fn normalize_bytestash_record(
+    record: &Value,
+    platform_base_url: &str,
+) -> Result<BridgeDocument, RagError> {
+    let id = required_string(record, "id")?;
+    let title = optional_string(record, "title");
+    let description = optional_string(record, "description");
+    let language = optional_string(record, "language");
+    let fragments = record
+        .get("fragments")
+        .and_then(Value::as_array)
+        .ok_or_else(|| RagError::Parse("bytestash record missing fragments array".to_string()))?;
+
+    let mut markdown_parts = Vec::new();
+    if let Some(value) = title.as_deref() {
+        markdown_parts.push(format!("# {value}"));
+    }
+    if let Some(value) = description.as_deref() {
+        markdown_parts.push(value.to_string());
+    }
+    for fragment in fragments {
+        let file_name = fragment
+            .get("fileName")
+            .or_else(|| fragment.get("file_name"))
+            .and_then(Value::as_str)
+            .unwrap_or("snippet");
+        let code = fragment
+            .get("code")
+            .and_then(Value::as_str)
+            .unwrap_or_default();
+        markdown_parts.push(format!(
+            "## {file_name}\n```{}\n{}\n```",
+            language.clone().unwrap_or_default(),
+            code
+        ));
+    }
+    let plain_text = join_non_empty([
+        title.clone(),
+        description.clone(),
+        Some(
+            fragments
+                .iter()
+                .filter_map(|fragment| fragment.get("code").and_then(Value::as_str))
+                .collect::<Vec<_>>()
+                .join("\n\n"),
+        ),
+    ]);
+    let url = join_base_url(platform_base_url, &format!("/api/snippets/{id}"))?;
+
+    Ok(BridgeDocument {
+        source: McpSource::Bytestash,
+        external_id: format!("bytestash:{id}"),
+        platform_url: Some(url.clone()),
+        extraction: build_extraction(
+            url,
+            title,
+            None,
+            None,
+            language,
+            string_array(record.get("categories")),
+            markdown_parts.join("\n\n"),
+            plain_text,
+        ),
+    })
+}
diff --git a/crates/noxa-rag/src/mcp_bridge/executor.rs b/crates/noxa-rag/src/mcp_bridge/executor.rs
new file mode 100644
index 0000000..70f3bec
--- /dev/null
+++ b/crates/noxa-rag/src/mcp_bridge/executor.rs
@@ -0,0 +1,75 @@
+use async_trait::async_trait;
+use serde_json::Value;
+use tokio::process::Command;
+
+use crate::RagError;
+
+use super::{McpSource, McporterExecutor};
+
+#[derive(Debug, Clone)]
+pub struct ProcessMcporterExecutor {
+    executable: String,
+}
+
+impl ProcessMcporterExecutor {
+    pub fn new(executable: impl Into<String>) -> Self {
+        Self {
+            executable: executable.into(),
+        }
+    }
+}
+
+#[async_trait]
+impl McporterExecutor for ProcessMcporterExecutor {
+    async fn call(
+        &self,
+        server: &str,
+        service: McpSource,
+        action: &str,
+        params: Value,
+    ) -> Result<Value, RagError> {
+        let selector = format!("{}.{}", server, service.as_str());
+        let args = serde_json::json!({
+            "action": action,
+            "params": params,
+        });
+        const MCPORTER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+
+        let output = tokio::time::timeout(
+            MCPORTER_TIMEOUT,
+            Command::new(&self.executable)
+                .arg("call")
+                .arg(&selector)
+                .arg("--args")
+                .arg(args.to_string())
+                .arg("--output")
+                .arg("json")
+                .output(),
+        )
+        .await
+        .map_err(|_| {
+            RagError::Generic(format!(
+                "mcporter call {} {} timed out after {}s",
+                selector,
+                action,
+                MCPORTER_TIMEOUT.as_secs()
+            ))
+        })?
+        .map_err(|e| RagError::Generic(format!("failed to execute mcporter: {e}")))?;
+
+        if !output.status.success() {
+            // Truncate stderr to avoid leaking arbitrarily large subprocess output.
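+            // 512 chars matches the body-preview cap used for TEI retry logging above.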
+            let stderr = String::from_utf8_lossy(&output.stderr);
+            let preview: String = stderr.chars().take(512).collect();
+            return Err(RagError::Generic(format!(
+                "mcporter call {} {} failed: {}",
+                selector,
+                action,
+                preview.trim()
+            )));
+        }
+
+        serde_json::from_slice(&output.stdout)
+            .map_err(|e| RagError::Generic(format!("mcporter returned invalid JSON: {e}")))
+    }
+}
diff --git a/crates/noxa-rag/src/mcp_bridge/io.rs b/crates/noxa-rag/src/mcp_bridge/io.rs
new file mode 100644
index 0000000..2bf8ae1
--- /dev/null
+++ b/crates/noxa-rag/src/mcp_bridge/io.rs
@@ -0,0 +1,131 @@
+use std::path::{Path, PathBuf};
+
+use noxa_core::{Content, ExtractionResult, Metadata};
+use serde::{Deserialize, Serialize};
+
+use crate::RagError;
+
+use super::{BridgeDocument, McpSource, WriteStatus};
+
+pub fn relative_output_path(source: McpSource, external_id: &str) -> PathBuf {
+    PathBuf::from("mcp").join(source.as_str()).join(format!(
+        "{}-{:016x}.json",
+        sanitize_component(external_id),
+        stable_component_hash(external_id)
+    ))
+}
+
+pub async fn write_bridge_document(
+    root: &Path,
+    document: &BridgeDocument,
+) -> Result<WriteStatus, RagError> {
+    let path = root.join(relative_output_path(document.source, &document.external_id));
+    if let Some(parent) = path.parent() {
+        tokio::fs::create_dir_all(parent).await?;
+    }
+
+    let payload = StoredExtractionResult {
+        extraction: document.extraction.clone(),
+        external_id: Some(document.external_id.clone()),
+        platform_url: document.platform_url.clone(),
+    };
+    let serialized = serde_json::to_vec_pretty(&payload)?;
+    if tokio::fs::read(&path).await.ok().as_deref() == Some(serialized.as_slice()) {
+        return Ok(WriteStatus::Unchanged);
+    }
+
+    let tmp_path = temp_output_path(&path);
+    tokio::fs::write(&tmp_path, &serialized).await?;
+    // Remove destination before rename so the operation succeeds on Windows,
+    // where rename(src, dst) errors when dst already exists.
+    let _ = tokio::fs::remove_file(&path).await;
+    tokio::fs::rename(&tmp_path, &path).await?;
+    Ok(WriteStatus::Written)
+}
+
+#[allow(clippy::too_many_arguments)]
+pub fn build_extraction(
+    url: String,
+    title: Option<String>,
+    published_date: Option<String>,
+    author: Option<String>,
+    language: Option<String>,
+    technologies: Vec<String>,
+    markdown: String,
+    plain_text: String,
+) -> ExtractionResult {
+    ExtractionResult {
+        metadata: Metadata {
+            title,
+            description: None,
+            author,
+            published_date,
+            language,
+            url: Some(url),
+            site_name: None,
+            image: None,
+            favicon: None,
+            word_count: count_words(&plain_text),
+            content_hash: None,
+            source_type: Some("mcp".to_string()),
+            file_path: None,
+            last_modified: None,
+            is_truncated: None,
+            technologies,
+            seed_url: None,
+            crawl_depth: None,
+            search_query: None,
+            fetched_at: None,
+        },
+        content: Content {
+            markdown,
+            plain_text,
+            links: Vec::new(),
+            images: Vec::new(),
+            code_blocks: Vec::new(),
+            raw_html: None,
+        },
+        domain_data: None,
+        vertical_data: None,
+        structured_data: Vec::new(),
+    }
+}
+
+fn sanitize_component(value: &str) -> String {
+    value
+        .chars()
+        .map(|ch| match ch {
+            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
+            _ => '_',
+        })
+        .collect()
+}
+
+fn stable_component_hash(value: &str) -> u64 {
+    // DefaultHasher is explicitly documented as not stable across Rust versions.
+    // Use xxh3_64 (already a workspace dep) for stable, deterministic filenames.
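+    // Stability matters because relative_output_path() embeds this hash in the
+    // output file name: a re-sync of the same external_id must land on the same
+    // path for write_bridge_document() to detect it as Unchanged.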
+    xxhash_rust::xxh3::xxh3_64(value.as_bytes())
+}
+
+fn temp_output_path(path: &Path) -> PathBuf {
+    let suffix = format!(
+        "tmp-{}-{}",
+        std::process::id(),
+        uuid::Uuid::new_v4().simple()
+    );
+    path.with_extension(format!("json.{suffix}"))
+}
+
+fn count_words(value: &str) -> usize {
+    value.split_whitespace().count()
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(super) struct StoredExtractionResult {
+    #[serde(flatten)]
+    pub extraction: ExtractionResult,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub external_id: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub platform_url: Option<String>,
+}
diff --git a/crates/noxa-rag/src/mcp_bridge/linkding.rs b/crates/noxa-rag/src/mcp_bridge/linkding.rs
new file mode 100644
index 0000000..27b866c
--- /dev/null
+++ b/crates/noxa-rag/src/mcp_bridge/linkding.rs
@@ -0,0 +1,120 @@
+use std::collections::HashSet;
+
+use serde_json::Value;
+use url::Url;
+
+use crate::RagError;
+
+use super::{
+    BridgeDocument, McpBridge, McpSource, McporterExecutor, SyncReport, WriteStatus, array_field,
+    as_u64_value,
+    io::{build_extraction, write_bridge_document},
+    join_non_empty, optional_string, required_string, required_value, string_array,
+};
+
+impl<E> McpBridge<E>
+where
+    E: McporterExecutor,
+{
+    pub(super) async fn sync_linkding(&self) -> Result<SyncReport, RagError> {
+        let mut report = SyncReport::default();
+        let mut offset = 0_u32;
+        let mut seen_ids = HashSet::new();
+
+        loop {
+            let data = self
+                .call_data(
+                    McpSource::Linkding,
+                    "bookmark.list",
+                    serde_json::json!({
+                        "limit": self.config.page_size,
+                        "offset": offset,
+                    }),
+                )
+                .await?;
+            let records = array_field(&data, "results")?;
+            if records.is_empty() {
+                break;
+            }
+
+            let mut new_records = 0_usize;
+            for record in records {
+                let document =
+                    normalize_linkding_record(record, self.config.platform_base_url.as_deref())?;
+                if !seen_ids.insert(document.external_id.clone()) {
+                    continue;
+                }
+                new_records += 1;
+                report.fetched += 1;
+                match write_bridge_document(&self.config.watch_dir, &document).await? {
+                    WriteStatus::Written => report.written += 1,
+                    WriteStatus::Unchanged => report.skipped += 1,
+                }
+            }
+
+            if data.get("next").map_or(true, Value::is_null) {
+                break;
+            }
+            if new_records == 0 {
+                break;
+            }
+            offset = offset.saturating_add(self.config.page_size.max(1));
+        }
+
+        Ok(report)
+    }
+}
+
+pub fn normalize_linkding_record(
+    record: &Value,
+    platform_base_url: Option<&str>,
+) -> Result<BridgeDocument, RagError> {
+    let id = required_value(record, "id").and_then(as_u64_value)?;
+    let url = required_string(record, "url")?;
+    let title = optional_string(record, "title");
+    let description = optional_string(record, "description");
+    let notes = optional_string(record, "notes");
+    let markdown = join_non_empty([
+        title.as_deref().map(|value| format!("# {value}")),
+        description.clone(),
+        notes.clone(),
+    ]);
+    let plain_text = join_non_empty([title.clone(), description, notes]);
+    let platform_url = match platform_base_url {
+        Some(base) => Some(linkding_platform_url(base, &url)?),
+        None => None,
+    };
+
+    Ok(BridgeDocument {
+        source: McpSource::Linkding,
+        external_id: format!("linkding:{id}"),
+        platform_url,
+        extraction: build_extraction(
+            url,
+            title,
+            optional_string(record, "date_added"),
+            None,
+            None,
+            string_array(record.get("tag_names")),
+            markdown,
+            plain_text,
+        ),
+    })
+}
+
+fn linkding_platform_url(base: &str, bookmark_url: &str) -> Result<String, RagError> {
+    let mut url = Url::parse(base)
+        .map_err(|e| RagError::Parse(format!("invalid linkding base URL {base:?}: {e}")))?;
+    let current_path = url.path().trim_end_matches('/');
+    let next_path = if current_path.is_empty() {
+        "/bookmarks".to_string()
+    } else {
+        format!("{current_path}/bookmarks")
+    };
+    url.set_path(&next_path);
+    let query = url::form_urlencoded::Serializer::new(String::new())
+        .append_pair("q", bookmark_url)
+        .finish();
+    url.set_query(Some(&query));
+    Ok(url.to_string())
+}
diff --git a/crates/noxa-rag/src/mcp_bridge/memos.rs b/crates/noxa-rag/src/mcp_bridge/memos.rs
new file mode 100644
index 0000000..1d8e134
--- /dev/null
+++ b/crates/noxa-rag/src/mcp_bridge/memos.rs
@@ -0,0 +1,86 @@
+use serde_json::Value;
+
+use crate::RagError;
+
+use super::{
+    BridgeDocument, McpBridge, McpSource, McporterExecutor, SyncReport, WriteStatus, array_field,
+    first_line_title,
+    io::{build_extraction, write_bridge_document},
+    join_base_url, optional_string, required_base_url, required_string, string_array,
+};
+
+impl<E> McpBridge<E>
+where
+    E: McporterExecutor,
+{
+    pub(super) async fn sync_memos(&self) -> Result<SyncReport, RagError> {
+        let base_url = required_base_url(&self.config, McpSource::Memos)?;
+        let mut report = SyncReport::default();
+        let mut page_token: Option<String> = None;
+
+        loop {
+            let mut params = serde_json::json!({ "page_size": self.config.page_size });
+            if let Some(token) = &page_token {
+                params["page_token"] = Value::String(token.clone());
+            }
+            let data = self
+                .call_data(McpSource::Memos, "memos.list", params)
+                .await?;
+            let records = array_field(&data, "memos")?;
+            if records.is_empty() {
+                break;
+            }
+
+            for record in records {
+                let document = normalize_memo_record(record, base_url)?;
+                report.fetched += 1;
+                match write_bridge_document(&self.config.watch_dir, &document).await? {
+                    WriteStatus::Written => report.written += 1,
+                    WriteStatus::Unchanged => report.skipped += 1,
+                }
+            }
+
+            let next_page_token = data
+                .get("nextPageToken")
+                .and_then(Value::as_str)
+                .map(str::trim)
+                .filter(|value| !value.is_empty())
+                .map(ToOwned::to_owned);
+            if next_page_token.is_none() || next_page_token == page_token {
+                break;
+            }
+            page_token = next_page_token;
+        }
+
+        Ok(report)
+    }
+}
+
+pub fn normalize_memo_record(
+    record: &Value,
+    platform_base_url: &str,
+) -> Result<BridgeDocument, RagError> {
+    let name = required_string(record, "name")?;
+    let memo_id = name.strip_prefix("memos/").unwrap_or(&name).to_string();
+    let content = required_string(record, "content")?;
+    let title = first_line_title(&content);
+    let url = join_base_url(platform_base_url, &format!("/api/v1/{name}"))?;
+    let published_date =
+        optional_string(record, "displayTime").or_else(|| optional_string(record, "createTime"));
+
+    Ok(BridgeDocument {
+        source: McpSource::Memos,
+        external_id: format!("memos:{memo_id}"),
+        platform_url: Some(url.clone()),
+        extraction: build_extraction(
+            url,
+            title,
+            published_date,
+            None,
+            None,
+            string_array(record.get("tags")),
+            content.clone(),
+            content,
+        ),
+    })
+}
diff --git a/crates/noxa-rag/src/mcp_bridge.rs b/crates/noxa-rag/src/mcp_bridge/mod.rs
similarity index 59%
rename from crates/noxa-rag/src/mcp_bridge.rs
rename to crates/noxa-rag/src/mcp_bridge/mod.rs
index 8e51e3a..1317425 100644
--- a/crates/noxa-rag/src/mcp_bridge.rs
+++ b/crates/noxa-rag/src/mcp_bridge/mod.rs
@@ -1,15 +1,25 @@
-use std::collections::{HashMap, HashSet};
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;

 use async_trait::async_trait;
-use noxa_core::{Content, ExtractionResult, Metadata};
-use serde::{Deserialize, Serialize};
 use serde_json::Value;
-use tokio::process::Command;
 use url::Url;

 use crate::RagError;

+mod bytestash;
+pub mod executor;
+mod io;
+mod linkding;
+mod memos;
+mod paperless;
+
+pub use bytestash::normalize_bytestash_record;
+pub use executor::ProcessMcporterExecutor;
+pub use io::{relative_output_path, write_bridge_document};
+pub use linkding::normalize_linkding_record;
+pub use memos::normalize_memo_record;
+pub use paperless::normalize_paperless_record;
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum McpSource {
     Linkding,
@@ -51,7 +61,7 @@ pub struct BridgeDocument {
     pub source: McpSource,
     pub external_id: String,
     pub platform_url: Option<String>,
-    pub extraction: ExtractionResult,
+    pub extraction: noxa_core::ExtractionResult,
 }

 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -71,58 +81,6 @@ pub trait McporterExecutor: Send + Sync {
     ) -> Result<Value, RagError>;
 }

-#[derive(Debug, Clone)]
-pub struct ProcessMcporterExecutor {
-    executable: String,
-}
-
-impl ProcessMcporterExecutor {
-    pub fn new(executable: impl Into<String>) -> Self {
-        Self {
-            executable: executable.into(),
-        }
-    }
-}
-
-#[async_trait]
-impl McporterExecutor for ProcessMcporterExecutor {
-    async fn call(
-        &self,
-        server: &str,
-        service: McpSource,
-        action: &str,
-        params: Value,
-    ) -> Result<Value, RagError> {
-        let selector = format!("{}.{}", server, service.as_str());
-        let args = serde_json::json!({
-            "action": action,
-            "params": params,
-        });
-        let output = Command::new(&self.executable)
-            .arg("call")
-            .arg(&selector)
-            .arg("--args")
-            .arg(args.to_string())
-            .arg("--output")
-            .arg("json")
-            .output()
-            .await
-            .map_err(|e| RagError::Generic(format!("failed to execute mcporter: {e}")))?;
-
-        if !output.status.success() {
-            return Err(RagError::Generic(format!(
-                "mcporter call {} {} failed: {}",
-                selector,
-                action,
-                String::from_utf8_lossy(&output.stderr).trim()
-            )));
-        }
-
-        serde_json::from_slice(&output.stdout)
-            .map_err(|e| RagError::Generic(format!("mcporter returned invalid JSON: {e}")))
-    }
-}
-
 pub struct McpBridge<E> {
     executor: E,
     config: BridgeConfig,
 }
@@ -145,163 +103,7 @@ where
         }
     }

-    async fn sync_linkding(&self) -> Result<SyncReport, RagError> {
-        let mut report = SyncReport::default();
-        let mut offset = 0_u32;
-        let mut seen_ids = HashSet::new();
-
-        loop {
-            let data = self
-                .call_data(
-                    McpSource::Linkding,
-                    "bookmark.list",
-                    serde_json::json!({
-                        "limit": self.config.page_size,
-                        "offset": offset,
-                    }),
-                )
-                .await?;
-            let records = array_field(&data, "results")?;
-            if records.is_empty() {
-                break;
-            }
-
-            let mut new_records = 0_usize;
-            for record in records {
-                let document =
-                    normalize_linkding_record(record, self.config.platform_base_url.as_deref())?;
-                if !seen_ids.insert(document.external_id.clone()) {
-                    continue;
-                }
-                new_records += 1;
-                report.fetched += 1;
-                match write_bridge_document(&self.config.watch_dir, &document).await? {
-                    WriteStatus::Written => report.written += 1,
-                    WriteStatus::Unchanged => report.skipped += 1,
-                }
-            }
-
-            if data.get("next").is_none() || data.get("next").is_some_and(Value::is_null) {
-                break;
-            }
-            if new_records == 0 {
-                break;
-            }
-            offset = offset.saturating_add(self.config.page_size.max(1));
-        }
-
-        Ok(report)
-    }
-
-    async fn sync_memos(&self) -> Result<SyncReport, RagError> {
-        let base_url = required_base_url(&self.config, McpSource::Memos)?;
-        let mut report = SyncReport::default();
-        let mut page_token: Option<String> = None;
-
-        loop {
-            let mut params = serde_json::json!({ "page_size": self.config.page_size });
-            if let Some(token) = &page_token {
-                params["page_token"] = Value::String(token.clone());
-            }
-            let data = self
-                .call_data(McpSource::Memos, "memos.list", params)
-                .await?;
-            let records = array_field(&data, "memos")?;
-            if records.is_empty() {
-                break;
-            }
-
-            for record in records {
-                let document = normalize_memo_record(record, base_url)?;
-                report.fetched += 1;
-                match write_bridge_document(&self.config.watch_dir, &document).await? {
-                    WriteStatus::Written => report.written += 1,
-                    WriteStatus::Unchanged => report.skipped += 1,
-                }
-            }
-
-            let next_page_token = data
-                .get("nextPageToken")
-                .and_then(Value::as_str)
-                .map(str::trim)
-                .filter(|value| !value.is_empty())
-                .map(ToOwned::to_owned);
-            if next_page_token.is_none() || next_page_token == page_token {
-                break;
-            }
-            page_token = next_page_token;
-        }
-
-        Ok(report)
-    }
-
-    async fn sync_bytestash(&self) -> Result<SyncReport, RagError> {
-        let base_url = required_base_url(&self.config, McpSource::Bytestash)?;
-        let data = self
-            .call_data(McpSource::Bytestash, "snippets.list", serde_json::json!({}))
-            .await?;
-        let records = if let Some(array) = data.as_array() {
-            array.iter().collect::<Vec<_>>()
-        } else {
-            array_field(&data, "snippets")?
-        };
-
-        let mut report = SyncReport::default();
-        for record in records {
-            let document = normalize_bytestash_record(record, base_url)?;
-            report.fetched += 1;
-            match write_bridge_document(&self.config.watch_dir, &document).await? {
-                WriteStatus::Written => report.written += 1,
-                WriteStatus::Unchanged => report.skipped += 1,
-            }
-        }
-
-        Ok(report)
-    }
-
-    async fn sync_paperless(&self) -> Result<SyncReport, RagError> {
-        let base_url = required_base_url(&self.config, McpSource::Paperless)?;
-        let tag_names = self.fetch_paperless_lookup("tags.list").await?;
-        let correspondent_names = self.fetch_paperless_lookup("correspondents.list").await?;
-        let mut report = SyncReport::default();
-        let mut page = 1_u32;
-
-        loop {
-            let data = self
-                .call_data(
-                    McpSource::Paperless,
-                    "documents.list",
-                    serde_json::json!({
-                        "page_size": self.config.page_size,
-                        "page": page,
-                    }),
-                )
-                .await?;
-            let records = array_field(&data, "results")?;
-            if records.is_empty() {
-                break;
-            }
-
-            for record in records {
-                let document =
-                    normalize_paperless_record(record, &tag_names, &correspondent_names, base_url)?;
-                report.fetched += 1;
-                match write_bridge_document(&self.config.watch_dir, &document).await? {
-                    WriteStatus::Written => report.written += 1,
-                    WriteStatus::Unchanged => report.skipped += 1,
-                }
-            }
-
-            if data.get("next").is_none() || data.get("next").is_some_and(Value::is_null) {
-                break;
-            }
-            page = page.saturating_add(1);
-        }
-
-        Ok(report)
-    }
-
-    async fn call_data(
+    pub(self) async fn call_data(
         &self,
         source: McpSource,
         action: &str,
@@ -313,279 +115,13 @@ where
             .await?;
         extract_mcporter_data(raw)
     }
-
-    async fn fetch_paperless_lookup(&self, action: &str) -> Result<HashMap<u64, String>, RagError> {
-        let data = self
-            .call_data(McpSource::Paperless, action, serde_json::json!({}))
-            .await?;
-        let items = if let Some(array) = data.as_array() {
-            array.iter().collect::<Vec<_>>()
-        } else {
-            array_field(&data, "results")?
-        };
-        let mut lookup = HashMap::new();
-        for item in items {
-            let Some(id) = item.get("id").and_then(as_u64) else {
-                continue;
-            };
-            let Some(name) = item.get("name").and_then(Value::as_str) else {
-                continue;
-            };
-            lookup.insert(id, name.to_string());
-        }
-        Ok(lookup)
-    }
 }

-pub fn relative_output_path(source: McpSource, external_id: &str) -> PathBuf {
-    PathBuf::from("mcp").join(source.as_str()).join(format!(
-        "{}-{:016x}.json",
-        sanitize_component(external_id),
-        stable_component_hash(external_id)
-    ))
-}
-
-pub async fn write_bridge_document(
-    root: &Path,
-    document: &BridgeDocument,
-) -> Result<WriteStatus, RagError> {
-    let path = root.join(relative_output_path(document.source, &document.external_id));
-    if let Some(parent) = path.parent() {
-        tokio::fs::create_dir_all(parent).await?;
-    }
-
-    let payload = StoredExtractionResult {
-        extraction: document.extraction.clone(),
-        external_id: Some(document.external_id.clone()),
-        platform_url: document.platform_url.clone(),
-    };
-    let serialized = serde_json::to_vec_pretty(&payload)?;
-    if tokio::fs::read(&path).await.ok().as_deref() == Some(serialized.as_slice()) {
-        return Ok(WriteStatus::Unchanged);
-    }
-
-    let tmp_path = temp_output_path(&path);
-    tokio::fs::write(&tmp_path, &serialized).await?;
-    tokio::fs::rename(&tmp_path, &path).await?;
-    Ok(WriteStatus::Written)
-}
+// ---------------------------------------------------------------------------
+// Shared helper functions (used by platform submodules via `use super::...`)
+// ---------------------------------------------------------------------------

-pub fn normalize_linkding_record(
-    record: &Value,
-    platform_base_url: Option<&str>,
-) -> Result<BridgeDocument, RagError> {
-    let id = required_value(record, "id").and_then(as_u64_value)?;
-    let url = required_string(record, "url")?;
-    let title = optional_string(record, "title");
-    let description = optional_string(record, "description");
-    let notes = optional_string(record, "notes");
-    let markdown = join_non_empty([
-        title.as_deref().map(|value| format!("# {value}")),
-        description.clone(),
-        notes.clone(),
-    ]);
-    let plain_text = join_non_empty([title.clone(), description, notes]);
-    let platform_url = match platform_base_url {
-        Some(base) => Some(linkding_platform_url(base, &url)?),
-        None => None,
-    };
-
-    Ok(BridgeDocument {
-        source: McpSource::Linkding,
-        external_id: format!("linkding:{id}"),
-        platform_url,
-        extraction: build_extraction(
-            url,
-            title,
-            optional_string(record, "date_added"),
-            None,
-            None,
-            string_array(record.get("tag_names")),
-            markdown,
-            plain_text,
-        ),
-    })
-}
-
-pub fn normalize_memo_record(
-    record: &Value,
-    platform_base_url: &str,
-) -> Result<BridgeDocument, RagError> {
-    let name = required_string(record, "name")?;
-    let memo_id = name.strip_prefix("memos/").unwrap_or(&name).to_string();
-    let content = required_string(record, "content")?;
-    let title = first_line_title(&content);
-    let url = join_base_url(platform_base_url, &format!("/api/v1/{name}"))?;
-    let published_date =
-        optional_string(record, "displayTime").or_else(|| optional_string(record, "createTime"));
-
-    Ok(BridgeDocument {
-        source: McpSource::Memos,
-        external_id: format!("memos:{memo_id}"),
-        platform_url: Some(url.clone()),
-        extraction: build_extraction(
-            url,
-            title,
-            published_date,
-            None,
-            None,
-            string_array(record.get("tags")),
-            content.clone(),
-            content,
-        ),
-    })
-}
-
-pub fn normalize_bytestash_record(
-    record: &Value,
-    platform_base_url: &str,
-) -> Result<BridgeDocument, RagError> {
-    let id = required_string(record, "id")?;
-    let title = optional_string(record, "title");
-    let description = optional_string(record, "description");
-    let language = optional_string(record, "language");
-    let fragments = record
-        .get("fragments")
-        .and_then(Value::as_array)
-        .ok_or_else(|| RagError::Parse("bytestash record missing fragments array".to_string()))?;
-
-    let mut markdown_parts = Vec::new();
-    if let Some(value) = title.as_deref() {
-        markdown_parts.push(format!("# {value}"));
-    }
-    if let Some(value) = description.as_deref() {
-        markdown_parts.push(value.to_string());
-    }
-    for fragment in fragments {
-        let file_name = fragment
-            .get("fileName")
-            .or_else(|| fragment.get("file_name"))
-            .and_then(Value::as_str)
-            .unwrap_or("snippet");
-        let code = fragment
-            .get("code")
-            .and_then(Value::as_str)
-            .unwrap_or_default();
-        markdown_parts.push(format!(
-            "## {file_name}\n```{}\n{}\n```",
-            language.clone().unwrap_or_default(),
-            code
-        ));
-    }
-    let plain_text = join_non_empty([
-        title.clone(),
-        description.clone(),
-        Some(
-            fragments
-                .iter()
-                .filter_map(|fragment| fragment.get("code").and_then(Value::as_str))
-                .collect::<Vec<_>>()
-                .join("\n\n"),
-        ),
-    ]);
-    let url = join_base_url(platform_base_url, &format!("/api/snippets/{id}"))?;
-
-    Ok(BridgeDocument {
-        source: McpSource::Bytestash,
-        external_id: format!("bytestash:{id}"),
-        platform_url: Some(url.clone()),
-        extraction: build_extraction(
-            url,
-            title,
-            None,
-            None,
-            language,
-            string_array(record.get("categories")),
-            markdown_parts.join("\n\n"),
-            plain_text,
-        ),
-    })
-}
-
-pub fn normalize_paperless_record(
-    record: &Value,
-    tag_names: &std::collections::HashMap<u64, String>,
-    correspondent_names: &std::collections::HashMap<u64, String>,
-    platform_base_url: &str,
-) -> Result<BridgeDocument, RagError> {
-    let id = required_value(record, "id").and_then(as_u64_value)?;
-    let tags = record
-        .get("tags")
-        .and_then(Value::as_array)
-        .map(|items| {
-            items
-                .iter()
-                .filter_map(as_u64)
-                .filter_map(|value| tag_names.get(&value).cloned())
-                .collect::<Vec<_>>()
-        })
-        .unwrap_or_default();
-    let author = record
-        .get("correspondent")
-        .and_then(as_u64)
-        .and_then(|value| correspondent_names.get(&value).cloned());
-    let title = optional_string(record, "title");
-    let content = optional_string(record, "content").unwrap_or_default();
-    let url = join_base_url(platform_base_url, &format!("/api/documents/{id}/"))?;
-
-    Ok(BridgeDocument {
-        source: McpSource::Paperless,
-        external_id: format!("paperless:{id}"),
-        platform_url: Some(url.clone()),
-        extraction: build_extraction(
-            url,
-            title,
-            optional_string(record, "created").or_else(|| optional_string(record, "created_date")),
-            author,
-            None,
-            tags,
-            content.clone(),
-            content,
-        ),
-    })
-}
-
-fn sanitize_component(value: &str) -> String {
-    value
-        .chars()
-        .map(|ch| match ch {
-            'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' => ch,
-            _ => '_',
-        })
-        .collect()
-}
-
-fn stable_component_hash(value: &str) -> u64 {
-    use std::hash::{DefaultHasher, Hasher};
-    let mut hasher = DefaultHasher::new();
-    hasher.write(value.as_bytes());
-    hasher.finish()
-}
-
-fn temp_output_path(path: &Path) -> PathBuf {
-    let suffix = format!(
-        "tmp-{}-{}",
-        std::process::id(),
-        uuid::Uuid::new_v4().simple()
-    );
-    path.with_extension(format!("json.{suffix}"))
-}
-
-#[derive(Debug, Serialize, Deserialize)]
-struct StoredExtractionResult {
-    #[serde(flatten)]
-    extraction: ExtractionResult,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    external_id: Option<String>,
-    #[serde(skip_serializing_if = "Option::is_none")]
-    platform_url: Option<String>,
-}
-
-fn count_words(value: &str) -> usize {
-    value.split_whitespace().count()
-}
-
-fn extract_mcporter_data(raw: Value) -> Result<Value, RagError> {
+pub(super) fn extract_mcporter_data(raw: Value) -> Result<Value, RagError> {
     let ok = raw.get("ok").and_then(Value::as_bool).unwrap_or(true);
     if ok {
         raw.get("data")
@@ -601,7 +137,7 @@ fn extract_mcporter_data(raw: Value) -> Result<Value, RagError> {
     }
 }

-fn array_field<'a>(value: &'a Value, key: &str) -> Result<Vec<&'a Value>, RagError> {
+pub(super) fn array_field<'a>(value: &'a Value, key: &str) -> Result<Vec<&'a Value>, RagError> {
     value
         .get(key)
         .and_then(Value::as_array)
@@ -609,13 +145,16 @@ fn array_field<'a>(value: &'a Value, key: &str) -> Result<Vec<&'a Value>, RagError> {
         .ok_or_else(|| RagError::Parse(format!("expected array field {key}")))
 }

-fn required_base_url(config: &BridgeConfig, source: McpSource) -> Result<&str, RagError> {
+pub(super) fn required_base_url(
+    config: &BridgeConfig,
+    source: McpSource,
+) -> Result<&str, RagError> {
     config.platform_base_url.as_deref().ok_or_else(|| {
         RagError::Config(format!("{} requires --platform-base-url", source.as_str()))
     })
 }

-fn join_base_url(base: &str, path: &str) -> Result<String, RagError> {
+pub(super) fn join_base_url(base: &str, path: &str) -> Result<String, RagError> {
     let base = base.trim_end_matches('/');
     let url = format!("{base}{path}");
     Url::parse(&url)
@@ -623,30 +162,13 @@ fn join_base_url(base: &str, path: &str) -> Result<String, RagError> {
         .map_err(|e| RagError::Parse(format!("invalid base URL {base:?}: {e}")))
 }

-fn linkding_platform_url(base: &str, bookmark_url: &str) -> Result<String, RagError> {
-    let mut url = Url::parse(base)
-        .map_err(|e| RagError::Parse(format!("invalid linkding base URL {base:?}: {e}")))?;
-    let current_path = url.path().trim_end_matches('/');
-    let next_path = if current_path.is_empty() {
-        "/bookmarks".to_string()
-    } else {
-        format!("{current_path}/bookmarks")
-    };
-    url.set_path(&next_path);
-    let query = url::form_urlencoded::Serializer::new(String::new())
-        .append_pair("q", bookmark_url)
-        .finish();
-    url.set_query(Some(&query));
-    Ok(url.to_string())
-}
-
-fn required_value<'a>(value: &'a Value, key: &str) -> Result<&'a Value, RagError> {
+pub(super) fn required_value<'a>(value: &'a Value, key: &str) -> Result<&'a Value, RagError> {
     value
         .get(key)
         .ok_or_else(|| RagError::Parse(format!("missing required field {key}")))
 }

-fn required_string(value: &Value, key: &str) -> Result<String, RagError> {
+pub(super) fn required_string(value: &Value, key: &str) -> Result<String, RagError> {
     value
         .get(key)
         .and_then(Value::as_str)
@@ -654,24 +176,24 @@ fn required_string(value: &Value, key: &str) -> Result<String, RagError> {
         .ok_or_else(|| RagError::Parse(format!("missing required string field {key}")))
 }

-fn optional_string(value: &Value, key: &str) -> Option<String> {
+pub(super) fn optional_string(value: &Value, key: &str) -> Option<String> {
     value
         .get(key)
         .and_then(Value::as_str)
         .map(ToOwned::to_owned)
 }

-fn as_u64(value: &Value) -> Option<u64> {
+pub(super) fn as_u64(value: &Value) -> Option<u64> {
     value
         .as_u64()
         .or_else(|| value.as_str().and_then(|raw| raw.parse::<u64>().ok()))
 }

-fn as_u64_value(value: &Value) -> Result<u64, RagError> {
+pub(super) fn as_u64_value(value: &Value) -> Result<u64, RagError> {
     as_u64(value).ok_or_else(|| RagError::Parse("expected integer id".to_string()))
 }

-fn string_array(value: Option<&Value>) -> Vec<String> {
+pub(super) fn string_array(value: Option<&Value>) -> Vec<String> {
     value
         .and_then(Value::as_array)
         .map(|items| {
@@ -689,7 +211,7 @@ fn string_array(value: Option<&Value>) -> Vec<String> {
         .unwrap_or_default()
 }

-fn join_non_empty<I>(parts: I) -> String
+pub(super) fn join_non_empty<I>(parts: I) -> String
 where
     I: IntoIterator<Item = Option<String>>,
 {
@@ -702,7 +224,7 @@ where
         .join("\n\n")
 }

-fn first_line_title(content: &str) -> Option<String> {
+pub(super) fn first_line_title(content: &str) -> Option<String> {
     content
         .lines()
         .map(str::trim)
@@ -710,54 +232,6 @@ fn first_line_title(content: &str) -> Option<String> {
         .map(|line| line.chars().take(80).collect::<String>())
 }

-#[allow(clippy::too_many_arguments)]
-fn build_extraction(
-    url: String,
-    title: Option<String>,
-    published_date: Option<String>,
-    author: Option<String>,
-    language: Option<String>,
-    technologies: Vec<String>,
-    markdown: String,
-    plain_text: String,
-) -> ExtractionResult {
-    ExtractionResult {
-        metadata: Metadata {
-            title,
-            description: None,
-            author,
-            published_date,
-            language,
-            url: Some(url),
-            site_name: None,
-            image: None,
-            favicon: None,
-            word_count: count_words(&plain_text),
-            content_hash: None,
-            source_type: Some("mcp".to_string()),
-            file_path: None,
-            last_modified: None,
-            is_truncated: None,
-            technologies,
-            seed_url: None,
-            crawl_depth: None,
-            search_query: None,
-            fetched_at: None,
-        },
-        content: Content {
-            markdown,
-            plain_text,
-            links: Vec::new(),
-            images: Vec::new(),
-            code_blocks: Vec::new(),
-            raw_html: None,
-        },
-        domain_data: None,
-        vertical_data: None,
-        structured_data: Vec::new(),
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, VecDeque};
@@ -997,7 +471,7 @@ mod tests {
         platform_url: Some(
             "https://ding.tootie.tv/bookmarks?q=https%3A%2F%2Fpipenet.dev%2F".to_string(),
         ),
-        extraction: build_extraction(
+        extraction: io::build_extraction(
            "https://pipenet.dev/".to_string(),
            Some("pipenet".to_string()),
            Some("2026-02-02T15:23:27.821564-05:00".to_string()),
diff --git a/crates/noxa-rag/src/mcp_bridge/paperless.rs b/crates/noxa-rag/src/mcp_bridge/paperless.rs
new file mode 100644
index 0000000..1f60113
--- /dev/null
+++ b/crates/noxa-rag/src/mcp_bridge/paperless.rs
@@ -0,0 +1,127 @@
+use std::collections::HashMap;
+
+use serde_json::Value;
+
+use crate::RagError;
+
+use super::{
+    BridgeDocument, McpBridge, McpSource, McporterExecutor, SyncReport, WriteStatus, array_field,
+    as_u64, as_u64_value,
+    io::{build_extraction, write_bridge_document},
+    join_base_url, optional_string, required_base_url, required_value,
+};
+
+impl<E> McpBridge<E>
+where
+    E: McporterExecutor,
+{
+    pub(super) async fn sync_paperless(&self) -> Result<SyncReport, RagError> {
+        let base_url = required_base_url(&self.config, McpSource::Paperless)?;
+        let tag_names = self.fetch_paperless_lookup("tags.list").await?;
+        let correspondent_names = self.fetch_paperless_lookup("correspondents.list").await?;
+        let mut report = SyncReport::default();
+        let mut page = 1_u32;
+
+        loop {
+            let data = self
+                .call_data(
+                    McpSource::Paperless,
+                    "documents.list",
+                    serde_json::json!({
+                        "page_size": self.config.page_size,
+                        "page": page,
+                    }),
+                )
+                .await?;
+            let records = array_field(&data, "results")?;
+            if records.is_empty() {
+                break;
+            }
+
+            for record in records {
+                let document =
+                    normalize_paperless_record(record, &tag_names, &correspondent_names, base_url)?;
+                report.fetched += 1;
+                match write_bridge_document(&self.config.watch_dir, &document).await? {
+                    WriteStatus::Written => report.written += 1,
+                    WriteStatus::Unchanged => report.skipped += 1,
+                }
+            }
+
+            if data.get("next").is_none() || data.get("next").is_some_and(Value::is_null) {
+                break;
+            }
+            page = page.saturating_add(1);
+        }
+
+        Ok(report)
+    }
+
+    pub(super) async fn fetch_paperless_lookup(
+        &self,
+        action: &str,
+    ) -> Result<HashMap<u64, String>, RagError> {
+        let data = self
+            .call_data(McpSource::Paperless, action, serde_json::json!({}))
+            .await?;
+        let items = if let Some(array) = data.as_array() {
+            array.iter().collect::<Vec<_>>()
+        } else {
+            array_field(&data, "results")?
+        };
+        let mut lookup = HashMap::new();
+        for item in items {
+            let Some(id) = item.get("id").and_then(as_u64) else {
+                continue;
+            };
+            let Some(name) = item.get("name").and_then(Value::as_str) else {
+                continue;
+            };
+            lookup.insert(id, name.to_string());
+        }
+        Ok(lookup)
+    }
+}
+
+pub fn normalize_paperless_record(
+    record: &Value,
+    tag_names: &HashMap<u64, String>,
+    correspondent_names: &HashMap<u64, String>,
+    platform_base_url: &str,
+) -> Result<BridgeDocument, RagError> {
+    let id = required_value(record, "id").and_then(as_u64_value)?;
+    let tags = record
+        .get("tags")
+        .and_then(Value::as_array)
+        .map(|items| {
+            items
+                .iter()
+                .filter_map(as_u64)
+                .filter_map(|value| tag_names.get(&value).cloned())
+                .collect::<Vec<_>>()
+        })
+        .unwrap_or_default();
+    let author = record
+        .get("correspondent")
+        .and_then(as_u64)
+        .and_then(|value| correspondent_names.get(&value).cloned());
+    let title = optional_string(record, "title");
+    let content = optional_string(record, "content").unwrap_or_default();
+    let url = join_base_url(platform_base_url, &format!("/api/documents/{id}/"))?;
+
+    Ok(BridgeDocument {
+        source: McpSource::Paperless,
+        external_id: format!("paperless:{id}"),
+        platform_url: Some(url.clone()),
+        extraction: build_extraction(
+            url,
+            title,
+            optional_string(record, "created").or_else(|| optional_string(record, "created_date")),
+            author,
+            None,
+            tags,
+            content.clone(),
+            content,
+        ),
+    })
+}
diff --git a/crates/noxa-rag/src/pipeline.rs b/crates/noxa-rag/src/pipeline.rs
index a696fee..f1f30f6 100644
--- a/crates/noxa-rag/src/pipeline.rs
+++ b/crates/noxa-rag/src/pipeline.rs
@@ -11,7 +11,7 @@ use crate::error::RagError;
 use crate::store::DynVectorStore;

 mod heartbeat;
-mod parse;
+pub(crate) mod parse;
 mod process;
 mod runtime;
 mod scan;
@@ -24,6 +24,7 @@ struct CounterSnapshot {
     failed: usize,
     parse_failures: usize,
     total_chunks: usize,
+    total_io_ms: u64,
     total_parse_ms: u64,
     total_chunk_ms: u64,
     total_embed_ms: u64,
@@ -38,6 +39,9 @@ struct SessionCounters {
     /// broader process errors so the heartbeat can report them independently.
     parse_failures: std::sync::atomic::AtomicUsize,
     total_chunks: std::sync::atomic::AtomicUsize,
+    /// Total time spent on file I/O (canonicalize, open, read) across all jobs.
+    total_io_ms: std::sync::atomic::AtomicU64,
+    /// Total time spent in the parser (CPU-bound format decoding) across all jobs.
     total_parse_ms: std::sync::atomic::AtomicU64,
     total_chunk_ms: std::sync::atomic::AtomicU64,
     total_embed_ms: std::sync::atomic::AtomicU64,
@@ -52,6 +56,7 @@ impl SessionCounters {
             failed: self.files_failed.load(Relaxed),
             parse_failures: self.parse_failures.load(Relaxed),
             total_chunks: self.total_chunks.load(Relaxed),
+            total_io_ms: self.total_io_ms.load(Relaxed),
             total_parse_ms: self.total_parse_ms.load(Relaxed),
             total_chunk_ms: self.total_chunk_ms.load(Relaxed),
             total_embed_ms: self.total_embed_ms.load(Relaxed),
@@ -65,6 +70,7 @@ impl SessionCounters {
             self.files_indexed.fetch_add(1, Relaxed);
         }
         self.total_chunks.fetch_add(stats.chunks, Relaxed);
+        self.total_io_ms.fetch_add(stats.io_ms, Relaxed);
         self.total_parse_ms.fetch_add(stats.parse_ms, Relaxed);
         self.total_chunk_ms.fetch_add(stats.chunk_ms, Relaxed);
         self.total_embed_ms.fetch_add(stats.embed_ms, Relaxed);
@@ -75,6 +81,11 @@ impl SessionCounters {
         self.files_failed
             .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     }
+
+    fn record_parse_failure(&self) {
+        self.parse_failures
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+    }
 }

 struct IndexJob {
@@ -106,6 +117,9 @@ enum PipelineJob {
 #[derive(Default)]
 struct JobStats {
     chunks: usize,
+    /// Time spent on file I/O (canonicalize, open, read) — does not include parse CPU time.
+    io_ms: u64,
+    /// Time spent in the parser (CPU-bound format decoding) — does not include file I/O.
     parse_ms: u64,
     chunk_ms: u64,
     embed_ms: u64,
@@ -122,7 +136,7 @@ struct WorkerContext {
     embed: DynEmbedProvider,
     store: DynVectorStore,
     tokenizer: Arc<Tokenizer>,
-    config: RagConfig,
+    config: Arc<RagConfig>,
     url_locks: Arc<DashMap<String, Arc<tokio::sync::Mutex<()>>>>,
     git_branch_cache: Arc<DashMap<PathBuf, Option<String>>>,
     watch_roots: Arc<Vec<PathBuf>>,
@@ -132,7 +146,7 @@ struct WorkerContext {
 }

 impl WorkerContext {
-    fn from_pipeline(pipeline: &Pipeline) -> Self {
+    fn from_pipeline(pipeline: &Pipeline, watch_roots: Arc<Vec<PathBuf>>) -> Self {
         Self {
             embed: pipeline.embed.clone(),
             store: pipeline.store.clone(),
@@ -140,11 +154,7 @@ impl WorkerContext {
             config: pipeline.config.clone(),
             url_locks: pipeline.url_locks.clone(),
             git_branch_cache: pipeline.git_branch_cache.clone(),
-            watch_roots: pipeline
-                .watch_roots
-                .get()
-                .expect("watch_roots set before spawn_workers")
-                .clone(),
+            watch_roots,
             counters: pipeline.counters.clone(),
             failed_jobs_log_lock: pipeline.failed_jobs_log_lock.clone(),
             shutdown: pipeline.shutdown.clone(),
@@ -153,7 +163,7 @@ impl WorkerContext {
 }

 pub struct Pipeline {
-    config: RagConfig,
+    config: Arc<RagConfig>,
     embed: DynEmbedProvider,
     store: DynVectorStore,
     tokenizer: Arc<Tokenizer>,
@@ -169,9 +179,6 @@ pub struct Pipeline {
     /// Serialises failed-jobs log rotation: check-size → rotate → append must be atomic
     /// across concurrent workers to avoid double-rename races.
     failed_jobs_log_lock: Arc<tokio::sync::Mutex<()>>,
-    /// Canonicalized watch roots, set once during `run()` before workers are spawned.
-    /// Workers access this directly instead of receiving it as a spawn parameter.
-    watch_roots: std::sync::OnceLock<Arc<Vec<PathBuf>>>,
 }

 impl Pipeline {
@@ -183,7 +190,7 @@ impl Pipeline {
         shutdown: CancellationToken,
     ) -> Self {
         Self {
-            config,
+            config: Arc::new(config),
             embed,
             store,
             tokenizer,
@@ -192,7 +199,6 @@ impl Pipeline {
             git_branch_cache: Arc::new(DashMap::new()),
             counters: Arc::new(SessionCounters::default()),
             failed_jobs_log_lock: Arc::new(tokio::sync::Mutex::new(())),
-            watch_roots: std::sync::OnceLock::new(),
         }
     }
diff --git a/crates/noxa-rag/src/pipeline/heartbeat.rs b/crates/noxa-rag/src/pipeline/heartbeat.rs
index 99bc953..7113b0b 100644
--- a/crates/noxa-rag/src/pipeline/heartbeat.rs
+++ b/crates/noxa-rag/src/pipeline/heartbeat.rs
@@ -41,6 +41,7 @@ pub(super) fn spawn_heartbeat(
             indexed = snap.indexed,
             failed = snap.failed,
             parse_failures = snap.parse_failures,
+            io_ms = snap.total_io_ms,
             parse_ms = snap.total_parse_ms,
             chunk_ms = snap.total_chunk_ms,
             embed_ms = snap.total_embed_ms,
diff --git a/crates/noxa-rag/src/pipeline/parse/binary.rs b/crates/noxa-rag/src/pipeline/parse/binary.rs
index 10f6df6..55e7a6e 100644
--- a/crates/noxa-rag/src/pipeline/parse/binary.rs
+++ b/crates/noxa-rag/src/pipeline/parse/binary.rs
@@ -12,7 +12,7 @@ pub(crate) fn parse_pdf_file(
     let result = noxa_pdf::extract_pdf(bytes, noxa_pdf::PdfMode::Auto)
         .map_err(|e| RagError::Parse(format!("PDF extract: {e}")))?;
     let text = noxa_pdf::to_markdown(&result);
-    let word_count = text.split_whitespace().count();
+    let word_count = crate::chunker::word_count(&text);
     Ok(ParsedFile {
         extraction: make_text_result(text.clone(), text, url, Some(title), "file", word_count),
         provenance: IngestionProvenance::default(),
@@ -32,6 +32,10 @@ pub(crate) fn parse_office_zip_file(
     // stream (NOT the attacker-controlled central directory size field). This defends
     // against zip bombs that lie about entry.size() in the central directory.
     const MAX_DOCX_EXTRACTED_BYTES: u64 = 50 * 1024 * 1024;
+    // Per-entry and cumulative measured caps for ODT/PPTX.
+    // Named separately from the DOCX constants so each format's limit is explicit.
+    const MAX_ODT_PPTX_PER_ENTRY_BYTES: u64 = 10 * 1024 * 1024; // 10 MiB per XML entry
+    const MAX_ODT_PPTX_TOTAL_BYTES: u64 = 50 * 1024 * 1024; // 50 MiB cumulative

     let cursor = std::io::Cursor::new(bytes);
     let mut archive = zip::ZipArchive::new(cursor)
@@ -117,17 +121,23 @@ pub(crate) fn parse_office_zip_file(
     let mut text_parts: Vec<String> = Vec::new();
     let mut slide_count = 0u32;
     let mut has_notes = false;
+    // Authoritative measured total for ODT/PPTX decompressed bytes.
+    // This is incremented from the actual read count, NOT the advisory entry.size().
+    let mut odt_pptx_measured_total: u64 = 0;

     for i in 0..archive.len() {
         let mut entry = archive
             .by_index(i)
             .map_err(|e| RagError::Parse(format!("{ext} entry {i}: {e}")))?;
+
+        // Advisory pre-checks using the central directory declared size.
+        // These are fast-path guards only — the central directory is attacker-controlled
+        // so they must be backed up by the measured read below.
         total_uncompressed_size = total_uncompressed_size.saturating_add(entry.size());
         if total_uncompressed_size > MAX_TOTAL_UNCOMPRESSED_SIZE {
             return Err(RagError::Parse(format!(
                 "{ext}: archive expands to more than 250 MiB — possible zip bomb"
             )));
         }
-
         if entry.size() > MAX_ENTRY_SIZE {
             return Err(RagError::Parse(format!(
                 "{ext}: entry '{}' decompresses to {} bytes (max 100 MiB) — possible zip bomb",
@@ -150,10 +160,37 @@ pub(crate) fn parse_office_zip_file(
             continue;
         }

+        // Authoritative measured guard: cap the actual decompression stream.
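+        // (The DOCX branch above already counts the real decompressed stream;
+        // this brings the ODT/PPTX path to parity.)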
+        // A crafted file can declare a tiny entry.size() in the central directory
+        // while the actual decompressed content expands to gigabytes. We read at most
+        // (cap + 1) bytes so we can distinguish "exactly at budget" from "overran".
+        let remaining = MAX_ODT_PPTX_TOTAL_BYTES.saturating_sub(odt_pptx_measured_total);
+        let per_entry_cap = MAX_ODT_PPTX_PER_ENTRY_BYTES.min(remaining);
+        let read_cap = per_entry_cap.saturating_add(1);
         let mut xml_buf = String::new();
-        entry
+        (&mut entry)
+            .take(read_cap)
             .read_to_string(&mut xml_buf)
             .map_err(|e| RagError::Parse(format!("{ext} read '{name}': {e}")))?;
+        let read_bytes = xml_buf.len() as u64;
+        if read_bytes > per_entry_cap {
+            // Distinguish which cap fired: per-entry or cumulative.
+            let reason = if per_entry_cap < MAX_ODT_PPTX_PER_ENTRY_BYTES {
+                format!(
+                    "cumulative budget ({} MiB total) reached",
+                    MAX_ODT_PPTX_TOTAL_BYTES / (1024 * 1024)
+                )
+            } else {
+                format!(
+                    "per-entry limit ({} MiB) exceeded",
+                    MAX_ODT_PPTX_PER_ENTRY_BYTES / (1024 * 1024)
+                )
+            };
+            return Err(RagError::Parse(format!(
+                "{ext}: entry '{name}' — {reason} — possible zip bomb"
+            )));
+        }
+        odt_pptx_measured_total = odt_pptx_measured_total.saturating_add(read_bytes);

         let fragment = super::extract_xml_text(&xml_buf).unwrap_or_else(|_| xml_buf.clone());
         if !fragment.trim().is_empty() {
@@ -162,7 +199,7 @@ pub(crate) fn parse_office_zip_file(
     }

     let text = text_parts.join("\n\n");
-    let word_count = text.split_whitespace().count();
+    let word_count = crate::chunker::word_count(&text);
     let extraction = make_text_result(text.clone(), text, url, Some(title), "file", word_count);
     let provenance = if ext == "pptx" {
         IngestionProvenance {
@@ -181,3 +218,159 @@ pub(crate) fn parse_office_zip_file(
         provenance,
     })
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Build an in-memory zip archive with a single entry.
+    /// `entry_name`: path inside the zip (e.g. "content.xml")
+    /// `content`: raw bytes to store (Deflated, which compresses repeated data heavily)
+    fn make_zip_with_entry(entry_name: &str, content: &[u8]) -> Vec<u8> {
+        use std::io::Write;
+        let buf = std::io::Cursor::new(Vec::new());
+        let mut writer = zip::ZipWriter::new(buf);
+        let options = zip::write::SimpleFileOptions::default()
+            .compression_method(zip::CompressionMethod::Deflated);
+        writer.start_file(entry_name, options).expect("start_file");
+        writer.write_all(content).expect("write_all");
+        writer.finish().expect("finish").into_inner()
+    }
+
+    /// Verify that the DOCX guard is untouched: the existing DOCX path compiles and
+    /// the entry-count bomb check fires correctly (the measured guard is tested by DOCX's
+    /// own test suite; we just ensure structural integrity here).
+    #[test]
+    fn docx_entry_count_bomb_rejected() {
+        // Build a zip with MAX_ENTRIES+1 (1001) empty entries as a DOCX.
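+        // The entry-count guard should reject the archive before any entry body
+        // is decompressed, so empty Stored entries are enough to trigger it.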
+ use std::io::Write; + let buf = std::io::Cursor::new(Vec::new()); + let mut writer = zip::ZipWriter::new(buf); + let options = zip::write::SimpleFileOptions::default() + .compression_method(zip::CompressionMethod::Stored); + for i in 0..1001usize { + writer + .start_file(format!("file{i}.txt"), options) + .expect("start_file"); + writer.write_all(b"").expect("write_all"); + } + let bytes = writer.finish().expect("finish").into_inner(); + + let result = parse_office_zip_file( + &bytes, + "file:///test.docx".to_string(), + "test".to_string(), + "docx", + ); + assert!( + result.is_err(), + "expected error for zip with 1001 entries, got Ok" + ); + let msg = result.unwrap_err().to_string(); + assert!( + msg.contains("zip bomb") || msg.contains("entries"), + "unexpected error: {msg}" + ); + } + + /// Core regression for noxa-5gf: an ODT file whose single content.xml entry + /// expands to more than MAX_ODT_PPTX_PER_ENTRY_BYTES (10 MiB) must be rejected + /// via the *measured* decompression guard — even if the advisory entry.size() + /// value in the central directory declares a small size. + /// + /// We use a highly compressible XML payload (11 MiB of repeated ASCII) so the + /// in-memory zip is only ~40 KiB but decompresses beyond the 10 MiB cap. + #[test] + fn odt_decompression_bomb_rejected_by_measured_guard() { + // 11 MiB of valid-ish XML content — highly compressible. + const BOMB_SIZE: usize = 11 * 1024 * 1024; + let xml_content: String = std::iter::repeat("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n") + .flat_map(|s| s.chars()) + .take(BOMB_SIZE) + .collect(); + + // The zip crate writes the actual uncompressed size into the central directory, + // so entry.size() will be 11 MiB. The advisory check (MAX_ENTRY_SIZE = 100 MiB) + // would NOT fire here — only the measured per-entry cap (10 MiB) fires. + let zip_bytes = make_zip_with_entry("content.xml", xml_content.as_bytes()); + + let result = parse_office_zip_file( + &zip_bytes, + "file:///test.odt".to_string(), + "Test Document".to_string(), + "odt", + ); + + assert!( + result.is_err(), + "expected Err for ODT entry exceeding 10 MiB measured cap, got Ok" + ); + let msg = result.unwrap_err().to_string(); + assert!( + msg.contains("zip bomb") || msg.contains("decompression limit"), + "expected zip bomb error message, got: {msg}" + ); + } + + /// Verify the cumulative measured cap fires when multiple entries each stay + /// under the per-entry cap but together exceed MAX_ODT_PPTX_TOTAL_BYTES (50 MiB). + #[test] + fn odt_cumulative_decompression_bomb_rejected() { + // 6 entries × 9 MiB each = 54 MiB total > 50 MiB limit. + // Each 9 MiB entry is under the 10 MiB per-entry cap, but cumulative > 50 MiB. + const ENTRIES: usize = 6; + const ENTRY_SIZE: usize = 9 * 1024 * 1024; + + use std::io::Write; + let buf = std::io::Cursor::new(Vec::new()); + let mut writer = zip::ZipWriter::new(buf); + let options = zip::write::SimpleFileOptions::default() + .compression_method(zip::CompressionMethod::Deflated); + + let xml_chunk: String = std::iter::repeat("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\n") + .flat_map(|s| s.chars()) + .take(ENTRY_SIZE) + .collect(); + + for i in 0..ENTRIES { + // All entries contain "content" in the name to pass the target_prefix filter. 
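+            // (Entries skipped by that filter are never decompressed, so they
+            // cannot consume any of the 50 MiB budget.)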
+            writer
+                .start_file(format!("content{i}.xml"), options)
+                .expect("start_file");
+            writer.write_all(xml_chunk.as_bytes()).expect("write_all");
+        }
+        let zip_bytes = writer.finish().expect("finish").into_inner();
+
+        let result = parse_office_zip_file(
+            &zip_bytes,
+            "file:///test.odt".to_string(),
+            "Test Document".to_string(),
+            "odt",
+        );
+
+        assert!(
+            result.is_err(),
+            "expected Err for cumulative ODT entries exceeding 50 MiB measured cap, got Ok"
+        );
+        let msg = result.unwrap_err().to_string();
+        assert!(
+            msg.contains("zip bomb") || msg.contains("decompression limit"),
+            "expected zip bomb error message, got: {msg}"
+        );
+    }
+
+    /// A legitimate small ODT must parse successfully.
+    #[test]
+    fn odt_small_legitimate_file_parses_ok() {
+        let xml_content = r#"<office:text><text:p>Hello world</text:p></office:text>"#;
+        let zip_bytes = make_zip_with_entry("content.xml", xml_content.as_bytes());
+
+        let result = parse_office_zip_file(
+            &zip_bytes,
+            "file:///test.odt".to_string(),
+            "Test".to_string(),
+            "odt",
+        );
+        assert!(result.is_ok(), "expected Ok for small ODT, got: {result:?}");
+    }
+}
diff --git a/crates/noxa-rag/src/pipeline/parse/mod.rs b/crates/noxa-rag/src/pipeline/parse/mod.rs
index 6bf3488..7437582 100644
--- a/crates/noxa-rag/src/pipeline/parse/mod.rs
+++ b/crates/noxa-rag/src/pipeline/parse/mod.rs
@@ -3,7 +3,7 @@ use std::path::Path;
 use noxa_core::types::ExtractionResult;
 use crate::error::RagError;
-use crate::types::PointPayload;
+use crate::types::{Chunk, PointPayload};
 mod binary;
 mod rich;
@@ -61,6 +61,54 @@ pub(crate) enum FormatProvenance {
     Generic,
 }
+impl FormatProvenance {
+    /// Apply format-specific fields onto `payload`.
+    ///
+    /// The Web variant's fields (seed_url, search_query, crawl_depth) are
+    /// pre-resolved in `FileMetadata::from_result_and_provenance` (a `Some` in
+    /// the variant takes precedence over the metadata fallback). This method
+    /// therefore only needs to handle the non-web format variants.
+    pub(crate) fn apply(&self, payload: &mut PointPayload) {
+        match self {
+            FormatProvenance::Web { .. } => {
+                // Already resolved into FileMetadata — nothing to do here.
+            }
+            FormatProvenance::Email {
+                to,
+                message_id,
+                thread_id,
+                has_attachments,
+            } => {
+                payload.email_to = to.clone();
+                payload.email_message_id = message_id.clone();
+                payload.email_thread_id = thread_id.clone();
+                payload.email_has_attachments = *has_attachments;
+            }
+            FormatProvenance::Feed { feed_url, item_id } => {
+                payload.feed_url = feed_url.clone();
+                payload.feed_item_id = item_id.clone();
+            }
+            FormatProvenance::Subtitle {
+                start_s,
+                end_s,
+                source_file,
+            } => {
+                payload.subtitle_start_s = *start_s;
+                payload.subtitle_end_s = *end_s;
+                payload.subtitle_source_file = source_file.clone();
+            }
+            FormatProvenance::Presentation {
+                slide_count,
+                has_notes,
+            } => {
+                payload.pptx_slide_count = *slide_count;
+                payload.pptx_has_notes = *has_notes;
+            }
+            FormatProvenance::Generic => {}
+        }
+    }
+}
 #[derive(Debug, Clone, Default)]
 pub(crate) struct IngestionProvenance {
     pub external_id: Option<String>,
@@ -68,6 +116,90 @@ pub(crate) struct IngestionProvenance {
     pub format: FormatProvenance,
 }
+/// Per-file metadata that is identical for every chunk produced from the same
+/// document. Built once before the chunk loop via
+/// `FileMetadata::from_result_and_provenance` so the file-level fields are
+/// cloned once rather than once per chunk.
+///
+/// `file_hash` is NOT included here because it is computed separately in
+/// `process.rs` and passed directly to `build_point_payload`.
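+///
+/// A minimal usage sketch (hypothetical caller; `chunks`, `url`, and `file_hash`
+/// are assumed to come from the surrounding pipeline code):
+///
+/// ```ignore
+/// let file_meta = FileMetadata::from_result_and_provenance(&result, git_branch, &provenance);
+/// let payloads: Vec<PointPayload> = chunks
+///     .into_iter()
+///     .map(|chunk| build_point_payload(chunk, &file_meta, &url, file_hash.as_deref()))
+///     .collect();
+/// ```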
+pub(crate) struct FileMetadata {
+    pub title: Option<String>,
+    pub author: Option<String>,
+    pub published_date: Option<String>,
+    pub language: Option<String>,
+    pub source_type: Option<String>,
+    pub content_hash: Option<String>,
+    pub technologies: Vec<String>,
+    pub is_truncated: Option<bool>,
+    pub file_path: Option<String>,
+    pub last_modified: Option<String>,
+    pub git_branch: Option<String>,
+    pub external_id: Option<String>,
+    pub platform_url: Option<String>,
+    /// Web-provenance: variant `Some` wins over metadata fallback.
+    pub seed_url: Option<String>,
+    pub search_query: Option<String>,
+    pub crawl_depth: Option<u32>,
+    /// Format variant — used by `build_point_payload` to apply non-web fields
+    /// (Email, Feed, Subtitle, Presentation). Stored here so `build_point_payload`
+    /// doesn't need to receive `&IngestionProvenance` separately.
+    pub format: FormatProvenance,
+}
+
+impl FileMetadata {
+    /// Build `FileMetadata` once per document. The merge rule for web-provenance
+    /// fields is: `FormatProvenance::Web { Some(x) }` takes precedence over
+    /// `result.metadata.*`; `None` in the variant falls back to `metadata.*`.
+    /// Non-web variants leave those fields at the metadata value (typically None).
+    pub(crate) fn from_result_and_provenance(
+        result: &ExtractionResult,
+        git_branch: Option<String>,
+        provenance: &IngestionProvenance,
+    ) -> Self {
+        let (seed_url, search_query, crawl_depth) = match &provenance.format {
+            FormatProvenance::Web {
+                seed_url,
+                search_query,
+                crawl_depth,
+            } => (
+                seed_url
+                    .clone()
+                    .or_else(|| result.metadata.seed_url.clone()),
+                search_query
+                    .clone()
+                    .or_else(|| result.metadata.search_query.clone()),
+                crawl_depth.or(result.metadata.crawl_depth),
+            ),
+            _ => (
+                result.metadata.seed_url.clone(),
+                result.metadata.search_query.clone(),
+                result.metadata.crawl_depth,
+            ),
+        };
+
+        Self {
+            title: result.metadata.title.clone(),
+            author: result.metadata.author.clone(),
+            published_date: result.metadata.published_date.clone(),
+            language: result.metadata.language.clone(),
+            source_type: result.metadata.source_type.clone(),
+            content_hash: result.metadata.content_hash.clone(),
+            technologies: result.metadata.technologies.clone(),
+            is_truncated: result.metadata.is_truncated,
+            file_path: result.metadata.file_path.clone(),
+            last_modified: result.metadata.last_modified.clone(),
+            git_branch,
+            external_id: provenance.external_id.clone(),
+            platform_url: provenance.platform_url.clone(),
+            seed_url,
+            search_query,
+            crawl_depth,
+            format: provenance.format.clone(),
+        }
+    }
+}
 pub(crate) async fn parse_file(path: &Path, bytes: Vec<u8>) -> Result<ParsedFile, RagError> {
     let ext = path
         .extension()
@@ -87,7 +219,9 @@ pub(crate) async fn parse_file(path: &Path, bytes: Vec<u8>) -> Result
 => Ok(parse_plain_text_file(bytes, file_url, title)),
     "yaml" | "yml" | "toml" => Ok(parse_plain_text_file(bytes, file_url, title)),
     "log" => Ok(parse_log_file(bytes, file_url, title)),
-    "html" | "htm" => parse_html_file(bytes, file_url).await,
+    "html" | "htm" => {
+        spawn_blocking_parse("HTML", move || parse_html_file(bytes, file_url)).await
+    }
     "ipynb" => {
         spawn_blocking_parse("ipynb", move || parse_ipynb_file(&bytes, file_url, title)).await
     }
@@ -111,7 +245,7 @@ pub(crate) async fn parse_file(path: &Path, bytes: Vec<u8>) -> Result
 => Ok(parse_jsonl_file(bytes, file_url, title)),
-    "xml" | "opml" => Ok(parse_xml_file(bytes, file_url, title)),
+    "xml" | "opml" => parse_xml_file(bytes, file_url, title),
     "rss" | "atom" => parse_feed_file(bytes, file_url, title),
     "eml" => parse_email_file(&bytes, file_url, title),
     "vtt" | "srt" => Ok(parse_subtitle_file(bytes, file_url, title)),
@@ -174,116 +308,56 @@ pub(crate) fn extract_ingestion_provenance(value: &serde_json::Value) -> IngestionProvenance {
 }
 pub(crate) fn build_point_payload(
-    chunk: &crate::types::Chunk,
-    result: &ExtractionResult,
-    git_branch: Option<String>,
-    provenance: &IngestionProvenance,
+    chunk: Chunk,
+    file_meta: &FileMetadata,
     url: &str,
     file_hash: Option<&str>,
 ) -> PointPayload {
-    // Default values for every format-specific field. Only the fields
-    // belonging to the active variant are overridden below; the rest stay
-    // at their type default (None / empty Vec), matching the old flat
-    // struct's behaviour where inactive fields were simply not set.
-    let mut seed_url: Option<String> = None;
-    let mut search_query: Option<String> = None;
-    let mut crawl_depth: Option<u32> = None;
-    let mut email_to: Vec<String> = Vec::new();
-    let mut email_message_id: Option<String> = None;
-    let mut email_thread_id: Option<String> = None;
-    let mut email_has_attachments: Option<bool> = None;
-    let mut feed_url: Option<String> = None;
-    let mut feed_item_id: Option<String> = None;
-    let mut pptx_slide_count: Option<u32> = None;
-    let mut pptx_has_notes: Option<bool> = None;
-    let mut subtitle_start_s: Option<f64> = None;
-    let mut subtitle_end_s: Option<f64> = None;
-    let mut subtitle_source_file: Option<String> = None;
-
-    match &provenance.format {
-        FormatProvenance::Web {
-            seed_url: s,
-            search_query: q,
-            crawl_depth: d,
-        } => {
-            seed_url = s.clone();
-            search_query = q.clone();
-            crawl_depth = *d;
-        }
-        FormatProvenance::Email {
-            to,
-            message_id,
-            thread_id,
-            has_attachments,
-        } => {
-            email_to = to.clone();
-            email_message_id = message_id.clone();
-            email_thread_id = thread_id.clone();
-            email_has_attachments = *has_attachments;
-        }
-        FormatProvenance::Feed {
-            feed_url: f,
-            item_id,
-        } => {
-            feed_url = f.clone();
-            feed_item_id = item_id.clone();
-        }
-        FormatProvenance::Subtitle {
-            start_s,
-            end_s,
-            source_file,
-        } => {
-            subtitle_start_s = *start_s;
-            subtitle_end_s = *end_s;
-            subtitle_source_file = source_file.clone();
-        }
-        FormatProvenance::Presentation {
-            slide_count,
-            has_notes,
-        } => {
-            pptx_slide_count = *slide_count;
-            pptx_has_notes = *has_notes;
-        }
-        FormatProvenance::Generic => {}
-    }
-
-    PointPayload {
-        text: chunk.text.clone(),
-        url: url.to_string(),
-        domain: chunk.domain.clone(),
+    let mut payload = PointPayload {
+        // Per-chunk fields — moved out of chunk (no clone for text/domain/section_header).
+        text: chunk.text,
+        domain: chunk.domain,
         chunk_index: chunk.chunk_index,
         total_chunks: chunk.total_chunks,
         token_estimate: chunk.token_estimate,
-        title: result.metadata.title.clone(),
-        author: result.metadata.author.clone(),
-        published_date: result.metadata.published_date.clone(),
-        language: result.metadata.language.clone(),
-        source_type: result.metadata.source_type.clone(),
-        content_hash: result.metadata.content_hash.clone(),
-        technologies: result.metadata.technologies.clone(),
-        is_truncated: result.metadata.is_truncated,
-        file_path: result.metadata.file_path.clone(),
-        last_modified: result.metadata.last_modified.clone(),
-        git_branch,
-        external_id: provenance.external_id.clone(),
-        platform_url: provenance.platform_url.clone(),
-        seed_url: seed_url.or_else(|| result.metadata.seed_url.clone()),
-        search_query: search_query.or_else(|| result.metadata.search_query.clone()),
-        crawl_depth: crawl_depth.or(result.metadata.crawl_depth),
-        email_to,
-        email_message_id,
-        email_thread_id,
-        email_has_attachments,
-        feed_url,
-        feed_item_id,
-        pptx_slide_count,
-        pptx_has_notes,
-        subtitle_start_s,
-        subtitle_end_s,
-        subtitle_source_file,
-        section_header: chunk.section_header.clone(),
+        section_header: chunk.section_header,
+        // Per-file fields — cloned once from FileMetadata (built once per document).
+        url: url.to_string(),
+        title: file_meta.title.clone(),
+        author: file_meta.author.clone(),
+        published_date: file_meta.published_date.clone(),
+        language: file_meta.language.clone(),
+        source_type: file_meta.source_type.clone(),
+        content_hash: file_meta.content_hash.clone(),
+        technologies: file_meta.technologies.clone(),
+        is_truncated: file_meta.is_truncated,
+        file_path: file_meta.file_path.clone(),
+        last_modified: file_meta.last_modified.clone(),
+        git_branch: file_meta.git_branch.clone(),
+        external_id: file_meta.external_id.clone(),
+        platform_url: file_meta.platform_url.clone(),
+        seed_url: file_meta.seed_url.clone(),
+        search_query: file_meta.search_query.clone(),
+        crawl_depth: file_meta.crawl_depth,
+        // Format-variant fields default to empty/None; apply() fills them in.
+        email_to: Vec::new(),
+        email_message_id: None,
+        email_thread_id: None,
+        email_has_attachments: None,
+        feed_url: None,
+        feed_item_id: None,
+        pptx_slide_count: None,
+        pptx_has_notes: None,
+        subtitle_start_s: None,
+        subtitle_end_s: None,
+        subtitle_source_file: None,
         file_hash: file_hash.map(str::to_owned),
-    }
+    };
+
+    // apply() now only handles non-web variants (Email, Feed, Subtitle, Presentation).
+    // Web-provenance fields were already resolved into FileMetadata once per document.
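+    // (The two field sets are disjoint today; if a field ever had both a web and a
+    // variant source, apply() runs last and the variant value would win.)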
+    file_meta.format.apply(&mut payload);
+    payload
 }
 pub(crate) fn make_text_result(
diff --git a/crates/noxa-rag/src/pipeline/parse/rich.rs b/crates/noxa-rag/src/pipeline/parse/rich.rs
index aec10a4..4200c36 100644
--- a/crates/noxa-rag/src/pipeline/parse/rich.rs
+++ b/crates/noxa-rag/src/pipeline/parse/rich.rs
@@ -2,6 +2,7 @@ use crate::error::RagError;
 use super::{
     FormatProvenance, IngestionProvenance, ParsedFile, extract_xml_text, make_text_result,
+    text::contains_xml_entity_expansion_risk,
 };
 pub(crate) fn parse_feed_file(
@@ -9,6 +10,12 @@
     file_url: String,
     title: String,
 ) -> Result<ParsedFile, RagError> {
+    if contains_xml_entity_expansion_risk(&bytes) {
+        return Err(RagError::Parse(
+            "XML entity expansion risk detected: file contains DOCTYPE/ENTITY declarations"
+                .to_string(),
+        ));
+    }
     let content = String::from_utf8_lossy(&bytes).into_owned();
     let (extraction, provenance) = parse_feed_text(&content, file_url, title)?;
     Ok(ParsedFile {
@@ -33,7 +40,7 @@ pub(crate) fn parse_subtitle_file(bytes: Vec<u8>, file_url: String, title: String)
     let content = String::from_utf8_lossy(&bytes).into_owned();
     let text = strip_subtitle_timestamps(&content);
     let provenance = subtitle_provenance(&content);
-    let word_count = text.split_whitespace().count();
+    let word_count = crate::chunker::word_count(&text);
     ParsedFile {
         extraction: make_text_result(
             text.clone(),
@@ -94,7 +101,7 @@ fn parse_email_text(
         .params
         .contains_key("filename")
     });
-    let word_count = body.split_whitespace().count();
+    let word_count = crate::chunker::word_count(&body);
     let mut extraction =
         make_text_result(body.clone(), body, file_url, subject, "email", word_count);
@@ -188,7 +195,7 @@ fn parse_feed_text(
         parts.push(extract_xml_text(content).unwrap_or_else(|_| content.to_string()));
     }
     let text = parts.join("\n\n");
-    let word_count = text.split_whitespace().count();
+    let word_count = crate::chunker::word_count(&text);
     let mut extraction = make_text_result(
         text.clone(),
diff --git a/crates/noxa-rag/src/pipeline/parse/tests.rs b/crates/noxa-rag/src/pipeline/parse/tests.rs
index 6dbe019..5c64777 100644
--- a/crates/noxa-rag/src/pipeline/parse/tests.rs
+++ b/crates/noxa-rag/src/pipeline/parse/tests.rs
@@ -1,7 +1,7 @@
 use serde_json::json;
 use std::fs;
-use super::{FormatProvenance, IngestionProvenance, build_point_payload, parse_file};
+use super::{FileMetadata, FormatProvenance, IngestionProvenance, build_point_payload, parse_file};
 #[tokio::test]
 async fn parse_file_json_recovers_provenance_fields() {
@@ -126,10 +126,11 @@ async fn parse_file_json_keeps_crawler_provenance_in_point_payload() {
         .metadata
         .url
         .as_deref()
-        .expect("parser should set file url");
+        .expect("parser should set file url")
+        .to_string();
     let chunk = crate::types::Chunk {
         text: parsed.extraction.content.markdown.clone(),
-        source_url: url.to_string(),
+        source_url: url.clone(),
         domain: "example.com".to_string(),
         chunk_index: 0,
         total_chunks: 1,
@@ -138,14 +139,9 @@ async fn parse_file_json_keeps_crawler_provenance_in_point_payload() {
         section_header: None,
     };
-    let payload = build_point_payload(
-        &chunk,
-        &parsed.extraction,
-        None,
-        &parsed.provenance,
-        url,
-        None,
-    );
+    let file_meta =
+        FileMetadata::from_result_and_provenance(&parsed.extraction, None, &parsed.provenance);
+    let payload = build_point_payload(chunk, &file_meta, &url, None);
     let json = serde_json::to_value(&payload).expect("serialize payload");
     assert_eq!(
@@ -210,12 +206,15 @@ fn sample_extraction_with_metadata() -> noxa_core::ExtractionResult {
     }
 }
+fn sample_file_metadata(provenance: &IngestionProvenance) -> FileMetadata {
+    FileMetadata::from_result_and_provenance(&sample_extraction_with_metadata(), None, provenance)
+}
+
 /// Web variant: external_id/platform_url at the top level, plus seed_url
 /// and friends falling back to metadata when the variant fields are None.
 #[test]
 fn build_point_payload_serializes_web_variant() {
     let chunk = sample_chunk();
-    let extraction = sample_extraction_with_metadata();
     let provenance = IngestionProvenance {
         external_id: Some("linkding:42".to_string()),
         platform_url: Some("https://platform.example/items/42".to_string()),
@@ -225,15 +224,10 @@ fn build_point_payload_serializes_web_variant() {
             crawl_depth: None,
         },
     };
+    let url = chunk.source_url.clone();
+    let file_meta = sample_file_metadata(&provenance);
-    let payload = build_point_payload(
-        &chunk,
-        &extraction,
-        None,
-        &provenance,
-        &chunk.source_url,
-        None,
-    );
+    let payload = build_point_payload(chunk, &file_meta, &url, None);
     let json = serde_json::to_value(&payload).expect("serialize payload");
     assert_eq!(
@@ -266,7 +260,6 @@ fn build_point_payload_serializes_web_variant() {
 #[test]
 fn build_point_payload_serializes_email_variant() {
     let chunk = sample_chunk();
-    let extraction = sample_extraction_with_metadata();
     let provenance = IngestionProvenance {
         external_id: Some("msg@example.com".to_string()),
         platform_url: None,
@@ -277,15 +270,10 @@ fn build_point_payload_serializes_email_variant() {
             has_attachments: Some(true),
         },
     };
+    let url = chunk.source_url.clone();
+    let file_meta = sample_file_metadata(&provenance);
-    let payload = build_point_payload(
-        &chunk,
-        &extraction,
-        None,
-        &provenance,
-        &chunk.source_url,
-        None,
-    );
+    let payload = build_point_payload(chunk, &file_meta, &url, None);
     let json = serde_json::to_value(&payload).expect("serialize payload");
     assert_eq!(
@@ -312,7 +300,6 @@ fn build_point_payload_serializes_feed_variant() {
     let chunk = sample_chunk();
-    let extraction = sample_extraction_with_metadata();
     let provenance = IngestionProvenance {
         external_id: Some("entry-1".to_string()),
         platform_url: None,
@@ -321,15 +308,10 @@ fn build_point_payload_serializes_feed_variant() {
             item_id: Some("entry-1".to_string()),
         },
     };
+    let url = chunk.source_url.clone();
+    let file_meta = sample_file_metadata(&provenance);
-    let payload = build_point_payload(
-        &chunk,
-        &extraction,
-        None,
-        &provenance,
-        &chunk.source_url,
-        None,
-    );
+    let payload = build_point_payload(chunk, &file_meta, &url, None);
     let json = serde_json::to_value(&payload).expect("serialize payload");
     assert_eq!(
@@ -346,7 +328,6 @@ fn build_point_payload_serializes_presentation_variant() {
     let chunk = sample_chunk();
-    let extraction = sample_extraction_with_metadata();
     let provenance = IngestionProvenance {
         external_id: None,
         platform_url: None,
@@ -355,15 +336,10 @@ fn build_point_payload_serializes_presentation_variant() {
             has_notes: Some(true),
         },
     };
+    let url = chunk.source_url.clone();
+    let file_meta = sample_file_metadata(&provenance);
-    let payload = build_point_payload(
-        &chunk,
-        &extraction,
-        None,
-        &provenance,
-        &chunk.source_url,
-        None,
-    );
+    let payload = build_point_payload(chunk, &file_meta, &url, None);
     let json = serde_json::to_value(&payload).expect("serialize payload");
     assert_eq!(
@@ -380,7 +356,6 @@
 #[test]
 fn
 build_point_payload_serializes_subtitle_variant() {
     let chunk = sample_chunk();
-    let extraction = sample_extraction_with_metadata();
     let provenance = IngestionProvenance {
         external_id: None,
         platform_url: None,
@@ -390,15 +365,10 @@ fn build_point_payload_serializes_subtitle_variant() {
             source_file: Some("demo.mp4".to_string()),
         },
     };
+    let url = chunk.source_url.clone();
+    let file_meta = sample_file_metadata(&provenance);
-    let payload = build_point_payload(
-        &chunk,
-        &extraction,
-        None,
-        &provenance,
-        &chunk.source_url,
-        None,
-    );
+    let payload = build_point_payload(chunk, &file_meta, &url, None);
     let json = serde_json::to_value(&payload).expect("serialize payload");
     assert_eq!(
@@ -414,3 +384,53 @@ fn build_point_payload_serializes_subtitle_variant() {
         Some("demo.mp4")
     );
 }
+
+/// Entity expansion (billion-laughs) in XML should be rejected before parsing.
+#[tokio::test]
+async fn parse_xml_rejects_entity_expansion() {
+    let tmp = tempfile::tempdir().expect("tempdir");
+    let path = tmp.path().join("evil.xml");
+    let payload = b"<?xml version=\"1.0\"?>\
+        <!DOCTYPE lolz [<!ENTITY lol \"lollollollollollollollollol\">]>\
+        <lolz>&lol;</lolz>";
+    std::fs::write(&path, payload).expect("write");
+    let result = parse_file(&path, payload.to_vec()).await;
+    assert!(
+        result.is_err(),
+        "expected Err for DOCTYPE/ENTITY XML, got Ok"
+    );
+    let msg = result.unwrap_err().to_string();
+    assert!(
+        msg.contains("DOCTYPE") || msg.contains("ENTITY") || msg.contains("entity expansion"),
+        "error message should mention DOCTYPE/ENTITY: {msg}"
+    );
+}
+
+/// Entity expansion guard in RSS/Atom feed parser.
+#[tokio::test]
+async fn parse_feed_rejects_entity_expansion() {
+    let tmp = tempfile::tempdir().expect("tempdir");
+    let path = tmp.path().join("evil.rss");
+    let payload = b"<?xml version=\"1.0\"?>\
+        <!DOCTYPE rss [<!ENTITY lol \"lollollollollollollollollol\">]>\
+        <rss version=\"2.0\"><channel><title>&lol;</title></channel></rss>";
+    std::fs::write(&path, payload).expect("write");
+    let result = parse_file(&path, payload.to_vec()).await;
+    let msg = result.unwrap_err().to_string();
+    assert!(
+        msg.contains("DOCTYPE") || msg.contains("ENTITY") || msg.contains("entity expansion"),
+        "expected entity-expansion rejection, got: {msg}"
+    );
+}
+
+/// Normal XML without DOCTYPE/ENTITY should still parse successfully.
+#[tokio::test]
+async fn parse_xml_allows_normal_content() {
+    let tmp = tempfile::tempdir().expect("tempdir");
+    let path = tmp.path().join("normal.xml");
+    let payload =
+        b"<root><item>hello world</item><item>foo bar</item></root>";
+    std::fs::write(&path, payload).expect("write");
+    let result = parse_file(&path, payload.to_vec()).await;
+    assert!(result.is_ok(), "normal XML should parse without error");
+}
diff --git a/crates/noxa-rag/src/pipeline/parse/text.rs b/crates/noxa-rag/src/pipeline/parse/text.rs
index 378bdb0..44656a0 100644
--- a/crates/noxa-rag/src/pipeline/parse/text.rs
+++ b/crates/noxa-rag/src/pipeline/parse/text.rs
@@ -32,7 +32,7 @@ pub(crate) fn parse_ipynb_file(
     }
     let text = parts.join("\n\n");
-    let word_count = text.split_whitespace().count();
+    let word_count = crate::chunker::word_count(&text);
     Ok(ParsedFile {
         extraction: make_text_result(text.clone(), text, url, Some(title), "notebook", word_count),
         provenance: IngestionProvenance::default(),
@@ -63,7 +63,7 @@ pub(crate) fn parse_json_file(
 pub(crate) fn parse_markdown_file(bytes: Vec<u8>, file_url: String, title: String) -> ParsedFile {
     let content = String::from_utf8_lossy(&bytes).into_owned();
-    let word_count = content.split_whitespace().count();
+    let word_count = crate::chunker::word_count(&content);
     ParsedFile {
         extraction: make_text_result(
             content,
@@ -79,7 +79,7 @@ pub(crate) fn parse_markdown_file(bytes: Vec<u8>, file_url: String, title: String)
 pub(crate) fn parse_plain_text_file(bytes: Vec<u8>, file_url: String, title: String) -> ParsedFile {
     let content = String::from_utf8_lossy(&bytes).into_owned();
-    let word_count = content.split_whitespace().count();
+    let word_count = crate::chunker::word_count(&content);
     ParsedFile {
         extraction: make_text_result(
             content.clone(),
@@ -96,7 +96,7 @@ pub(crate) fn parse_plain_text_file(bytes: Vec<u8>, file_url: String, title: String)
 pub(crate) fn parse_log_file(bytes: Vec<u8>, file_url: String, title: String) -> ParsedFile {
     let raw = String::from_utf8_lossy(&bytes).into_owned();
     let stripped = strip_ansi_escapes::strip_str(&raw);
-    let word_count = stripped.split_whitespace().count();
+    let word_count = crate::chunker::word_count(&stripped);
     ParsedFile {
         extraction: make_text_result(
             stripped.clone(),
@@ -110,24 +110,12 @@ pub(crate) fn parse_log_file(bytes: Vec<u8>, file_url: String, title: String)
     }
 }
-pub(crate) async fn parse_html_file(
-    bytes: Vec<u8>,
-    file_url: String,
-) -> Result<ParsedFile, RagError> {
+pub(crate) fn parse_html_file(bytes: Vec<u8>, file_url: String) -> Result<ParsedFile, RagError> {
     let html = String::from_utf8_lossy(&bytes).into_owned();
-    let url_for_extract = file_url.clone();
-    let extraction = tokio::task::spawn_blocking(
-        move || -> Result<ExtractionResult, RagError> {
-            let mut r = noxa_core::extract(&html, Some(&url_for_extract))
-                .map_err(|e| RagError::Parse(format!("HTML extract: {e}")))?;
-            r.metadata.url = Some(url_for_extract);
-            r.metadata.source_type = Some("file".to_string());
-            Ok(r)
-        },
-    )
-    .await
-    .map_err(|e| RagError::Parse(format!("HTML spawn_blocking: {e}")))??;
-
+    let mut extraction = noxa_core::extract(&html, Some(&file_url))
+        .map_err(|e| RagError::Parse(format!("HTML extract: {e}")))?;
+    extraction.metadata.url = Some(file_url);
+    extraction.metadata.source_type = Some("file".to_string());
     Ok(ParsedFile {
         extraction,
         provenance: IngestionProvenance::default(),
@@ -146,7 +134,7 @@ pub(crate) fn parse_jsonl_file(bytes: Vec<u8>, file_url: String, title: String)
         })
         .collect::<Vec<_>>()
         .join("\n\n");
-    let word_count = text.split_whitespace().count();
+    let word_count = crate::chunker::word_count(&text);
     ParsedFile {
         extraction: make_text_result(
 text.clone(),
@@ -160,14 +148,40 @@ pub(crate) fn parse_jsonl_file(bytes: Vec<u8>, file_url: String, title: String)
     }
 }
-pub(crate) fn parse_xml_file(bytes: Vec<u8>, file_url: String, title: String) -> ParsedFile {
+/// Scan the first 8 KiB of bytes for DOCTYPE/ENTITY declarations that could
+/// trigger exponential entity expansion (the "billion laughs" attack). Returns
+/// `true` if such declarations are found.
+///
+/// quick-xml 0.37.x does NOT apply DTD expansion limits, so this pre-scan is
+/// the primary — and not merely defensive — guard against out-of-memory
+/// attacks. Later versions of quick-xml may add limits, but we keep the scan
+/// regardless (defense-in-depth).
+pub(super) fn contains_xml_entity_expansion_risk(bytes: &[u8]) -> bool {
+    let header = &bytes[..bytes.len().min(8192)];
+    // Scan raw bytes so non-UTF-8 sequences cannot silently suppress the guard.
+    // `from_utf8().unwrap_or("")` would return "" on any non-UTF-8 byte in the
+    // window, letting a `<!DOCTYPE` that follows such a byte slip straight past.
+    header.windows(9).any(|w| w.eq_ignore_ascii_case(b"<!DOCTYPE"))
+        || header.windows(8).any(|w| w.eq_ignore_ascii_case(b"<!ENTITY"))
+}
+
+pub(crate) fn parse_xml_file(
+    bytes: Vec<u8>,
+    file_url: String,
+    title: String,
+) -> Result<ParsedFile, RagError> {
+    if contains_xml_entity_expansion_risk(&bytes) {
+        return Err(RagError::Parse(
+            "XML entity expansion risk detected: file contains DOCTYPE/ENTITY declarations"
+                .to_string(),
+        ));
+    }
     let content = String::from_utf8_lossy(&bytes).into_owned();
     let text = extract_xml_text(&content).unwrap_or_else(|e| {
         tracing::warn!(error = %e, "xml text extraction failed; falling back to raw text");
         content.clone()
     });
-    let word_count = text.split_whitespace().count();
-    ParsedFile {
+    let word_count = crate::chunker::word_count(&text);
+    Ok(ParsedFile {
         extraction: make_text_result(
             text.clone(),
             text,
@@ -177,7 +191,7 @@ pub(crate) fn parse_xml_file(bytes: Vec<u8>, file_url: String, title: String) ->
             word_count,
         ),
         provenance: IngestionProvenance::default(),
-    }
+    })
 }
 /// Extract plain text from XML/OPML/RSS/Atom by collecting all text and CDATA nodes.
diff --git a/crates/noxa-rag/src/pipeline/process.rs b/crates/noxa-rag/src/pipeline/process.rs
index 0ab1725..ddd8146 100644
--- a/crates/noxa-rag/src/pipeline/process.rs
+++ b/crates/noxa-rag/src/pipeline/process.rs
@@ -1,7 +1,7 @@
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use std::sync::atomic::Ordering;
+use dashmap::DashMap;
 use tokio::io::AsyncReadExt;
 use crate::error::RagError;
@@ -12,6 +12,29 @@
 use super::parse;
 use super::scan;
 use super::{DeleteJob, IndexJob, JobStats, WorkerContext};
+/// Walk up from `path`'s parent checking each ancestor directory against `cache`.
+///
+/// Returns `Some(branch)` on a cache hit (where the inner `Option` is the stored
+/// branch value — `None` means "not in a git repo"), or `None` when no ancestor is cached.
+///
+/// O(depth) DashMap lookups; typical repo depth ≤ 5, so this avoids a `spawn_blocking`
+/// dispatch for every file after the first in each git repository.
+fn find_cached_branch(
+    path: &Path,
+    cache: &DashMap<PathBuf, Option<String>>,
+) -> Option<Option<String>> {
+    let mut dir = path.parent()?;
+    loop {
+        if let Some(entry) = cache.get(dir) {
+            return Some(entry.clone());
+        }
+        match dir.parent() {
+            Some(parent) => dir = parent,
+            None => return None,
+        }
+    }
+}
+
 /// Validate that `url` is a permitted indexing target.
 ///
 /// - `http`/`https`: delegates to `noxa_store::url_validation::validate_public_http_url`, which
@@ -22,29 +45,29 @@
 /// - All other schemes: rejected.
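///
/// A hedged sketch of the accepted and rejected shapes (illustrative only; the
/// exact error text lives in the match arms below):
///
/// ```ignore
/// validate_url_scheme("file:///home/user/notes.md").await?;                // ok: local path
/// assert!(validate_url_scheme("file://nas/share/doc.md").await.is_err());  // remote host
/// assert!(validate_url_scheme("ftp://example.com/a").await.is_err());      // bad scheme
/// ```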
 pub(crate) async fn validate_url_scheme(url: &str) -> Result<(), RagError> {
     if url.is_empty() {
-        return Err(RagError::Generic(
+        return Err(RagError::UrlValidation(
             "extraction result has no URL".to_string(),
         ));
     }
-    let parsed =
-        url::Url::parse(url).map_err(|e| RagError::Generic(format!("invalid URL {url:?}: {e}")))?;
+    let parsed = url::Url::parse(url)
+        .map_err(|e| RagError::UrlValidation(format!("invalid URL {url:?}: {e}")))?;
     match parsed.scheme() {
         "http" | "https" => {
             noxa_store::url_validation::validate_public_http_url(url)
                 .await
-                .map_err(|e| RagError::Generic(format!("URL {url:?} blocked: {e}")))?;
+                .map_err(|e| RagError::UrlValidation(format!("URL {url:?} blocked: {e}")))?;
         }
         "file" => match parsed.host_str() {
             None | Some("") | Some("localhost") => {}
             Some(host) => {
-                return Err(RagError::Generic(format!(
+                return Err(RagError::UrlValidation(format!(
                     "file:// URL with remote host {host:?} is not allowed (only local paths)"
                 )));
             }
         },
         other => {
-            return Err(RagError::Generic(format!(
+            return Err(RagError::UrlValidation(format!(
                 "URL scheme {other:?} is not allowed (only http/https/file)"
             )));
         }
@@ -64,7 +87,7 @@ pub(crate) async fn validate_url_scheme(url: &str) -> Result<(), RagError> {
 async fn append_failed_job(path: &Path, error: &impl std::fmt::Display, ctx: &WorkerContext) {
     // Increment the parse-failure counter regardless of whether a log path is
     // configured — this ensures the heartbeat metric is always accurate.
-    ctx.counters.parse_failures.fetch_add(1, Ordering::Relaxed);
+    ctx.counters.record_parse_failure();
     let Some(ref log_path) = ctx.config.pipeline.failed_jobs_log else {
         return;
@@ -119,7 +142,7 @@ async fn append_failed_job(path: &Path, error: &impl std::fmt::Display, ctx: &WorkerContext) {
 pub(crate) async fn process_job(job: IndexJob, ctx: &WorkerContext) -> Result<JobStats, RagError> {
     let job_start = std::time::Instant::now();
-    let t0 = std::time::Instant::now();
+    let io_t0 = std::time::Instant::now();
     // Canonicalize can fail for benign reasons — most commonly ENOENT when the
     // file was deleted between the watcher event and job execution. That's a
     // race, not a backend failure, so we must NOT return `Err` (which would
@@ -147,6 +170,7 @@ pub(crate) async fn process_job(job: IndexJob, ctx: &WorkerContext) -> Result
         Ok(r) => r,
         Err(e) => {
@@ -182,6 +207,7 @@ pub(crate) async fn process_job(job: IndexJob, ctx: &WorkerContext) -> Result
             .map(|mtime| chrono::DateTime::<chrono::Utc>::from(mtime).to_rfc3339());
     }
-    let git_branch = {
-        let path = job.path.clone();
+    let git_branch = if let Some(cached) = find_cached_branch(&canonical, &ctx.git_branch_cache) {
+        // Cache hit — no spawn_blocking needed. `canonical` ancestors were walked
+        // synchronously in async context (pure in-memory DashMap lookups, no I/O).
+        cached
+    } else {
+        // Cache miss — dispatch to a blocking thread to read .git/HEAD from disk.
+        // Store the result keyed by the canonical git root so future files in the
+        // same repo hit the pre-check without spawning.
+        let canonical_clone = canonical.clone();
         let cache = ctx.git_branch_cache.clone();
         tokio::task::spawn_blocking(move || {
-            // Walk up to find the git root first so we can use it as a stable cache key.
-            if let Some((git_root, branch)) = scan::detect_git_root_and_branch(&path) {
+            if let Some((git_root, branch)) = scan::detect_git_root_and_branch(&canonical_clone) {
+                // Canonicalize the git root so subsequent lookups via `canonical`
+                // ancestors always match, regardless of symlinks or path normalization.
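+                // (canonicalize can itself fail, e.g. on permissions; unwrap_or below
+                // falls back to the uncanonicalized root rather than losing the entry.)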
+                let key = std::fs::canonicalize(&git_root).unwrap_or(git_root);
                 cache
-                    .entry(git_root)
+                    .entry(key)
                     .or_insert_with(|| Some(branch.clone()))
                     .clone()
             } else {
-                None
+                // Cache the miss so we don't re-stat .git for every file outside a repo.
+                let key =
+                    std::fs::canonicalize(canonical_clone.parent().unwrap_or(&canonical_clone))
+                        .unwrap_or_else(|_| {
+                            canonical_clone
+                                .parent()
+                                .unwrap_or(&canonical_clone)
+                                .to_path_buf()
+                        });
+                cache.entry(key).or_insert(None).clone()
             }
         })
         .await
@@ -228,9 +272,6 @@ pub(crate) async fn process_job(job: IndexJob, ctx: &WorkerContext) -> Result
     let points: Vec<_> = chunks
-        .iter()
-        .zip(vectors)
+        .into_iter()
+        .zip(vectors.into_iter())
         .enumerate()
         .map(|(i, (chunk, vector))| {
            let id = uuid::Uuid::new_v5(
@@ -286,14 +331,7 @@ pub(crate) async fn process_job(job: IndexJob, ctx: &WorkerContext) -> Result
@@ … @@ pub(crate) async fn run(pipeline: &Pipeline) -> Result<(), RagError> {
     );
     let session_start = Instant::now();
-    pipeline
-        .watch_roots
-        .set(Arc::new(scan::canonical_watch_roots(&watch_dirs).await?))
-        .ok();
+    let watch_roots = Arc::new(scan::canonical_watch_roots(&watch_dirs).await?);
     let (tx, rx) = async_channel::bounded(pipeline.config.pipeline.job_queue_capacity);
-    let worker_handles = spawn_workers(pipeline, rx);
+    let worker_handles = spawn_workers(pipeline, rx, Arc::clone(&watch_roots));
     let bridge_handle = setup_watcher(
         &watch_dirs,
@@ -105,6 +102,7 @@ pub(crate) async fn run(pipeline: &Pipeline) -> Result<(), RagError> {
         pipeline.shutdown.clone(),
         watch_dirs,
         pipeline.config.pipeline.startup_scan_concurrency,
+        pipeline.config.pipeline.max_file_size_bytes,
     );
     let heartbeat_handle = spawn_heartbeat(
diff --git a/crates/noxa-rag/src/pipeline/scan.rs b/crates/noxa-rag/src/pipeline/scan.rs
index 11eb96d..64d6ce6 100644
--- a/crates/noxa-rag/src/pipeline/scan.rs
+++ b/crates/noxa-rag/src/pipeline/scan.rs
@@ -47,6 +47,19 @@ pub(crate) fn has_indexable_extension(path: &Path) -> bool {
 /// temp files that are gone by the time we process them.
 ///
 /// Deferred (no confirmed use case, would add new crate deps): .epub, .mbox
+///
+/// # Symlink safety (TOCTOU note)
+///
+/// `path.is_file()` follows symlinks: it returns `true` for a symlink whose target is a
+/// regular file. Callers that care about confinement must guard against this with an
+/// explicit `path.is_symlink()` check **before** calling `is_indexable`. See
+/// [`collect_indexable_paths_recursive`] for the canonical call-site pattern (lines 79–82).
+///
+/// This function itself does **not** skip symlinks — that would require two `stat` calls
+/// per entry and the function predates the confinement requirement. The authoritative
+/// defence-in-depth is the `canonicalize + starts_with(watch_root)` confinement check
+/// inside `process_job`, which runs unconditionally regardless of how the path was
+/// discovered.
 pub(crate) fn is_indexable(path: &Path) -> bool {
     if !path.is_file() {
         return false;
@@ -90,6 +103,10 @@ fn collect_indexable_paths_recursive(path: &Path, found: &mut Vec<PathBuf>) {
 /// Compute the (content_hash, url) key used by the startup delta scan.
 ///
+/// `max_json_bytes` caps JSON deserialization to prevent unbounded heap allocation before
+/// pipeline workers start. Pass `pipeline.max_file_size_bytes` from the caller so the
+/// limit stays in sync with the per-job guard in `process_job`.
+///
 /// For `.json` ExtractionResult files: peeks at `metadata.url` and `metadata.content_hash`
 /// from inside the JSON (fast, avoids full deserialisation of large markdown content).
 /// Falls back to file:// URL + `mtime:<mtime>:<size>` if the JSON lacks a URL.
@@ -98,12 +115,12 @@ fn collect_indexable_paths_recursive(path: &Path, found: &mut Vec<PathBuf>) {
 /// O(stat) dedup key. Collisions cause re-indexing (not data loss), which is acceptable
 /// versus re-reading and hashing every file on startup.
 ///
-/// Returns `None` when the path is a symlink (confinement-safety: avoid following symlinks
-/// out of the watch roots before `process_job` validates confinement), or when the file
-/// metadata cannot be read, or when a file:// URL cannot be constructed.
+/// Returns `None` when the path is a symlink, metadata cannot be read, a file:// URL
+/// cannot be constructed, or a `.json` file exceeds `max_json_bytes` (file is still
+/// queued unconditionally and rejected by `process_job` under its own guard).
 ///
 /// Must be called inside `spawn_blocking` — this function reads from disk synchronously.
-pub(crate) fn startup_scan_key(path: &Path) -> Option<(String, String)> {
+pub(crate) fn startup_scan_key(path: &Path, max_json_bytes: u64) -> Option<(String, String)> {
     // Security: skip symlinks to avoid following links out of watch roots before
     // confinement validation runs in process_job. Mirrors the symlink skip in
     // collect_indexable_paths_recursive.
@@ -114,6 +131,18 @@ pub(crate) fn startup_scan_key(path: &Path) -> Option<(String, String)> {
     }
     if path.extension().and_then(|e| e.to_str()) == Some("json") {
+        // Guard against unbounded heap allocation: serde_json::from_reader will read the
+        // full JSON value into memory. Reuse sym_meta (already fetched above) to avoid an
+        // extra stat syscall — for non-symlinks, symlink_metadata.len() == metadata.len().
+        if sym_meta.len() > max_json_bytes {
+            tracing::debug!(
+                path = %path.display(),
+                size = sym_meta.len(),
+                limit = max_json_bytes,
+                "startup_scan_key: JSON file exceeds size limit, queuing unconditionally"
+            );
+            return None;
+        }
         #[derive(serde::Deserialize)]
         struct Q {
             metadata: QM,
@@ -453,13 +482,13 @@ mod tests {
         // Symlink must return None — do not read its target bytes.
         assert_eq!(
-            startup_scan_key(&link),
+            startup_scan_key(&link, 50 * 1024 * 1024),
             None,
             "symlinks must be skipped to prevent reading files outside watch roots"
         );
         // Regular file still produces a key.
-        assert!(startup_scan_key(&target).is_some());
+        assert!(startup_scan_key(&target, 50 * 1024 * 1024).is_some());
     }
     #[test]
@@ -468,7 +497,7 @@
         let file = tmp.path().join("doc.md");
         fs::write(&file, "hello world").expect("write file");
-        let (hash, url) = startup_scan_key(&file).expect("key for md file");
+        let (hash, url) = startup_scan_key(&file, 50 * 1024 * 1024).expect("key for md file");
         assert!(
             hash.starts_with("mtime:"),
             "non-JSON key should be mtime-based, got: {hash}"
@@ -483,12 +512,12 @@
         let tmp = tempfile::tempdir().expect("tempdir");
         let file = tmp.path().join("doc.txt");
         fs::write(&file, "one").expect("write v1");
-        let (k1, _) = startup_scan_key(&file).expect("v1 key");
+        let (k1, _) = startup_scan_key(&file, 50 * 1024 * 1024).expect("v1 key");
         // Different size ensures the size component changes, so even if mtime
         // resolution is 1s and the test runs faster than that, the key still differs.
fs::write(&file, "two two two").expect("write v2"); - let (k2, _) = startup_scan_key(&file).expect("v2 key"); + let (k2, _) = startup_scan_key(&file, 50 * 1024 * 1024).expect("v2 key"); assert_ne!(k1, k2, "key must change when file size changes"); } @@ -500,7 +529,7 @@ mod tests { let body = r#"{"metadata":{"url":"https://example.com/a","content_hash":"abc123"}}"#; fs::write(&file, body).expect("write json"); - let (hash, url) = startup_scan_key(&file).expect("json key"); + let (hash, url) = startup_scan_key(&file, 50 * 1024 * 1024).expect("json key"); assert_eq!(hash, "abc123"); assert_eq!(url, "https://example.com/a"); } @@ -512,8 +541,56 @@ mod tests { // No url inside metadata -> falls through to file:// + mtime:size. fs::write(&file, r#"{"metadata":{"content_hash":"x"}}"#).expect("write json"); - let (hash, url) = startup_scan_key(&file).expect("json fallback key"); + let (hash, url) = startup_scan_key(&file, 50 * 1024 * 1024).expect("json fallback key"); assert!(hash.starts_with("mtime:"), "fallback hash: {hash}"); assert!(url.starts_with("file://"), "fallback url: {url}"); } + + #[test] + fn startup_scan_key_large_json_returns_none() { + let tmp = tempfile::tempdir().expect("tempdir"); + let file = tmp.path().join("big.json"); + + // Use a sparse file (set_len without writing data) to create a >1 KiB + // file instantly. We use a small limit (1024) to avoid touching the real + // 50 MiB default — the guard logic is identical regardless of the limit value. + const LIMIT: u64 = 1024; + { + let f = std::fs::File::create(&file).expect("create big.json"); + f.set_len(LIMIT + 1).expect("set_len for sparse file"); + } + + assert_eq!( + startup_scan_key(&file, LIMIT), + None, + "JSON file exceeding the limit must return None" + ); + } + + #[test] + fn startup_scan_key_json_at_size_limit_is_processed() { + // A file exactly AT the limit (len == limit, so `len > limit` is false) + // must still be attempted. We set the file to exactly LIMIT bytes. + const LIMIT: u64 = 1024; + let tmp = tempfile::tempdir().expect("tempdir"); + let file = tmp.path().join("doc.json"); + let body = r#"{"metadata":{"url":"https://example.com/b","content_hash":"def456"}}"#; + fs::write(&file, body).expect("write json"); + + // Extend the file to exactly LIMIT bytes with null padding (sparse). + { + let f = std::fs::OpenOptions::new() + .write(true) + .open(&file) + .expect("open"); + f.set_len(LIMIT).expect("set_len"); + } + + // The guard is `len > limit`, so len == limit must NOT return None. + // JSON parsing will fail (trailing nulls), so we expect Some with a mtime key. + assert!( + startup_scan_key(&file, LIMIT).is_some(), + "file at exactly the limit must not be rejected by the size guard" + ); + } } diff --git a/crates/noxa-rag/src/pipeline/startup_scan.rs b/crates/noxa-rag/src/pipeline/startup_scan.rs index 181a905..33b2da4 100644 --- a/crates/noxa-rag/src/pipeline/startup_scan.rs +++ b/crates/noxa-rag/src/pipeline/startup_scan.rs @@ -10,12 +10,21 @@ use crate::store::{DynVectorStore, HashExistsResult}; use super::scan; use super::{IndexJob, PipelineJob}; +/// Number of paths batched into a single `spawn_blocking` call during startup scan. +/// +/// With ~few µs per `startup_scan_key` call, a batch of 256 completes in the low-ms range, +/// keeping cancellation latency bounded while amortizing per-task scheduler overhead across +/// 256 paths instead of 1. At 10 000 files this produces ≈40 batches — comfortably spread +/// across `scan_concurrency` blocking workers. 
+const STARTUP_SCAN_BATCH: usize = 256;
+
 pub(super) fn spawn_startup_scan(
     tx: async_channel::Sender<PipelineJob>,
     store: DynVectorStore,
     shutdown: tokio_util::sync::CancellationToken,
     watch_dirs: Vec<PathBuf>,
     scan_concurrency: usize,
+    max_json_bytes: u64,
 ) -> JoinHandle<()> {
     tokio::spawn(async move {
         let paths = match tokio::task::spawn_blocking({
@@ -46,8 +55,8 @@ pub(super) fn spawn_startup_scan(
         let skipped = Arc::new(AtomicUsize::new(0));
         let backend_errors = Arc::new(AtomicUsize::new(0));
-        stream::iter(paths)
-            .for_each_concurrent(scan_concurrency, |path| {
+        stream::iter(paths.chunks(STARTUP_SCAN_BATCH).map(|c| c.to_vec()))
+            .for_each_concurrent(scan_concurrency, |batch| {
                 let tx = tx.clone();
                 let store = store.clone();
                 let shutdown = shutdown.clone();
@@ -60,60 +69,80 @@ pub(super) fn spawn_startup_scan(
                         return;
                     }
-                    let path2 = path.clone();
-                    let hash_and_url =
-                        tokio::task::spawn_blocking(move || scan::startup_scan_key(&path2))
-                            .await
-                            .ok()
-                            .flatten();
-
-                    let (hash, url) = match hash_and_url {
-                        Some(t) => t,
-                        None => {
-                            tracing::debug!(
-                                path = %path.display(),
-                                "startup scan: no url/hash, queuing"
-                            );
-                            let span =
-                                tracing::info_span!("index_job", path = %path.display());
-                            tokio::select! {
-                                _ = tx.send(PipelineJob::Index(IndexJob { path, span })) => {}
-                                _ = shutdown.cancelled() => {}
                            }
-                            queued.fetch_add(1, Ordering::Relaxed);
+                    // Compute startup_scan_key for every path in the batch inside a single
+                    // blocking task, returning (PathBuf, Option<(hash, url)>) per path.
+                    let keys: Vec<(PathBuf, Option<(String, String)>)> =
+                        match tokio::task::spawn_blocking(move || {
+                            batch
+                                .into_iter()
+                                .map(|p| {
+                                    let k = scan::startup_scan_key(&p, max_json_bytes);
+                                    (p, k)
+                                })
+                                .collect()
+                        })
+                        .await
+                        {
+                            Ok(v) => v,
+                            Err(e) => {
+                                tracing::error!(error = %e, "startup scan: batch spawn_blocking panicked");
                                 return;
                             }
-                    };
-
-                    match store.url_with_file_hash_exists_checked(&url, &hash).await {
-                        HashExistsResult::Exists => {
-                            skipped.fetch_add(1, Ordering::Relaxed);
-                            tracing::debug!(
-                                path = %path.display(),
-                                url = %url,
-                                "startup scan: already indexed, skipping"
-                            );
-                        }
-                        HashExistsResult::NotIndexed => {
-                            let span =
-                                tracing::info_span!("index_job", path = %path.display());
-                            tokio::select! {
-                                _ = tx.send(PipelineJob::Index(IndexJob { path, span })) => {}
-                                _ = shutdown.cancelled() => {}
+                        };
+
+                    for (path, hash_and_url) in keys {
+                        if shutdown.is_cancelled() {
                             return;
                         }
+
+                        let (hash, url) = match hash_and_url {
+                            Some(t) => t,
+                            None => {
+                                tracing::debug!(
+                                    path = %path.display(),
+                                    "startup scan: no url/hash, queuing"
+                                );
+                                let span =
+                                    tracing::info_span!("index_job", path = %path.display());
+                                tokio::select! {
+                                    _ = tx.send(PipelineJob::Index(IndexJob { path, span })) => {}
+                                    _ = shutdown.cancelled() => {}
+                                }
+                                queued.fetch_add(1, Ordering::Relaxed);
+                                continue;
+                            }
+                        };
+
+                        match store.url_with_file_hash_exists_checked(&url, &hash).await {
+                            HashExistsResult::Exists => {
+                                skipped.fetch_add(1, Ordering::Relaxed);
+                                tracing::debug!(
+                                    path = %path.display(),
+                                    url = %url,
+                                    "startup scan: already indexed, skipping"
+                                );
+                            }
+                            HashExistsResult::NotIndexed => {
+                                let span =
+                                    tracing::info_span!("index_job", path = %path.display());
+                                tokio::select! {
+                                    _ = tx.send(PipelineJob::Index(IndexJob { path, span })) => {}
+                                    _ = shutdown.cancelled() => {}
+                                }
+                                queued.fetch_add(1, Ordering::Relaxed);
+                            }
+                            HashExistsResult::BackendError(ref msg) => {
+                                // Do NOT re-queue on backend failure — a degraded Qdrant endpoint
+                                // must not trigger a full reindex storm. The file will be
+                                // re-evaluated on next startup once the backend recovers.
+                                backend_errors.fetch_add(1, Ordering::Relaxed);
+                                tracing::warn!(
+                                    path = %path.display(),
+                                    url = %url,
+                                    error = %msg,
+                                    "startup scan: backend error during delta check — skipping requeue to avoid reindex storm"
+                                );
+                            }
-                        queued.fetch_add(1, Ordering::Relaxed);
-                    }
-                    HashExistsResult::BackendError(ref msg) => {
-                        // Do NOT re-queue on backend failure — a degraded Qdrant endpoint
-                        // must not trigger a full reindex storm. The file will be
-                        // re-evaluated on next startup once the backend recovers.
-                        backend_errors.fetch_add(1, Ordering::Relaxed);
-                        tracing::warn!(
-                            path = %path.display(),
-                            url = %url,
-                            error = %msg,
-                            "startup scan: backend error during delta check — skipping requeue to avoid reindex storm"
-                        );
                         }
                     }
                 }
@@ -240,7 +269,14 @@ mod tests {
         let shutdown = CancellationToken::new();
         let (tx, rx) = async_channel::bounded::<PipelineJob>(256);
-        let handle = spawn_startup_scan(tx.clone(), store, shutdown.clone(), vec![watch_dir], 16);
+        let handle = spawn_startup_scan(
+            tx.clone(),
+            store,
+            shutdown.clone(),
+            vec![watch_dir],
+            16,
+            50 * 1024 * 1024,
+        );
         handle.await.expect("startup scan panicked");
         drop(tx);
diff --git a/crates/noxa-rag/src/pipeline/worker.rs b/crates/noxa-rag/src/pipeline/worker.rs
index 8723e04..edd0285 100644
--- a/crates/noxa-rag/src/pipeline/worker.rs
+++ b/crates/noxa-rag/src/pipeline/worker.rs
@@ -1,3 +1,6 @@
+use std::path::PathBuf;
+use std::sync::Arc;
+
 use tokio::task::JoinHandle;
 use tracing::Instrument;
@@ -7,8 +10,9 @@ use super::{Pipeline, PipelineJob, WorkerContext};
 pub(super) fn spawn_workers(
     pipeline: &Pipeline,
     rx: async_channel::Receiver<PipelineJob>,
+    watch_roots: Arc<Vec<PathBuf>>,
 ) -> Vec<JoinHandle<()>> {
-    let ctx = WorkerContext::from_pipeline(pipeline);
+    let ctx = WorkerContext::from_pipeline(pipeline, watch_roots);
     let mut handles = Vec::with_capacity(pipeline.config.pipeline.embed_concurrency);
     for worker_id in 0..pipeline.config.pipeline.embed_concurrency {
diff --git a/crates/noxa-rag/src/store/qdrant/vector_store.rs b/crates/noxa-rag/src/store/qdrant/vector_store.rs
index cb32a0c..a335621 100644
--- a/crates/noxa-rag/src/store/qdrant/vector_store.rs
+++ b/crates/noxa-rag/src/store/qdrant/vector_store.rs
@@ -38,15 +38,22 @@
 async fn check_response(
 #[async_trait]
 impl VectorStore for QdrantStore {
-    /// PUT /collections/{name}/points?wait=false — idempotent with deterministic
-    /// UUID v5 point IDs, so we do not block on WAL+index flush. This saves the
-    /// ~30ms RTT per upsert that `wait=true` imposes. Delete operations still
-    /// use `wait=true` because stale-chunk cleanup must observably complete
-    /// before the URL-lock is released.
+    /// PUT /collections/{name}/points?wait=true — we use wait=true here even
+    /// though the deterministic UUID v5 IDs make the upsert idempotent, because
+    /// `delete_stale_by_url` runs immediately after with `must_not: has_id
+    /// [new_ids]`. With wait=false, Qdrant returns `Acknowledged` once the
+    /// operation is written to WAL but *before* it is applied to segments.
+    /// The delete filter evaluates against current segment state, so new point
+    /// IDs that are WAL-only are not yet visible — meaning the `has_id` exclusion
+    /// cannot reliably shield the freshly upserted points from the delete. Qdrant's
+    /// docs do not guarantee that subsequent requests see wait=false WAL entries
+    /// before segment flush (no per-connection ordering guarantee is documented).
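+    ///
+    /// Schematic of the race this closes (wait=false, restating the above):
+    ///   1. upsert(points)      -> Acknowledged (WAL only, segments unchanged)
+    ///   2. delete_stale_by_url -> filter evaluated against segments
+    ///   3. has_id [new_ids] excludes nothing the filter can see yet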
+    /// wait=true ensures the upserted points are committed to segments before
+    /// the delete filter runs, closing the race at the cost of ~30ms RTT.
     async fn upsert(&self, points: Vec) -> Result {
         let n = points.len();
         let url = format!(
-            "{}/collections/{}/points?wait=false",
+            "{}/collections/{}/points?wait=true",
             self.base_url, self.collection
         );
diff --git a/crates/noxa-rag/src/types.rs b/crates/noxa-rag/src/types.rs
index 92d7611..7c9493d 100644
--- a/crates/noxa-rag/src/types.rs
+++ b/crates/noxa-rag/src/types.rs
@@ -71,14 +71,14 @@ pub struct PointPayload {
     /// Git branch detected from .git/HEAD walk-up (file:// sources only).
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub git_branch: Option<String>,
-    // ── Ingestion-provenance fields from IngestionContext ───────────────────
+    // ── Ingestion-provenance fields ─────────────────────────────────────────
     /// Opaque platform id: 'linkding:42', 'memos:7' (Wave 3+).
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub external_id: Option<String>,
     /// Native platform UI URL (Wave 3+).
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub platform_url: Option<String>,
-    // ── Web-provenance fields from IngestionContext ─────────────────────────
+    // ── Web-provenance fields ────────────────────────────────────────────────
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub seed_url: Option<String>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub search_query: Option<String>,
@@ -211,63 +211,6 @@ pub struct SearchMetadataFilter {
     pub hnsw_ef: Option<u64>,
 }
-/// RAG-pipeline provenance carried alongside ExtractionResult through ingestion.
-///
-/// These fields have no meaning to noxa-fetch, noxa-mcp, or WASM consumers — they
-/// live here in noxa-rag, not in noxa-core. At upsert time both Metadata and
-/// IngestionContext are serialized into PointPayload.
-#[derive(Debug, Clone, Default, Serialize, Deserialize)]
-pub struct IngestionContext {
-    /// Matches Metadata.source_type: 'web' | 'file' | 'mcp' | 'notebook' | 'email'
-    pub source_type: String,
-    /// SHA-256 hex digest — duplicated from Metadata.content_hash for fast access.
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub content_hash: Option<String>,
-    // Platform fields — populated when MCP sources land in Wave 3.
-    /// Opaque platform identifier: 'linkding:42', 'memos:7', 'paperless:15'.
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub external_id: Option<String>,
-    /// Native UI URL (not the canonical content URL).
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub platform_url: Option<String>,
-    // AI session fields — populated when AI session sources land.
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub session_tool: Option<String>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub conversation_id: Option<String>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub model_id: Option<String>,
-    // Web provenance — populated by noxa-fetch.
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub seed_url: Option<String>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub search_query: Option<String>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub crawl_depth: Option<u32>,
-    #[serde(default, skip_serializing_if = "Vec::is_empty")]
-    pub email_to: Vec<String>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub email_message_id: Option<String>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub email_thread_id: Option<String>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub email_has_attachments: Option<bool>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub feed_url: Option<String>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub feed_item_id: Option<String>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub pptx_slide_count: Option<u32>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub pptx_has_notes: Option<bool>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub subtitle_start_s: Option<f64>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub subtitle_end_s: Option<f64>,
-    #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub subtitle_source_file: Option<String>,
-}
-
 #[cfg(test)]
 mod tests {
     use super::SearchResult;