diff --git a/frontend/src/EmbeddingProgressContext.jsx b/frontend/src/EmbeddingProgressContext.jsx new file mode 100644 index 00000000000..a6b8a5c3118 --- /dev/null +++ b/frontend/src/EmbeddingProgressContext.jsx @@ -0,0 +1,165 @@ +import { + createContext, + useCallback, + useContext, + useEffect, + useRef, + useState, +} from "react"; +import { fetchEventSource } from "@microsoft/fetch-event-source"; +import { API_BASE } from "@/utils/constants"; +import { baseHeaders, safeJsonParse } from "@/utils/request"; + +const EmbeddingProgressContext = createContext(); + +export function useEmbeddingProgress() { + const ctx = useContext(EmbeddingProgressContext); + if (!ctx) + throw new Error( + "useEmbeddingProgress must be used within EmbeddingProgressProvider" + ); + return ctx; +} + +export function EmbeddingProgressProvider({ children }) { + const [embeddingProgressMap, setEmbeddingProgressMap] = useState({}); + const abortControllersRef = useRef({}); + const cleanupTimeoutsRef = useRef({}); + + useEffect(() => { + return () => { + for (const slug of Object.keys(abortControllersRef.current)) { + abortControllersRef.current[slug]?.abort(); + } + }; + }, []); + + const updateFileStatus = (slug, filename, status) => + setEmbeddingProgressMap((prev) => ({ + ...prev, + [slug]: { ...prev[slug], [filename]: status }, + })); + + function handleMessage(slug, msg, ctrl) { + const data = safeJsonParse(msg.data); + + switch (data.type) { + case "batch_starting": { + const initial = {}; + for (const name of data.filenames || []) { + initial[name] = { status: "pending" }; + } + setEmbeddingProgressMap((prev) => ({ + ...prev, + [slug]: { ...initial, ...prev[slug] }, + })); + break; + } + + case "doc_starting": + updateFileStatus(slug, data.filename, { + status: "embedding", + chunksProcessed: 0, + totalChunks: 0, + }); + break; + + case "chunk_progress": + updateFileStatus(slug, data.filename, { + status: "embedding", + chunksProcessed: data.chunksProcessed, + totalChunks: 
data.totalChunks, + }); + break; + + case "doc_complete": + updateFileStatus(slug, data.filename, { status: "complete" }); + break; + + case "doc_failed": + updateFileStatus(slug, data.filename, { + status: "failed", + error: data.error || "Embedding failed", + }); + break; + + case "all_complete": + ctrl.abort(); + delete abortControllersRef.current[slug]; + cleanupTimeoutsRef.current[slug] = setTimeout(() => { + setEmbeddingProgressMap((prev) => { + const next = { ...prev }; + delete next[slug]; + return next; + }); + delete cleanupTimeoutsRef.current[slug]; + }, 5000); + break; + } + } + + /** + * Open (or reconnect) an SSE connection for a given workspace slug. + * Updates embeddingProgressMap in real time as events arrive. + */ + const connectSSE = useCallback((slug) => { + if (abortControllersRef.current[slug]) return; + + const ctrl = new AbortController(); + abortControllersRef.current[slug] = ctrl; + + fetchEventSource(`${API_BASE}/workspace/${slug}/embed-progress`, { + method: "GET", + headers: baseHeaders(), + signal: ctrl.signal, + openWhenHidden: true, + onmessage: (msg) => handleMessage(slug, msg, ctrl), + onclose() { + delete abortControllersRef.current[slug]; + }, + onerror() { + delete abortControllersRef.current[slug]; + throw new Error("SSE connection error"); + }, + }).catch(() => { + // SSE is optional — embedding still works without it + }); + }, []); + + const startEmbedding = useCallback( + (slug, filenames) => { + if (abortControllersRef.current[slug]) { + abortControllersRef.current[slug].abort(); + delete abortControllersRef.current[slug]; + } + if (cleanupTimeoutsRef.current[slug]) { + clearTimeout(cleanupTimeoutsRef.current[slug]); + delete cleanupTimeoutsRef.current[slug]; + } + + const initialProgress = {}; + for (const name of filenames) { + initialProgress[name] = { status: "pending" }; + } + setEmbeddingProgressMap((prev) => ({ + ...prev, + [slug]: { ...initialProgress }, + })); + + connectSSE(slug); + }, + [connectSSE] + ); + + 
return ( + + {children} + + ); +} diff --git a/frontend/src/components/Modals/ManageWorkspace/Documents/WorkspaceDirectory/index.jsx b/frontend/src/components/Modals/ManageWorkspace/Documents/WorkspaceDirectory/index.jsx index 46f0bdf1646..38999e7e428 100644 --- a/frontend/src/components/Modals/ManageWorkspace/Documents/WorkspaceDirectory/index.jsx +++ b/frontend/src/components/Modals/ManageWorkspace/Documents/WorkspaceDirectory/index.jsx @@ -3,7 +3,14 @@ import { dollarFormat } from "@/utils/numbers"; import WorkspaceFileRow from "./WorkspaceFileRow"; import { memo, useEffect, useState } from "react"; import ModalWrapper from "@/components/ModalWrapper"; -import { Eye, PushPin } from "@phosphor-icons/react"; +import { + Eye, + PushPin, + CheckCircle, + XCircle, + CircleNotch, + Clock, +} from "@phosphor-icons/react"; import { SEEN_DOC_PIN_ALERT, SEEN_WATCH_ALERT } from "@/utils/constants"; import paths from "@/utils/paths"; import { Link } from "react-router-dom"; @@ -11,6 +18,7 @@ import Workspace from "@/models/workspace"; import { Tooltip } from "react-tooltip"; import { safeJsonParse } from "@/utils/request"; import { useTranslation } from "react-i18next"; +import { middleTruncate } from "@/utils/directories"; function WorkspaceDirectory({ workspace, @@ -25,6 +33,7 @@ function WorkspaceDirectory({ saveChanges, embeddingCosts, movedItems, + embeddingProgress = null, }) { const { t } = useTranslation(); const [selectedItems, setSelectedItems] = useState({}); @@ -87,7 +96,7 @@ function WorkspaceDirectory({ saveChanges(e); }; - if (loading) { + if (loading || embeddingProgress) { return (
@@ -101,14 +110,31 @@ function WorkspaceDirectory({

Name

-

-

-
- -

- {loadingMessage} +

+ Status

+ + {embeddingProgress ? ( +
+ {Object.entries(embeddingProgress).map( + ([filename, fileStatus]) => ( + + ) + )} +
+ ) : ( +
+ +

+ {loadingMessage} +

+
+ )}
); @@ -467,4 +493,98 @@ function WorkspaceDocumentTooltips() { ); } +/** + * @param {string} filename + */ +const getDisplayName = (filename) => + filename + .split("/") + .pop() + ?.replace(/\.json$/, "") || filename; + +const getStatusStyles = () => ({ + pending: { + icon: ( + + ), + textColor: "", + label: "Queued", + }, + + embedding: { + icon: ( + + ), + textColor: "text-sky-400", + label: "Embedding", + }, + + complete: { + icon: ( + + ), + textColor: "text-green-400", + label: "Complete", + }, + failed: { + icon: , + textColor: "text-red-400", + label: "Failed", + }, +}); +function EmbeddingFileRow({ filename, status: fileStatus }) { + const { status, chunksProcessed = 0, totalChunks = 0 } = fileStatus; + const displayName = getDisplayName(filename); + const statusStyles = getStatusStyles(); + const isEmbedding = status === "embedding" && totalChunks > 0; + const pct = isEmbedding + ? Math.round((chunksProcessed / totalChunks) * 100) + : 0; + + return ( +
+
+ {statusStyles[status].icon || statusStyles.pending.icon} +

+ {middleTruncate(displayName, 45)} +

+
+
+ {isEmbedding ? ( +
+
+
+
+

+ {pct}% +

+
+ ) : ( +

+ {statusStyles[status].label || "Queued"} +

+ )} +
+
+ ); +} + export default memo(WorkspaceDirectory); diff --git a/frontend/src/components/Modals/ManageWorkspace/Documents/index.jsx b/frontend/src/components/Modals/ManageWorkspace/Documents/index.jsx index 98244d51835..c9904e16c4d 100644 --- a/frontend/src/components/Modals/ManageWorkspace/Documents/index.jsx +++ b/frontend/src/components/Modals/ManageWorkspace/Documents/index.jsx @@ -1,10 +1,11 @@ import { ArrowsDownUp } from "@phosphor-icons/react"; -import { useEffect, useState } from "react"; +import { useEffect, useRef, useState } from "react"; import Workspace from "../../../../models/workspace"; import System from "../../../../models/system"; import showToast from "../../../../utils/toast"; import Directory from "./Directory"; import WorkspaceDirectory from "./WorkspaceDirectory"; +import { useEmbeddingProgress } from "@/EmbeddingProgressContext"; // OpenAI Cost per token // ref: https://openai.com/pricing#:~:text=%C2%A0/%201K%20tokens-,Embedding%20models,-Build%20advanced%20search @@ -26,6 +27,28 @@ export default function DocumentSettings({ workspace, systemSettings }) { const [embeddingsCost, setEmbeddingsCost] = useState(0); const [loadingMessage, setLoadingMessage] = useState(""); + const { embeddingProgressMap, startEmbedding, connectSSE } = + useEmbeddingProgress(); + + const embeddingProgress = embeddingProgressMap[workspace.slug] || null; + + // On mount, connect SSE so we catch up on any active embedding job + // via the server's buffered history replay. + useEffect(() => { + connectSSE(workspace.slug); + }, [workspace.slug, connectSSE]); + + // When progress is cleared by the context (all_complete + 5s auto-clear), + // refresh the file lists so the normal view reflects newly embedded docs. 
+ const prevProgressRef = useRef(embeddingProgress); + useEffect(() => { + // Went from non-null → null = progress was just cleared + if (prevProgressRef.current && !embeddingProgress) { + fetchKeys(true); + } + prevProgressRef.current = embeddingProgress; + }, [embeddingProgress]); + async function fetchKeys(refetchWorkspace = false) { setLoading(true); const localFiles = await System.localFiles(); @@ -86,36 +109,41 @@ export default function DocumentSettings({ workspace, systemSettings }) { const updateWorkspace = async (e) => { e.preventDefault(); setLoading(true); - showToast("Updating workspace...", "info", { autoClose: false }); setLoadingMessage("This may take a while for large documents"); - const changesToSend = { - adds: movedItems.map((item) => `${item.folderName}/${item.name}`), - }; + const filenames = movedItems.map( + (item) => `${item.folderName}/${item.name}` + ); + const changesToSend = { adds: filenames }; setSelectedItems({}); setHasChanges(false); setHighlightWorkspace(false); - await Workspace.modifyEmbeddings(workspace.slug, changesToSend) - .then((res) => { - if (!!res.message) { - showToast(`Error: ${res.message}`, "error", { clear: true }); - return; - } - showToast("Workspace updated successfully.", "success", { - clear: true, - }); + + // Fire the embed POST first so the server is already processing the job + // by the time the SSE connection opens. This avoids the server sending + // idle (no active job) before embedding has started. + const embedPromise = Workspace.modifyEmbeddings( + workspace.slug, + changesToSend + ); + startEmbedding(workspace.slug, filenames); + + embedPromise + .then(async () => { + // Refresh file lists after API responds. + // Progress UI is driven by SSE via embeddingProgress context. 
+ await fetchKeys(true); }) .catch((error) => { showToast(`Workspace update failed: ${error}`, "error", { clear: true, }); + setLoading(false); + setLoadingMessage(""); }); setMovedItems([]); - await fetchKeys(true); - setLoading(false); - setLoadingMessage(""); }; const moveSelectedItemsToWorkspace = () => { @@ -224,6 +252,7 @@ export default function DocumentSettings({ workspace, systemSettings }) { saveChanges={updateWorkspace} embeddingCosts={embeddingsCost} movedItems={movedItems} + embeddingProgress={embeddingProgress} />
); diff --git a/frontend/src/components/Modals/ManageWorkspace/index.jsx b/frontend/src/components/Modals/ManageWorkspace/index.jsx index cf6186bb4c3..c5572b20477 100644 --- a/frontend/src/components/Modals/ManageWorkspace/index.jsx +++ b/frontend/src/components/Modals/ManageWorkspace/index.jsx @@ -9,6 +9,7 @@ import useUser from "../../../hooks/useUser"; import DocumentSettings from "./Documents"; import DataConnectors from "./DataConnectors"; import ModalWrapper from "@/components/ModalWrapper"; +import { EmbeddingProgressProvider } from "@/EmbeddingProgressContext"; const noop = () => {}; const ManageWorkspace = ({ hideModal = noop, providedSlug = null }) => { @@ -80,35 +81,40 @@ const ManageWorkspace = ({ hideModal = noop, providedSlug = null }) => { } return ( -
-
-
-
-
- -
+ +
+
+
+
+
+ +
- {user?.role !== "default" && ( - - )} - - {selectedTab === "documents" ? ( - - ) : ( - - )} + {user?.role !== "default" && ( + + )} + + {selectedTab === "documents" ? ( + + ) : ( + + )} +
-
+
); }; diff --git a/server/.env.example b/server/.env.example index ccc8c87b733..f5ee1ff3df2 100644 --- a/server/.env.example +++ b/server/.env.example @@ -433,4 +433,9 @@ TTS_PROVIDER="native" # Allow native tool calling for specific providers. # This can VASTLY improve performance and speed of agent calls. # Check code for supported providers who can be enabled here via this flag -# PROVIDER_SUPPORTS_NATIVE_TOOL_CALLING="generic-openai,bedrock,localai,groq,litellm,openrouter" \ No newline at end of file +# PROVIDER_SUPPORTS_NATIVE_TOOL_CALLING="generic-openai,bedrock,localai,groq,litellm,openrouter" + +# Native worker process TTL (in seconds). +# Controls how long the forked embedding/reranking worker stays alive after finishing work. +# NATIVE_EMBEDDING_WORKER_TTL=300 +# NATIVE_RERANKING_WORKER_TTL=900 \ No newline at end of file diff --git a/server/endpoints/workspaces.js b/server/endpoints/workspaces.js index f4ca863be5e..cf14efaa892 100644 --- a/server/endpoints/workspaces.js +++ b/server/endpoints/workspaces.js @@ -24,7 +24,10 @@ const { WorkspaceSuggestedMessages, } = require("../models/workspacesSuggestedMessages"); const { validWorkspaceSlug } = require("../utils/middleware/validWorkspace"); -const { convertToChatHistory } = require("../utils/helpers/chat/responses"); +const { + convertToChatHistory, + writeResponseChunk, +} = require("../utils/helpers/chat/responses"); const { CollectorApi } = require("../utils/collectorApi"); const { determineWorkspacePfpFilepath, @@ -1064,6 +1067,44 @@ function workspaceEndpoints(app) { } ); + // SSE endpoint for embedding progress + app.get( + "/workspace/:slug/embed-progress", + [validatedRequest, flexUserRoleValid([ROLES.all]), validWorkspaceSlug], + async (request, response) => { + try { + const workspace = response.locals.workspace; + const user = await userFromSession(request, response); + const userId = user?.id ?? 
null; + + response.setHeader("Cache-Control", "no-cache"); + response.setHeader("Content-Type", "text/event-stream"); + response.setHeader("Access-Control-Allow-Origin", "*"); + response.setHeader("Connection", "keep-alive"); + response.flushHeaders(); + + const { embeddingProgressBus } = require("../utils/WorkerQueue"); + const { unsubscribe } = embeddingProgressBus.subscribe( + { workspaceSlug: workspace.slug, userId }, + (event) => { + writeResponseChunk(response, event); + } + ); + + // If there's no history, no embedding is in progress right now. + // We intentionally send nothing and keep the connection open — + // events will flow if embedding starts, and the connection is + // cleaned up when the client disconnects (modal close / unmount). + request.on("close", () => { + unsubscribe(); + }); + } catch (e) { + console.error(e.message, e); + response.status(500).end(); + } + } + ); + // Parsed Files in separate endpoint just to keep the workspace endpoints clean workspaceParsedFilesEndpoints(app); } diff --git a/server/models/documents.js b/server/models/documents.js index a13c3a6a2a5..89655ccf2b3 100644 --- a/server/models/documents.js +++ b/server/models/documents.js @@ -84,13 +84,37 @@ const Document = { const VectorDb = getVectorDbClass(); if (additions.length === 0) return { failed: [], embedded: [] }; const { fileData } = require("../utils/files"); + const { + embeddingProgressBus, + setEmbeddingContext, + } = require("../utils/WorkerQueue"); const embedded = []; const failedToEmbed = []; const errors = new Set(); + const totalDocs = additions.length; - for (const path of additions) { + const emitProgress = (type, extra = {}) => + embeddingProgressBus.emit("progress", { + type, + workspaceSlug: workspace.slug, + userId, + ...extra, + }); + + // Signal the full batch so SSE clients (including late-joining ones + // via history replay) can seed the complete file list as "pending". 
+ emitProgress("batch_starting", { filenames: additions, totalDocs }); + + for (const [index, path] of additions.entries()) { + const docProgress = { filename: path, docIndex: index, totalDocs }; const data = await fileData(path); - if (!data) continue; + if (!data) { + emitProgress("doc_failed", { + ...docProgress, + error: "Failed to load file data", + }); + continue; + } const docId = uuidv4(); const { pageContent: _pageContent, ...metadata } = data; @@ -102,11 +126,23 @@ const Document = { metadata: JSON.stringify(metadata), }; + emitProgress("doc_starting", docProgress); + + // Set the document context so that chunk-level progress events from the + // embedding worker can be attributed to this specific document/user. + // Must be cleared after vectorization. See WorkerQueue/index.js for how + // this context is read when forwarding worker progress to the SSE bus. + setEmbeddingContext({ + workspaceSlug: workspace.slug, + filename: path, + userId, + }); const { vectorized, error } = await VectorDb.addDocumentToNamespace( workspace.slug, { ...data, docId }, path ); + setEmbeddingContext(null); if (!vectorized) { console.error( @@ -115,17 +151,33 @@ const Document = { ); failedToEmbed.push(metadata?.title || newDoc.filename); errors.add(error); + emitProgress("doc_failed", { + ...docProgress, + error: error || "Unknown error", + }); continue; } try { await prisma.workspace_documents.create({ data: newDoc }); embedded.push(path); + emitProgress("doc_complete", docProgress); } catch (error) { console.error(error.message); + emitProgress("doc_failed", { + ...docProgress, + error: "Failed to save document record", + }); } } + // Signal that all documents have been processed + emitProgress("all_complete", { + totalDocs, + embedded: embedded.length, + failed: failedToEmbed.length, + }); + await Telemetry.sendTelemetry("documents_embedded_in_workspace", { LLMSelection: process.env.LLM_PROVIDER || "openai", Embedder: process.env.EMBEDDING_ENGINE || "inherit", diff --git 
a/server/utils/EmbeddingEngines/native/index.js b/server/utils/EmbeddingEngines/native/index.js index 21773fcb564..6e763bd4a51 100644 --- a/server/utils/EmbeddingEngines/native/index.js +++ b/server/utils/EmbeddingEngines/native/index.js @@ -223,27 +223,67 @@ class NativeEmbedder { */ async embedTextInput(textInput) { textInput = this.#applyQueryPrefix(textInput); - const result = await this.embedChunks( - Array.isArray(textInput) ? textInput : [textInput] - ); - return result?.[0] || []; + const input = Array.isArray(textInput) ? textInput : [textInput]; + const pipeline = await this.embedderClient(); + const output = await pipeline(input, { + pooling: "mean", + normalize: true, + }); + return output.length > 0 ? output.tolist()[0] : []; } - // If you are thinking you want to edit this function - you probably don't. - // This process was benchmarked heavily on a t3.small (2GB RAM 1vCPU) - // and without careful memory management for the V8 garbage collector - // this function will likely result in an OOM on any resource-constrained deployment. - // To help manage very large documents we run a concurrent write-log each iteration - // to keep the embedding result out of memory. The `maxConcurrentChunk` is set to 25, - // as 50 seems to overflow no matter what. Given the above, memory use hovers around ~30% - // during a very large document (>100K words) but can spike up to 70% before gc. - // This seems repeatable for all document sizes. - // While this does take a while, it is zero set up and is 100% free and on-instance. - // It still may crash depending on other elements at play - so no promises it works under all conditions. + /** + * Routes embedding through an isolated worker process to protect the main + * server from OOM crashes during large document batches. + * This is the public API that vector DB providers and AI provider wrappers call. 
+ * @param {string[]} textChunks + * @returns {Promise>} + */ async embedChunks(textChunks = []) { + const { queueEmbedding } = require("../../WorkerQueue"); + return await queueEmbedding({ textChunks }); + } + + /** + * + * If you are thinking you want to edit this function - you probably don't. + * This process was benchmarked heavily on a t3.small (2GB RAM 1vCPU) + * and without careful memory management for the V8 garbage collector + * this function will likely result in an OOM on any resource-constrained deployment. + * To help manage very large documents we run a concurrent write-log each iteration + * to keep the embedding result out of memory. The `maxConcurrentChunk` is set to 25, + * as 50 seems to overflow no matter what. Given the above, memory use hovers around ~30% + * during a very large document (>100K words) but can spike up to 70% before gc. + * This seems repeatable for all document sizes. + * While this does take a while, it is zero set up and is 100% free and on-instance. + * It still may crash depending on other elements at play - so no promises it works under all conditions. + * + * Runs the embedding pipeline directly in the current process. Only the + * embedding worker should call this — all other callers should use + * {@link embedChunks} which routes work to the isolated worker process. + * + * Chunk-level progress reporting: + * This method runs inside a forked child process (embeddingWorker.js), so it + * cannot emit events on the main process's EmbeddingProgressBus directly. + * Instead, the worker passes an onProgress callback that sends IPC messages + * back to the parent. 
The full flow is: + * + * embedChunksInProcess (onProgress callback) + * → embeddingWorker.js (converts callback to process.send IPC message) + * → WorkerQueue.js (receives IPC, calls its own onProgress callback) + * → WorkerQueue/index.js (emits "chunk_progress" on EmbeddingProgressBus) + * → SSE endpoint streams it to the frontend + * + * @param {string[]} textChunks + * @param {function|null} onProgress - Called after each chunk group with { chunksProcessed: number, totalChunks: number }. + * The embedding worker sets this to a function that sends IPC messages to the parent process. + * @returns {Promise|null>} + */ + async embedChunksInProcess(textChunks = [], onProgress = null) { const tmpFilePath = this.#tempfilePath(); const chunks = toChunks(textChunks, this.maxConcurrentChunks); const chunkLen = chunks.length; + const totalChunks = textChunks.length; for (let [idx, chunk] of chunks.entries()) { if (idx === 0) await this.#writeToTempfile(tmpFilePath, "["); @@ -266,6 +306,15 @@ class NativeEmbedder { this.log(`Embedded Chunk Group ${idx + 1} of ${chunkLen}`); if (chunkLen - 1 !== idx) await this.#writeToTempfile(tmpFilePath, ","); if (chunkLen - 1 === idx) await this.#writeToTempfile(tmpFilePath, "]"); + + if (onProgress) { + const chunksProcessed = Math.min( + (idx + 1) * this.maxConcurrentChunks, + totalChunks + ); + onProgress({ chunksProcessed, totalChunks }); + } + pipeline = null; output = null; data = null; diff --git a/server/utils/EmbeddingRerankers/native/index.js b/server/utils/EmbeddingRerankers/native/index.js index 599338224d5..ec68374ec7f 100644 --- a/server/utils/EmbeddingRerankers/native/index.js +++ b/server/utils/EmbeddingRerankers/native/index.js @@ -214,14 +214,30 @@ class NativeEmbeddingReranker { } /** - * Reranks a list of documents based on the query. + * Routes reranking to an isolated worker process to protect the main server + * from OOM. The worker calls {@link rerank} directly to do the actual work. 
+ * Can be called statically without instantiating in the parent process. + */ + static async rerankViaWorker(query, documents, options = { topK: 4 }) { + const { queueReranking } = require("../../WorkerQueue"); + return await queueReranking({ + query, + documents, + topK: options.topK, + }); + } + + /** + * Runs the reranking model directly in the current process. Only the + * reranking worker should call this — all other callers should use + * {@link rerankViaWorker} which routes work to the isolated worker process. * @param {string} query - The query to rerank the documents against. * @param {{text: string}[]} documents - The list of document text snippets to rerank. Should be output from a vector search. * @param {Object} options - The options for the reranking. * @param {number} options.topK - The number of top documents to return. * @returns {Promise} - The reranked list of documents. */ - async rerank(query, documents, options = { topK: 4 }) { + async rerankInProcess(query, documents, options = { topK: 4 }) { await this.initClient(); const model = NativeEmbeddingReranker.#model; const tokenizer = NativeEmbeddingReranker.#tokenizer; diff --git a/server/utils/WorkerQueue/EmbeddingProgressBus.js b/server/utils/WorkerQueue/EmbeddingProgressBus.js new file mode 100644 index 00000000000..2d8c4c795e6 --- /dev/null +++ b/server/utils/WorkerQueue/EmbeddingProgressBus.js @@ -0,0 +1,62 @@ +const EventEmitter = require("events"); + +/** + * Singleton event emitter for streaming embedding progress to SSE clients. + * Buffers events per-workspace so late-joining connections can replay history. + */ +class EmbeddingProgressBus extends EventEmitter { + /** @type {Map} workspace slug → ordered event history */ + #history = new Map(); + + constructor() { + super(); + this.setMaxListeners(50); + + // Buffer progress events so late-joining SSE clients can catch up. 
+ this.on("progress", (event) => { + if (!event.workspaceSlug) return; + const slug = event.workspaceSlug; + if (!this.#history.has(slug)) this.#history.set(slug, []); + this.#history.get(slug).push(event); + + // Clear history shortly after all docs finish (mirrors frontend cleanup). + if (event.type === "all_complete") { + setTimeout(() => this.#history.delete(slug), 10_000); + } + }); + } + + /** + * Register an SSE listener filtered by workspace and user. + * Replays any buffered events for the workspace before subscribing to live events. + * @param {{ workspaceSlug: string, userId?: number }} filter + * @param {function} callback - receives the progress event payload + * @returns {{ unsubscribe: function }} + */ + subscribe(filter, callback) { + // Replay buffered events so reconnecting clients catch up. + if (filter.workspaceSlug && this.#history.has(filter.workspaceSlug)) { + for (const event of this.#history.get(filter.workspaceSlug)) { + if (filter.userId && event.userId && event.userId !== filter.userId) + continue; + callback(event); + } + } + + const handler = (event) => { + if (filter.workspaceSlug && event.workspaceSlug !== filter.workspaceSlug) + return; + if (filter.userId && event.userId && event.userId !== filter.userId) + return; + callback(event); + }; + this.on("progress", handler); + return { + unsubscribe: () => this.off("progress", handler), + }; + } +} + +const embeddingProgressBus = new EmbeddingProgressBus(); + +module.exports = { embeddingProgressBus }; diff --git a/server/utils/WorkerQueue/WorkerQueue.js b/server/utils/WorkerQueue/WorkerQueue.js new file mode 100644 index 00000000000..5804ccb9089 --- /dev/null +++ b/server/utils/WorkerQueue/WorkerQueue.js @@ -0,0 +1,250 @@ +const { fork } = require("child_process"); +const path = require("path"); +const { v4: uuidv4 } = require("uuid"); + +/** + * Generic serial job queue backed by a forked child process. 
+ * Manages worker lifecycle (fork, ready handshake, TTL, graceful shutdown) + * and processes jobs one at a time in FIFO order. + * + * For chunk-level progress: when a worker sends { type: "progress" } IPC messages, + * this class receives them in #onMessage and forwards them to the onProgress + * callback provided at construction. This is how progress crosses the child→parent + * process boundary. See WorkerQueue/index.js for how the callback is wired to the + * EmbeddingProgressBus. + */ +class WorkerQueue { + #worker = null; + #ttlTimer = null; + #readyResolve = null; + #queue = []; + #activeJob = null; + + /** + * @param {Object} options + * @param {string} options.workerScript - Path to worker JS file (relative to this file or absolute) + * @param {number} options.ttl - Ms the worker stays alive after finishing work before being killed + * @param {function|null} options.onProgress - Callback for progress messages from the worker + */ + constructor({ workerScript, ttl = 300_000, onProgress = null }) { + this.workerScript = path.isAbsolute(workerScript) + ? workerScript + : path.resolve(__dirname, workerScript); + this.ttl = ttl; + this._onProgress = onProgress; + } + + get isRunning() { + return this.#worker !== null && this.#worker.connected; + } + + /** + * Add a job to the queue. Returns a promise that resolves with the worker's result. + * @param {{ payload: object }} jobData + * @returns {Promise<{ jobId: string, result: any }>} + */ + enqueue({ payload }) { + const jobId = uuidv4(); + return new Promise((resolve, reject) => { + this.#queue.push({ + jobId, + payload, + resolve, + reject, + }); + if (!this.#activeJob) this.#processNext(); + }); + } + + /** + * Gracefully shutdown the worker. 
+ */ + killWorker() { + this.#clearTTLTimer(); + if (!this.#worker) return; + + try { + if (this.#worker.connected) this.#worker.send({ type: "shutdown" }); + } catch { + // Worker may already be disconnected + } + + const worker = this.#worker; + this.#worker = null; + + setTimeout(() => { + try { + worker.kill("SIGKILL"); + } catch { + /* already dead */ + } + }, 5_000); + } + + // -- internal --------------------------------------------------------------- + + async #processNext() { + if (this.#queue.length === 0) { + this.#activeJob = null; + return; + } + + const job = this.#queue.shift(); + this.#activeJob = job; + + try { + await this.#ensureWorker(); + this.#clearTTLTimer(); + console.log( + `[WorkerQueue] Sending job ${job.jobId} to ${path.basename(this.workerScript)}` + ); + this.#worker.send({ + type: "job", + jobId: job.jobId, + payload: job.payload, + }); + } catch (err) { + job.reject(err); + this.#activeJob = null; + this.#processNext(); + } + } + + /** + * Fork the worker if not already running. Resolves when it sends { type: "ready" }. 
+   * NOTE(review): the 30s readiness timeout below is never cleared on success — harmless since its callback is guarded by #readyResolve, but the timer stays pending. */
+  #ensureWorker() {
+    if (this.isRunning) return Promise.resolve();
+
+    return new Promise((resolve, reject) => {
+      this.#readyResolve = resolve;
+
+      try {
+        this.#worker = fork(this.workerScript, [], {
+          stdio: ["pipe", "pipe", "pipe", "ipc"],
+        });
+      } catch (err) {
+        this.#readyResolve = null;
+        return reject(new Error(`Failed to fork worker: ${err.message}`));
+      }
+
+      const label = path.basename(this.workerScript);
+      this.#worker.stdout?.on("data", (data) =>
+        process.stdout.write(`[Worker:${label}] ${data}`)
+      );
+      this.#worker.stderr?.on("data", (data) =>
+        process.stderr.write(`[Worker:${label}] ${data}`)
+      );
+
+      this.#worker.on("message", (msg) => this.#onMessage(msg));
+      this.#worker.on("exit", (code, signal) => this.#onExit(code, signal));
+      this.#worker.on("error", (err) => {
+        console.error(`[WorkerQueue] Worker error: ${err.message}`);
+        if (this.#readyResolve) {
+          this.#readyResolve = null;
+          reject(err);
+        }
+      });
+
+      setTimeout(() => {
+        if (this.#readyResolve) {
+          this.#readyResolve = null;
+          this.killWorker();
+          reject(new Error("Worker did not become ready within 30s"));
+        }
+      }, 30_000);
+    });
+  }
+
+  #onMessage(msg) {
+    if (!msg || !msg.type) return;
+
+    switch (msg.type) {
+      case "ready":
+        console.log(
+          `[WorkerQueue] Worker ${path.basename(this.workerScript)} ready.`
+        );
+        if (this.#readyResolve) {
+          this.#readyResolve();
+          this.#readyResolve = null;
+        }
+        break;
+
+      case "result": {
+        const job = this.#activeJob;
+        this.#activeJob = null;
+        this.#resetTTLTimer();
+        if (job && job.jobId === msg.jobId) {
+          job.resolve({ jobId: msg.jobId, result: msg.result });
+        }
+        this.#processNext();
+        break;
+      }
+
+      case "error": {
+        const job = this.#activeJob;
+        this.#activeJob = null;
+        this.#resetTTLTimer();
+        if (job && job.jobId === msg.jobId) {
+          job.reject(new Error(String(msg.error)));
+        }
+        this.#processNext();
+        break;
+      }
+
+      // Chunk-level progress from the worker. Forwarded to the onProgress
+      // callback so the caller (WorkerQueue/index.js) can emit it on the
+      // EmbeddingProgressBus with the appropriate document context attached. NOTE(review): in the "result"/"error" cases above, a jobId mismatch leaves the dequeued job's promise unsettled forever — verify jobIds can never diverge, or reject the stale job explicitly.
+      case "progress": {
+        if (this._onProgress && this.#activeJob) {
+          this._onProgress({
+            jobId: this.#activeJob.jobId,
+            chunksProcessed: msg.chunksProcessed,
+            totalChunks: msg.totalChunks,
+          });
+        }
+        break;
+      }
+
+      default:
+        console.warn(`[WorkerQueue] Unknown message type: ${msg.type}`);
+    }
+  }
+
+  #onExit(code, signal) {
+    const job = this.#activeJob;
+    this.#worker = null;
+    this.#activeJob = null;
+    this.#clearTTLTimer();
+
+    if (job) {
+      const errorMsg = `Worker exited unexpectedly (code=${code}, signal=${signal}) while processing job ${job.jobId}`;
+      console.error(`[WorkerQueue] ${errorMsg}`);
+      job.reject(new Error(errorMsg));
+      this.#processNext();
+    } else {
+      console.log(
+        `[WorkerQueue] Worker ${path.basename(this.workerScript)} exited (code=${code}, signal=${signal})`
+      );
+    }
+  }
+
+  #resetTTLTimer() {
+    this.#clearTTLTimer();
+    this.#ttlTimer = setTimeout(() => {
+      console.log(
+        `[WorkerQueue] TTL expired for ${path.basename(this.workerScript)}, killing worker.`
+      );
+      this.killWorker();
+    }, this.ttl);
+  }
+
+  #clearTTLTimer() {
+    if (this.#ttlTimer) {
+      clearTimeout(this.#ttlTimer);
+      this.#ttlTimer = null;
+    }
+  }
+}
+
+module.exports = { WorkerQueue };
diff --git a/server/utils/WorkerQueue/index.js b/server/utils/WorkerQueue/index.js
new file mode 100644
index 00000000000..3b912024165
--- /dev/null
+++ b/server/utils/WorkerQueue/index.js
@@ -0,0 +1,102 @@
+const { WorkerQueue } = require("./WorkerQueue");
+const { embeddingProgressBus } = require("./EmbeddingProgressBus");
+
+// ---------------------------------------------------------------------------
+// Queue instances & environment helpers
+// ---------------------------------------------------------------------------
+const DEFAULT_EMBEDDING_TTL_SEC = 300;
+const DEFAULT_RERANKING_TTL_SEC = 900;
+
+function envTTLSec(envKey, fallback) {
+  const raw = process.env[envKey];
+  if (raw === undefined || raw === "") return fallback;
+  const val = Number(raw);
+  return !isNaN(val) && val >= 0 ? val : fallback;
+}
+
+// ---------------------------------------------------------------------------
+// Embedding context for chunk-level progress
+// ---------------------------------------------------------------------------
+//
+// Problem: chunk progress originates inside the embedding worker (child process)
+// which only knows about text chunks — it has no idea which document, workspace,
+// or user it's working for. That context lives in Document.addDocuments (main process).
+//
+// Solution: Document.addDocuments calls setEmbeddingContext() with the current
+// document's { workspaceSlug, filename, userId } before starting vectorization,
+// and clears it (null) after. When the worker sends chunk progress back via IPC,
+// the onProgress callback below reads this context to emit a fully-attributed
+// "chunk_progress" event on the EmbeddingProgressBus.
+//
+// This is safe because the embedding queue processes jobs serially — only one
+// document is ever being embedded at a time, so the context always matches
+// the active job.
+// ---------------------------------------------------------------------------
+let _currentEmbeddingContext = null;
+
+/**
+ * Set the current document context for chunk-level progress events.
+ * Called by Document.addDocuments before/after each vectorization call.
+ * @param {{ workspaceSlug: string, filename: string, userId: number|null }|null} ctx
+ */
+function setEmbeddingContext(ctx) {
+  _currentEmbeddingContext = ctx;
+}
+
+const embeddingQueue = new WorkerQueue({
+  workerScript: "../../workers/embeddingWorker.js",
+  ttl:
+    envTTLSec("NATIVE_EMBEDDING_WORKER_TTL", DEFAULT_EMBEDDING_TTL_SEC) * 1000,
+  // Final step in the chunk progress chain: receives IPC progress from the
+  // worker (via WorkerQueue.#onMessage), attaches the document context set by
+  // Document.addDocuments, and emits a "chunk_progress" event on the bus.
+  // From here the existing SSE infrastructure streams it to the frontend.
+  onProgress: (progress) => {
+    if (!_currentEmbeddingContext) return;
+    embeddingProgressBus.emit("progress", {
+      type: "chunk_progress",
+      workspaceSlug: _currentEmbeddingContext.workspaceSlug,
+      filename: _currentEmbeddingContext.filename,
+      userId: _currentEmbeddingContext.userId,
+      chunksProcessed: progress.chunksProcessed,
+      totalChunks: progress.totalChunks,
+    });
+  },
+});
+
+const rerankingQueue = new WorkerQueue({
+  workerScript: "../../workers/rerankingWorker.js",
+  ttl:
+    envTTLSec("NATIVE_RERANKING_WORKER_TTL", DEFAULT_RERANKING_TTL_SEC) * 1000,
+});
+
+/**
+ * Queue an embedding job for the native embedder worker.
+ * @param {{ textChunks: string[] }} payload
+ * @returns {Promise<number[][]>} The embedding vectors, one per text chunk
+ */
+async function queueEmbedding(payload) {
+  embeddingQueue.ttl =
+    envTTLSec("NATIVE_EMBEDDING_WORKER_TTL", DEFAULT_EMBEDDING_TTL_SEC) * 1000;
+  const { result } = await embeddingQueue.enqueue({ payload });
+  return result.vectors;
+}
+
+/**
+ * Queue a reranking job for the native reranker worker.
+ * @param {{ query: string, documents: Array<{text: string}>, topK?: number }} payload
+ * @returns {Promise<object[]>} The reranked documents
+ */
+async function queueReranking(payload) {
+  rerankingQueue.ttl =
+    envTTLSec("NATIVE_RERANKING_WORKER_TTL", DEFAULT_RERANKING_TTL_SEC) * 1000;
+  const { result } = await rerankingQueue.enqueue({ payload });
+  return result.reranked;
+}
+
+module.exports = {
+  queueEmbedding,
+  queueReranking,
+  embeddingProgressBus,
+  setEmbeddingContext,
+};
diff --git a/server/utils/helpers/updateENV.js b/server/utils/helpers/updateENV.js
index cc08afbb03e..dce0a5f0f55 100644
--- a/server/utils/helpers/updateENV.js
+++ b/server/utils/helpers/updateENV.js
@@ -1314,6 +1314,10 @@ function dumpENV() {
 
     // Allow native tool calling for specific providers.
     "PROVIDER_SUPPORTS_NATIVE_TOOL_CALLING",
+
+    // Native worker process TTL (in seconds)
+    "NATIVE_EMBEDDING_WORKER_TTL",
+    "NATIVE_RERANKING_WORKER_TTL",
   ];
 
   // Simple sanitization of each value to prevent ENV injection via newline or quote escaping.
diff --git a/server/utils/vectorDbProviders/lance/index.js b/server/utils/vectorDbProviders/lance/index.js
index f41fae92bd8..e4512dbec4e 100644
--- a/server/utils/vectorDbProviders/lance/index.js
+++ b/server/utils/vectorDbProviders/lance/index.js
@@ -94,7 +94,6 @@ class LanceDb extends VectorDatabase {
     similarityThreshold = 0.25,
     filterIdentifiers = [],
   }) {
-    const reranker = new NativeEmbeddingReranker();
     const collection = await client.openTable(namespace);
     const totalEmbeddings = await this.namespaceCount(namespace);
     const result = {
@@ -126,8 +125,9 @@ class LanceDb extends VectorDatabase {
       .limit(searchLimit)
       .toArray();
 
-    await reranker
-      .rerank(query, vectorSearchResults, { topK: topN })
+    await NativeEmbeddingReranker.rerankViaWorker(query, vectorSearchResults, {
+      topK: topN,
+    })
       .then((rerankResults) => {
         rerankResults.forEach((item) => {
           if (this.distanceToSimilarity(item._distance) < similarityThreshold)
diff --git a/server/workers/embeddingWorker.js b/server/workers/embeddingWorker.js
new file mode 100644
index 00000000000..ca88f4d68ed
--- /dev/null
+++ b/server/workers/embeddingWorker.js
@@ -0,0 +1,81 @@
+/**
+ * Embedding Worker Process
+ *
+ * Runs NativeEmbedder in an isolated child process so that OOM from large
+ * document batches only kills this worker, not the main server.
+ *
+ * This is one step in the chunk-level progress reporting chain. The worker
+ * passes an onProgress callback to embedChunksInProcess that converts each
+ * progress update into an IPC message (process.send). The parent process's
+ * WorkerQueue receives these IPC messages and forwards them to the
+ * EmbeddingProgressBus, which streams them to the frontend via SSE.
+ *
+ * IPC protocol:
+ * - Receives: { type: "job", jobId, payload: { textChunks } }
+ * - Sends: { type: "ready" }
+ * - Sends: { type: "progress", jobId, chunksProcessed, totalChunks }
+ * - Sends: { type: "result", jobId, result: { vectors } }
+ * - Sends: { type: "error", jobId, error: string }
+ * - Receives: { type: "shutdown" }
+ */
+
+process.env.NODE_ENV === "development"
+  ? require("dotenv").config({
+      path: require("path").resolve(
+        __dirname,
+        `../.env.${process.env.NODE_ENV}`
+      ),
+    })
+  : require("dotenv").config({
+      path: require("path").resolve(__dirname, "../.env"),
+    });
+
+const { NativeEmbedder } = require("../utils/EmbeddingEngines/native");
+
+let embedder = null;
+
+process.on("message", async (msg) => {
+  if (!msg || !msg.type) return;
+
+  if (msg.type === "job") {
+    try {
+      if (!embedder) embedder = new NativeEmbedder();
+
+      const { textChunks } = msg.payload;
+
+      // Bridge chunk progress from embedChunksInProcess to the parent process.
+      // This callback converts in-process progress into IPC messages that the
+      // parent's WorkerQueue receives and forwards to the EmbeddingProgressBus.
+      const result = await embedder.embedChunksInProcess(
+        textChunks,
+        (progress) => {
+          process.send({
+            type: "progress",
+            jobId: msg.jobId,
+            chunksProcessed: progress.chunksProcessed,
+            totalChunks: progress.totalChunks,
+          });
+        }
+      );
+
+      process.send({
+        type: "result",
+        jobId: msg.jobId,
+        result: { vectors: result },
+      });
+    } catch (err) {
+      process.send({
+        type: "error",
+        jobId: msg.jobId,
+        error: err.message || String(err),
+      });
+    }
+  }
+
+  if (msg.type === "shutdown") {
+    process.exit(0);
+  }
+});
+
+// Signal readiness to the parent — WorkerQueue's "ready" handler resolves its ensureWorker promise when it receives this message.
+process.send({ type: "ready" });
diff --git a/server/workers/rerankingWorker.js b/server/workers/rerankingWorker.js
new file mode 100644
index 00000000000..8db0b50db6a
--- /dev/null
+++ b/server/workers/rerankingWorker.js
@@ -0,0 +1,62 @@
+/**
+ * Reranking Worker Process
+ *
+ * Runs NativeEmbeddingReranker in an isolated child process so that OOM
+ * only kills this worker, not the main server.
+ *
+ * Communicates with the main process via IPC messages:
+ * - Receives: { type: "job", jobId, payload: { query, documents, topK } }
+ * - Sends: { type: "ready" }
+ * - Sends: { type: "result", jobId, result: { reranked } }
+ * - Sends: { type: "error", jobId, error: string }
+ * - Receives: { type: "shutdown" }
+ */
+
+process.env.NODE_ENV === "development"
+  ? require("dotenv").config({
+      path: require("path").resolve(
+        __dirname,
+        `../.env.${process.env.NODE_ENV}`
+      ),
+    })
+  : require("dotenv").config({
+      path: require("path").resolve(__dirname, "../.env"),
+    });
+
+const {
+  NativeEmbeddingReranker,
+} = require("../utils/EmbeddingRerankers/native");
+
+let reranker = null;
+
+process.on("message", async (msg) => {
+  if (!msg || !msg.type) return;
+
+  if (msg.type === "job") {
+    try {
+      if (!reranker) reranker = new NativeEmbeddingReranker();
+
+      const { query, documents, topK = 4 } = msg.payload;
+      const result = await reranker.rerankInProcess(query, documents, { topK });
+
+      process.send({
+        type: "result",
+        jobId: msg.jobId,
+        result: { reranked: result },
+      });
+    } catch (err) {
+      process.send({
+        type: "error",
+        jobId: msg.jobId,
+        error: err.message || String(err),
+      });
+    }
+  }
+
+  if (msg.type === "shutdown") {
+    process.exit(0);
+  }
+});
+
+// Signal readiness to the parent — WorkerQueue's "ready" handler resolves its ensureWorker promise when it receives this message.
+process.send({ type: "ready" });