Commits (23)
0f588de
chore(noxa-bd4): delete dead IngestionContext struct from types.rs
jmagar Apr 26, 2026
59a195c
refactor(noxa-4jt): split mcp_bridge.rs into per-platform module dire…
jmagar Apr 26, 2026
cf7eed2
fix(noxa-bkq): change upsert to wait=true to ensure ordering with del…
jmagar Apr 26, 2026
1b46471
fix(noxa-5gf): add measured decompression cap to ODT/PPTX in parse_of…
jmagar Apr 26, 2026
d11219b
fix(noxa-gs8): replace double tokenization with word_count approximat…
jmagar Apr 26, 2026
c9c9b15
refactor(noxa-mqm): add record_parse_failure() to SessionCounters, re…
jmagar Apr 26, 2026
c80b2ec
refactor(noxa-26r): wrap RagConfig in Arc for WorkerContext
jmagar Apr 26, 2026
0d99532
fix(noxa-3b8): add 50 MiB size guard to startup_scan_key before JSON …
jmagar Apr 26, 2026
8779914
perf(noxa-c28): eliminate redundant chunk text clone in TeiProvider::…
jmagar Apr 26, 2026
5b59687
docs(noxa-dkl): document is_indexable symlink TOCTOU window and defen…
jmagar Apr 26, 2026
bfa3b53
refactor(noxa-qgq): remove OnceLock<watch_roots> from Pipeline, pass …
jmagar Apr 26, 2026
9b220ca
fix(noxa-byr): omit TEI response body from retry logs when auth_token…
jmagar Apr 26, 2026
debc760
perf(noxa-5tl): batch startup_scan spawn_blocking calls to reduce per…
jmagar Apr 26, 2026
35db1ad
refactor(noxa-ngd): normalize parse_html_file to sync fn via spawn_bl…
jmagar Apr 26, 2026
c390dd3
fix(noxa-u90): split parse_ms timer into io_ms + parse_ms in process_job
jmagar Apr 26, 2026
3a9479b
fix(noxa-qkg): guard XML/OPML/feed parsers against entity expansion a…
jmagar Apr 26, 2026
4e71049
refactor(noxa-3g7): add UrlValidation/WorkerPanic RagError variants, …
jmagar Apr 26, 2026
2a39b7d
refactor(noxa-udb): move FormatProvenance match into apply() method, …
jmagar Apr 26, 2026
8d2cc7b
perf(noxa-rso): check git_branch_cache before spawning blocking task …
jmagar Apr 26, 2026
8c09c8a
perf(noxa-346): factor per-file metadata into FileMetadata struct, el…
jmagar Apr 26, 2026
bd4243a
refactor: simplify — shared word_count, remove redundant comments, el…
jmagar Apr 27, 2026
47d420b
fix: address PR #14 review comments
jmagar Apr 27, 2026
b8bab43
fix: address PR #14 round-2 review comments (coderabbitai)
jmagar Apr 27, 2026
121 changes: 111 additions & 10 deletions crates/noxa-rag/src/chunker.rs
@@ -6,7 +6,7 @@ use crate::config::ChunkerConfig;
use crate::types::Chunk;

/// Count whitespace-separated words in a string.
fn word_count(s: &str) -> usize {
pub(crate) fn word_count(s: &str) -> usize {
s.split_whitespace().count()
}

@@ -42,21 +42,19 @@ fn extract_domain(url: &str) -> String {
.unwrap_or_default()
}

/// Approximate token count — use the tokenizer when possible, fall back to word count.
fn token_estimate(text: &str, tokenizer: &Tokenizer) -> usize {
tokenizer
.encode(text, false)
.map(|enc| enc.len())
.unwrap_or_else(|_| text.split_whitespace().count())
}

/// Chunk an `ExtractionResult` into a `Vec<Chunk>`.
///
/// - Uses `content.markdown` if non-empty, otherwise `content.plain_text`.
/// - Empty content (both empty) → `Vec::new()`.
/// - Uses `ChunkConfig::with_overlap()` for sliding-window overlap (built into text-splitter ≥0.25).
/// - Filters chunks below `config.min_words`.
/// - Caps output at `config.max_chunks_per_page`.
///
/// # Token estimate
/// The `Chunk::token_estimate` field is populated with a word-count approximation.
/// The tokenizer is still used exclusively by the splitter for accurate boundary placement
/// via `ChunkConfig::with_sizer`; re-encoding every emitted chunk would halve throughput
/// on the `spawn_blocking` hot path for no practical gain (the field is diagnostic only).
pub fn chunk(
result: &ExtractionResult,
config: &ChunkerConfig,
@@ -112,7 +110,7 @@ pub fn chunk(
.into_iter()
.enumerate()
.map(|(chunk_index, (char_offset, text))| {
let t_est = token_estimate(&text, tokenizer);
let t_est = word_count(&text);
let section_header = nearest_heading(&headings, char_offset).map(|s| s.to_string());
Chunk {
text,
@@ -132,6 +130,57 @@
mod tests {
use super::*;

fn make_extraction_result(markdown: &str) -> ExtractionResult {
crate::pipeline::parse::make_text_result(
markdown.to_string(),
String::new(),
"https://example.com/test".to_string(),
None,
"test",
crate::chunker::word_count(markdown),
)
}

/// Build a minimal whitespace-pretokenized WordLevel tokenizer suitable for
/// unit tests. Every word becomes a distinct token (unk token used for anything
/// not in the small vocab), which is sufficient for splitter boundary logic.
fn make_test_tokenizer() -> Tokenizer {
// A minimal valid tokenizer JSON: WordLevel model with whitespace pre-tokenizer.
// Using from_str avoids the ahash::AHashMap type constraint on WordLevelBuilder::vocab
// and the TokenizerImpl→Tokenizer conversion from TokenizerBuilder.
let json = serde_json::json!({
"version": "1.0",
"truncation": null,
"padding": null,
"added_tokens": [],
"normalizer": null,
"pre_tokenizer": {
"type": "Whitespace"
},
"post_processor": null,
"decoder": null,
"model": {
"type": "WordLevel",
"vocab": {
"[UNK]": 0,
"the": 1,
"and": 2,
"of": 3,
"a": 4,
"to": 5,
"in": 6,
"is": 7,
"it": 8,
"that": 9
},
"unk_token": "[UNK]"
}
});
json.to_string()
.parse::<Tokenizer>()
.expect("test tokenizer from JSON")
}

#[test]
fn domain_extraction() {
assert_eq!(
@@ -148,4 +197,56 @@
assert_eq!(word_count(" "), 0);
assert_eq!(word_count(""), 0);
}

/// Verify that no double-tokenization occurs: token_estimate is populated with
/// word_count (not from the tokenizer), and chunks are produced for non-trivial input.
#[test]
fn chunk_token_estimate_uses_word_count_not_tokenizer() {
let tokenizer = make_test_tokenizer();
let config = crate::config::ChunkerConfig {
target_tokens: 50,
overlap_tokens: 0,
min_words: 1,
max_chunks_per_page: 200,
};

// A body of text long enough to produce at least one chunk.
let body = (0..200)
.map(|i| format!("word{i}"))
.collect::<Vec<_>>()
.join(" ");
let result = make_extraction_result(&body);
let chunks = chunk(&result, &config, &tokenizer);

assert!(
!chunks.is_empty(),
"expected at least one chunk for a 200-word body"
);

for c in &chunks {
// token_estimate must be populated (> 0 for any non-empty chunk).
assert!(
c.token_estimate > 0,
"token_estimate must be > 0, got {}",
c.token_estimate
);
// token_estimate == word_count of the chunk text (the word_count approximation).
let expected = word_count(&c.text);
assert_eq!(
c.token_estimate, expected,
"token_estimate should equal word_count for chunk at index {}",
c.chunk_index
);
}
}

/// Confirm empty content returns no chunks.
#[test]
fn chunk_empty_content_returns_empty() {
let tokenizer = make_test_tokenizer();
let config = crate::config::ChunkerConfig::default();
let result = make_extraction_result("");
let chunks = chunk(&result, &config, &tokenizer);
assert!(chunks.is_empty());
}
}
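Note on the noxa-gs8 change above: the splitter still receives the real tokenizer through ChunkConfig::with_sizer, so chunk boundaries remain token-accurate; only the diagnostic Chunk::token_estimate field switches to the word-count approximation. A minimal self-contained sketch of that approximation (plain Rust, no tokenizers dependency; the sample text is invented):

fn word_count(s: &str) -> usize {
    s.split_whitespace().count()
}

fn main() {
    let chunk_text = "the quick brown fox jumps over the lazy dog";
    // One "token" per whitespace-separated word: 9 here. A subword tokenizer
    // would usually report a somewhat higher count, but the field is diagnostic
    // only, so the approximation avoids re-encoding every emitted chunk on the
    // spawn_blocking hot path.
    assert_eq!(word_count(chunk_text), 9);
    println!("token_estimate ~= {}", word_count(chunk_text));
}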
52 changes: 34 additions & 18 deletions crates/noxa-rag/src/embed/tei.rs
@@ -236,17 +236,28 @@ impl TeiProvider {
}

if should_retry(status_u16, attempt) {
let body = resp.text().await.unwrap_or_default();
let preview: String = body.chars().take(512).collect();
tracing::warn!(
batch = batch_idx + 1,
attempt = attempt + 1,
max_attempts = MAX_RETRIES + 1,
status = status_u16,
delay_ms,
body = preview,
"TEI retry"
);
if self.auth_token.is_some() {
tracing::warn!(
batch = batch_idx + 1,
attempt = attempt + 1,
max_attempts = MAX_RETRIES + 1,
status = status_u16,
delay_ms,
"TEI retry (body omitted: auth token configured)"
);
} else {
let body = resp.text().await.unwrap_or_default();
let preview: String = body.chars().take(512).collect();
tracing::warn!(
batch = batch_idx + 1,
attempt = attempt + 1,
max_attempts = MAX_RETRIES + 1,
status = status_u16,
delay_ms,
body = preview,
"TEI retry"
);
}
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
delay_ms = (delay_ms * 2).min(2_000);
continue;
@@ -318,15 +329,20 @@ impl EmbedProvider for TeiProvider {
// Keep EMBED_PIPELINE_DEPTH batches in-flight concurrently so HTTP
// round-trip latency overlaps with GPU compute on the TEI server.
// buffered() preserves batch ordering.
let batches: Vec<(usize, Vec<String>)> = texts
.chunks(BATCH_SIZE)
.enumerate()
.map(|(i, chunk)| (i, chunk.to_vec()))
//
// We stream (batch_idx, start, end) index tuples and slice `texts` inside
// the async block to avoid cloning the text data into owned Vec<String> batches.
let batch_ranges: Vec<(usize, usize, usize)> = (0..total_batches)
.map(|i| {
let start = i * BATCH_SIZE;
let end = (start + BATCH_SIZE).min(texts.len());
(i, start, end)
})
.collect();

let results: Vec<Vec<Vec<f32>>> = futures::stream::iter(batches)
.map(|(batch_idx, batch)| async move {
self.embed_batch_adaptive(&batch, batch_idx, total_batches)
let results: Vec<Vec<Vec<f32>>> = futures::stream::iter(batch_ranges)
.map(|(batch_idx, start, end)| async move {
self.embed_batch_adaptive(&texts[start..end], batch_idx, total_batches)
.await
})
.buffered(EMBED_PIPELINE_DEPTH)
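Note on the noxa-c28 change above: the rewrite replaces owned per-batch Vec<String> clones with (batch_idx, start, end) tuples that are resolved to a borrowed &texts[start..end] slice inside the async block. A standalone sketch of the range computation (the BATCH_SIZE value and the loop body are illustrative):

const BATCH_SIZE: usize = 32;

// Compute (batch_idx, start, end) index tuples covering `len` items.
fn batch_ranges(len: usize) -> Vec<(usize, usize, usize)> {
    (0..len.div_ceil(BATCH_SIZE))
        .map(|i| {
            let start = i * BATCH_SIZE;
            let end = (start + BATCH_SIZE).min(len);
            (i, start, end)
        })
        .collect()
}

fn main() {
    let texts: Vec<String> = (0..70).map(|i| format!("chunk {i}")).collect();
    for (batch_idx, start, end) in batch_ranges(texts.len()) {
        // Borrow, never clone: each batch is a view into the original Vec.
        let batch: &[String] = &texts[start..end];
        println!("batch {batch_idx}: {} texts", batch.len());
    }
}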
4 changes: 4 additions & 0 deletions crates/noxa-rag/src/error.rs
@@ -32,6 +32,10 @@ pub enum RagError {
Json(#[from] serde_json::Error),
#[error("parse error: {0}")]
Parse(String),
#[error("url validation failed: {0}")]
UrlValidation(String),
#[error("worker panicked: {0}")]
WorkerPanic(String),
#[error("error: {0}")]
Generic(String),
}
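A sketch of how the two new variants might be consumed downstream; the enum stand-in below mirrors only these variants, and validate_url is a hypothetical helper, not an API from this crate:

use thiserror::Error;

#[allow(dead_code)]
#[derive(Debug, Error)]
enum RagError {
    #[error("url validation failed: {0}")]
    UrlValidation(String),
    #[error("worker panicked: {0}")]
    WorkerPanic(String),
}

// Hypothetical call site: a typed variant lets callers match on the failure
// class instead of string-matching a Generic error message.
fn validate_url(url: &str) -> Result<(), RagError> {
    if url.starts_with("http://") || url.starts_with("https://") {
        Ok(())
    } else {
        Err(RagError::UrlValidation(format!("unsupported scheme: {url}")))
    }
}

fn main() {
    match validate_url("ftp://example.com/feed.xml") {
        Err(RagError::UrlValidation(msg)) => eprintln!("rejected: {msg}"),
        other => println!("{other:?}"),
    }
}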
105 changes: 105 additions & 0 deletions crates/noxa-rag/src/mcp_bridge/bytestash.rs
@@ -0,0 +1,105 @@
use serde_json::Value;

use crate::RagError;

use super::{
BridgeDocument, McpBridge, McpSource, McporterExecutor, SyncReport, WriteStatus, array_field,
io::{build_extraction, write_bridge_document},
join_base_url, join_non_empty, optional_string, required_base_url, required_string,
string_array,
};

impl<E> McpBridge<E>
where
E: McporterExecutor,
{
pub(super) async fn sync_bytestash(&self) -> Result<SyncReport, RagError> {
let base_url = required_base_url(&self.config, McpSource::Bytestash)?;
let data = self
.call_data(McpSource::Bytestash, "snippets.list", serde_json::json!({}))
.await?;
let records = if let Some(array) = data.as_array() {
array.iter().collect::<Vec<_>>()
} else {
array_field(&data, "snippets")?
};

let mut report = SyncReport::default();
for record in records {
let document = normalize_bytestash_record(record, base_url)?;
report.fetched += 1;
match write_bridge_document(&self.config.watch_dir, &document).await? {
WriteStatus::Written => report.written += 1,
WriteStatus::Unchanged => report.skipped += 1,
}
}

Ok(report)
}
}

pub fn normalize_bytestash_record(
record: &Value,
platform_base_url: &str,
) -> Result<BridgeDocument, RagError> {
let id = required_string(record, "id")?;
let title = optional_string(record, "title");
let description = optional_string(record, "description");
let language = optional_string(record, "language");
let fragments = record
.get("fragments")
.and_then(Value::as_array)
.ok_or_else(|| RagError::Parse("bytestash record missing fragments array".to_string()))?;

let mut markdown_parts = Vec::new();
if let Some(value) = title.as_deref() {
markdown_parts.push(format!("# {value}"));
}
if let Some(value) = description.as_deref() {
markdown_parts.push(value.to_string());
}
for fragment in fragments {
let file_name = fragment
.get("fileName")
.or_else(|| fragment.get("file_name"))
.and_then(Value::as_str)
.unwrap_or("snippet");
let code = fragment
.get("code")
.and_then(Value::as_str)
.unwrap_or_default();
markdown_parts.push(format!(
"## {file_name}\n```{}\n{}\n```",
language.clone().unwrap_or_default(),
code
));
}
let plain_text = join_non_empty([
title.clone(),
description.clone(),
Some(
fragments
.iter()
.filter_map(|fragment| fragment.get("code").and_then(Value::as_str))
.collect::<Vec<_>>()
.join("\n\n"),
),
]);
let url = join_base_url(platform_base_url, &format!("/api/snippets/{id}"))?;

Ok(BridgeDocument {
source: McpSource::Bytestash,
external_id: format!("bytestash:{id}"),
platform_url: Some(url.clone()),
extraction: build_extraction(
url,
title,
None,
None,
language,
string_array(record.get("categories")),
markdown_parts.join("\n\n"),
plain_text,
),
})
}
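To make the normalization above concrete, here is a sketch of the markdown assembly that normalize_bytestash_record performs, driven by an invented snippet record (the field values are illustrative; the real function also derives plain_text, joins the platform URL, and wraps everything in a BridgeDocument):

use serde_json::{Value, json};

fn main() {
    let record = json!({
        "id": "42",
        "title": "Retry helper",
        "description": "Exponential backoff wrapper.",
        "language": "rust",
        "fragments": [
            { "fileName": "retry.rs", "code": "fn retry() { /* ... */ }" }
        ]
    });

    // Mirrors the assembly order: "# title", description, then one
    // "## fileName" fenced block per fragment, tagged with the language.
    let language = record["language"].as_str().unwrap_or_default();
    let mut parts = vec![
        format!("# {}", record["title"].as_str().unwrap_or_default()),
        record["description"].as_str().unwrap_or_default().to_string(),
    ];
    for fragment in record["fragments"].as_array().into_iter().flatten() {
        let file_name = fragment
            .get("fileName")
            .or_else(|| fragment.get("file_name"))
            .and_then(Value::as_str)
            .unwrap_or("snippet");
        let code = fragment.get("code").and_then(Value::as_str).unwrap_or_default();
        parts.push(format!("## {file_name}\n```{language}\n{code}\n```"));
    }
    println!("{}", parts.join("\n\n"));
}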