-
-
Notifications
You must be signed in to change notification settings - Fork 6.1k
feat: Move Native Embedder and Reranker Into Isolated Workers w/ Job Queues | Add Document Embedding Status Events #5192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
angelplusultra
wants to merge
48
commits into
master
Choose a base branch
from
feat-native-embedder-job-queue
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+1,119
β75
Open
Changes from 36 commits
Commits
Show all changes
48 commits
Select commit
Hold shift + click to select a range
d2540a2
implement native embedder job queue
angelplusultra e941b17
persist embedding progress across renders
angelplusultra a0cd78b
add development worker timeouts
angelplusultra 573856a
change to static method
angelplusultra c80107e
native reranker
angelplusultra 0149020
remove useless return
angelplusultra acd1e3d
lint
angelplusultra f4f6b78
simplify
angelplusultra af3c6e0
make embedding worker timeout value configurable by admin
angelplusultra 592905c
add event emission for missing data
angelplusultra 73ce274
lint
angelplusultra f0e5117
remove onProgress callback argument
angelplusultra 8c21420
make rerank to rerankDirect
angelplusultra 37ddfaa
persists progress state across app reloads
angelplusultra 0bee3ec
remove chunk level progress reporting
angelplusultra 59d3727
remove unuse dvariable
angelplusultra 6ff2015
make NATIVE_RERANKING_WORKER_TIMEOUT user configurable
angelplusultra 21c5784
remove dead code
angelplusultra addab25
scope embedding progress per-user and clear stale state on SSE reconnect
angelplusultra 7a80d1e
lint
angelplusultra c459c45
revert vector databases and embedding engines to call their original β¦
angelplusultra c23bb05
simplify rerank
angelplusultra 292b61a
simplify progress fetching by removing updateProgressFromApi
angelplusultra 1718f92
remove duplicate jsdoc
angelplusultra 3fbc217
replace sessionStorage persistence with server-side history replay foβ¦
angelplusultra 9eff983
fix old comment
angelplusultra 4ae70cb
fix: ignore premature SSE all_complete when embedding hasn't started yet
angelplusultra 3c6d12a
reduce duplication with progress emissions
angelplusultra 2877dc9
remove dead code
angelplusultra 9a15f55
refactor: streamline embedding progress handling
angelplusultra dce2bf8
fix stale comment
angelplusultra 0722754
remove unused function
angelplusultra 7df5535
fix event emissions for document creation failure
angelplusultra 5195cca
refactor: move Reranking Worker Idle Timeout input to LanceDBOptions β¦
angelplusultra c88fac3
lint
angelplusultra 8b99aea
remove unused hadHistory vars
angelplusultra eafa12c
refactor workspace directory by hoisting component and converting intβ¦
angelplusultra f335e2b
moved EmbeddingProgressProvider to wrap Document Manager Modal
angelplusultra 2b7bce8
refactor embed progress SSE connection to use fetchEventSource insteaβ¦
angelplusultra 4ed2304
refactor message handlng into a function and reduce duplication
angelplusultra 904ed16
refactor: utilize writeResponseChunk for event emissions in document β¦
angelplusultra b1d2dd6
refactor: explicit in-proc embedding and rerank methods that are callβ¦
angelplusultra 9a63f98
Abstract EmbeddingProgressBus and Worker Queue into modules
angelplusultra cf8c681
remove error and toast messages on embed process result
angelplusultra fd6dc2b
use safeJsonParse
angelplusultra dd1c0e4
add chunk-level progress events with per-document progress bar in UI
angelplusultra d27576d
remove unused parameter
angelplusultra 77c85e7
rename all worker timeout references to use ttl | remove ttl updatingβ¦
angelplusultra File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,170 @@ | ||
| import { | ||
| createContext, | ||
| useCallback, | ||
| useContext, | ||
| useEffect, | ||
| useRef, | ||
| useState, | ||
| } from "react"; | ||
| import { API_BASE, AUTH_TOKEN } from "@/utils/constants"; | ||
|
|
||
| const EmbeddingProgressContext = createContext(); | ||
|
|
||
| export function useEmbeddingProgress() { | ||
| const ctx = useContext(EmbeddingProgressContext); | ||
| if (!ctx) | ||
| throw new Error( | ||
| "useEmbeddingProgress must be used within EmbeddingProgressProvider" | ||
| ); | ||
| return ctx; | ||
| } | ||
|
|
||
| export function EmbeddingProgressProvider({ children }) { | ||
| const [embeddingProgressMap, setEmbeddingProgressMap] = useState({}); | ||
| const eventSourcesRef = useRef({}); | ||
| const cleanupTimeoutsRef = useRef({}); | ||
|
|
||
| // Cleanup all EventSources on unmount | ||
| useEffect(() => { | ||
| return () => { | ||
| for (const slug of Object.keys(eventSourcesRef.current)) { | ||
| eventSourcesRef.current[slug]?.close(); | ||
| } | ||
| }; | ||
| }, []); | ||
|
|
||
| /** | ||
| * Open (or reconnect) an SSE EventSource for a given workspace slug. | ||
| * Updates embeddingProgressMap in real time as events arrive. | ||
| */ | ||
| const connectSSE = useCallback((slug) => { | ||
| // Don't double-connect | ||
| if (eventSourcesRef.current[slug]) return; | ||
|
|
||
| try { | ||
| const token = window.localStorage.getItem(AUTH_TOKEN); | ||
| const progressUrl = new URL( | ||
| `${API_BASE}/workspace/${slug}/embed-progress` | ||
| ); | ||
| if (token) progressUrl.searchParams.set("token", token); | ||
|
|
||
| const eventSource = new EventSource(progressUrl.toString()); | ||
| eventSourcesRef.current[slug] = eventSource; | ||
timothycarambat marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| eventSource.onmessage = (event) => { | ||
| try { | ||
| const data = JSON.parse(event.data); | ||
|
|
||
| switch (data.type) { | ||
| case "batch_starting": { | ||
| const initial = {}; | ||
| for (const name of data.filenames || []) { | ||
| initial[name] = { status: "pending" }; | ||
| } | ||
| setEmbeddingProgressMap((prev) => ({ | ||
| ...prev, | ||
| [slug]: { ...initial, ...prev[slug] }, | ||
| })); | ||
| break; | ||
| } | ||
|
|
||
| case "doc_starting": | ||
| setEmbeddingProgressMap((prev) => ({ | ||
| ...prev, | ||
| [slug]: { | ||
| ...prev[slug], | ||
| [data.filename]: { status: "embedding" }, | ||
| }, | ||
| })); | ||
| break; | ||
|
|
||
| case "doc_complete": | ||
| setEmbeddingProgressMap((prev) => ({ | ||
| ...prev, | ||
| [slug]: { | ||
| ...prev[slug], | ||
| [data.filename]: { status: "complete" }, | ||
| }, | ||
| })); | ||
| break; | ||
|
|
||
| case "doc_failed": | ||
| setEmbeddingProgressMap((prev) => ({ | ||
| ...prev, | ||
| [slug]: { | ||
| ...prev[slug], | ||
| [data.filename]: { | ||
| status: "failed", | ||
| error: data.error || "Embedding failed", | ||
| }, | ||
| }, | ||
| })); | ||
| break; | ||
|
|
||
| case "all_complete": | ||
| // A real embedding job just finished β close and schedule cleanup. | ||
| eventSource.close(); | ||
| delete eventSourcesRef.current[slug]; | ||
| cleanupTimeoutsRef.current[slug] = setTimeout(() => { | ||
| setEmbeddingProgressMap((prev) => { | ||
| const next = { ...prev }; | ||
| delete next[slug]; | ||
| return next; | ||
| }); | ||
| delete cleanupTimeoutsRef.current[slug]; | ||
| }, 5000); | ||
| break; | ||
| } | ||
| } catch { | ||
| // ignore parse errors | ||
| } | ||
timothycarambat marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| }; | ||
|
|
||
| eventSource.onerror = () => { | ||
| eventSource.close(); | ||
| delete eventSourcesRef.current[slug]; | ||
| }; | ||
| } catch { | ||
| // SSE is optional β embedding still works without it | ||
| } | ||
| }, []); | ||
|
|
||
| const startEmbedding = useCallback( | ||
| (slug, filenames) => { | ||
| // Close any existing EventSource for this slug | ||
| if (eventSourcesRef.current[slug]) { | ||
| eventSourcesRef.current[slug].close(); | ||
| delete eventSourcesRef.current[slug]; | ||
| } | ||
| if (cleanupTimeoutsRef.current[slug]) { | ||
| clearTimeout(cleanupTimeoutsRef.current[slug]); | ||
| delete cleanupTimeoutsRef.current[slug]; | ||
| } | ||
|
|
||
| // Set all filenames to pending | ||
| const initialProgress = {}; | ||
| for (const name of filenames) { | ||
| initialProgress[name] = { status: "pending" }; | ||
| } | ||
| setEmbeddingProgressMap((prev) => ({ | ||
| ...prev, | ||
| [slug]: { ...initialProgress }, | ||
| })); | ||
|
|
||
| connectSSE(slug); | ||
| }, | ||
| [connectSSE] | ||
| ); | ||
|
|
||
| return ( | ||
| <EmbeddingProgressContext.Provider | ||
| value={{ | ||
| embeddingProgressMap, | ||
| startEmbedding, | ||
| connectSSE, | ||
| }} | ||
| > | ||
| {children} | ||
| </EmbeddingProgressContext.Provider> | ||
| ); | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.