fix(indexer): eliminate delete/insert cycle on file PUT and PATCH #292
EnjoyBacon7 wants to merge 11 commits into `dev`
Conversation
Closes #286.

PATCH (metadata update) no longer deletes the file. Instead, it fetches the existing chunks with their Milvus `_id` and vectors, merges the new metadata, and upserts them back into Milvus with no re-embedding. The PostgreSQL file record is updated in place, so workspace FK references and `file_count` are never disturbed.

PUT (file replace) now deletes only the Milvus chunks while preserving the PostgreSQL `File` row. New chunks are embedded and inserted, then the PG row is updated in place. Workspace associations survive without the snapshot/restore workaround from PR #280.

New methods:
- `PartitionFileManager.update_file_metadata_in_db()`: in-place PG update
- `PartitionFileManager.update_file_in_partition()`: in-place PG update for PUT (metadata + `relationship_id` + `parent_id`)
- `MilvusDB.delete_file_chunks()`: Milvus-only delete, PG untouched
- `MilvusDB.upsert_file_metadata()`: fetch chunks with vectors, merge metadata, upsert to Milvus, update PG
- `MilvusDB.add_documents_for_existing_file()`: embed and insert new chunks, update the existing PG row (no duplicate check)
- `Indexer.replace_file_documents()`: routes to the above
- `get_file_chunks()` gains an `include_vectors` parameter
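The PATCH path described above can be sketched in miniature. This is an illustrative stand-in, not the repo's code: the dict-based chunk rows and this `upsert_file_metadata` signature are assumptions modeled on the description, with plain dicts standing in for Milvus rows.

```python
def upsert_file_metadata(chunks, new_metadata):
    """Merge new metadata into existing chunk rows without re-embedding.

    Each chunk keeps its Milvus primary key (_id) and stored vector, so an
    upsert overwrites the row in place instead of delete + re-insert.
    """
    updated = []
    for chunk in chunks:
        row = dict(chunk)  # shallow copy; _id and vector stay untouched
        row["metadata"] = {**row.get("metadata", {}), **new_metadata}
        updated.append(row)
    return updated

# Existing chunks as they might come back from a fetch with include_vectors=True
chunks = [
    {"_id": 1, "vector": [0.1, 0.2], "metadata": {"filename": "a.pdf", "tag": "old"}},
    {"_id": 2, "vector": [0.3, 0.4], "metadata": {"filename": "a.pdf", "tag": "old"}},
]
result = upsert_file_metadata(chunks, {"tag": "new"})
```

Because the vectors travel back with the upsert unchanged, no embedding call is needed on the PATCH path.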
📝 Walkthrough

Adds a replace mode to file indexing.
Sequence Diagram

```mermaid
sequenceDiagram
    actor Router as Router
    participant Indexer as Indexer
    participant VDB as VectorDB
    participant Milvus as Milvus
    participant PG as PostgreSQL
    Router->>Indexer: PUT /file -> add_file(..., replace=True)
    Indexer->>VDB: add_documents_for_existing_file(chunks, user)
    VDB->>VDB: snapshot old chunk IDs
    VDB->>Milvus: Insert new chunk entities
    Milvus-->>VDB: Insert OK
    VDB->>Milvus: delete_chunks_by_ids(old_ids)
    Milvus-->>VDB: Delete OK
    VDB->>PG: update_file_in_partition(file_id, partition, metadata)
    PG-->>VDB: Update OK
    Indexer-->>Router: Completion (workspace re-association skipped)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 5
🧹 Nitpick comments (1)
openrag/components/indexer/indexer.py (1)
170-178: Please route the new `Vectordb` calls through the timeout/cancellation path.

`replace_file_documents()` and the new metadata-upsert path both await `Vectordb` actor methods directly. That bypasses the repo's timeout wrapper, so a wedged Ray call can hang PUT/PATCH indefinitely and won't follow the explicit cancellation handling expected for `Indexer` actor methods. As per coding guidelines, "Use the centralized `call_ray_actor_with_timeout()` utility from `components.ray_utils` for all Ray actor method calls to ensure proper timeout and cancellation handling" and "Ray Actor method implementations in `Indexer`, `TaskStateManager`, `Vectordb`, and `MilvusDB` should use `.remote()` calls and handle `asyncio.CancelledError`, `RayTaskError`, and `TaskCancelledError` exceptions."

Also applies to: 211-214
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openrag/components/indexer/indexer.py` around lines 170 - 178, The replace_file_documents method currently awaits the Vectordb actor directly (vectordb.add_documents_for_existing_file.remote(...)), bypassing the repo's timeout/cancellation wrapper; update replace_file_documents (and the similar call at the other location around the 211-214 region) to invoke the actor via the centralized call_ray_actor_with_timeout utility from components.ray_utils (calling the actor method via .remote()) and propagate/await its result, and ensure the Indexer method catches and handles asyncio.CancelledError, RayTaskError, and TaskCancelledError per the repo pattern so Ray timeouts and cancellations are honored.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@openrag/components/indexer/vectordb/utils.py`:
- Around line 178-195: The helper update_file_metadata_in_db only writes
File.file_metadata which causes structured fields to go stale; modify
update_file_metadata_in_db (and ensure patch_file() uses it) to also read
relevant keys from the incoming file_metadata or patch payload and update the
corresponding ORM attributes File.partition_name, File.relationship_id, and
File.parent_id when those keys are present (preserving existing values when
absent), then commit; ensure you bind the logger and maintain the current
not-found handling and return semantics.
- Around line 223-226: The PUT replacement logic in
add_documents_for_existing_file() is currently protecting file.relationship_id
and file.parent_id from being cleared because it only assigns when
relationship_id/parent_id are not None; change this to respect explicit nulls
from the incoming metadata by using the presence of those keys (e.g., check
"relationship_id" in file_metadata and "parent_id" in file_metadata) or assign
the values directly so None overwrites the old value; update the block that sets
file.relationship_id and file.parent_id to assign when the key exists (allowing
None) instead of guarding on value truthiness so stale links can be cleared.
In `@openrag/components/indexer/vectordb/vectordb.py`:
- Around line 648-651: The delete call in vectordb.py currently interpolates raw
partition and file_id into the Milvus filter (the call to
self._async_client.delete with filter=f"partition == '{partition}' and file_id
== '{file_id}'"), which allows injection via quotes/brackets; fix by
constructing a safe filter instead of string interpolation: either use the
Milvus expression/JSON DSL or a parameterized API if available, or at minimum
escape/validate partition and file_id (escape quotes/backslashes and forbid
characters that can alter the predicate) before injecting them into the filter
string; update the code that builds the filter passed to _async_client.delete
(and any helper that formats partition/file_id) so expressions are safe.
- Around line 698-704: When constructing file-level metadata from
docs[0].metadata (in the block that sets file_metadata and calls
self.partition_file_manager.update_file_metadata_in_db), strip per-chunk/order
fields that originate from stored Milvus rows (e.g., "section_id",
"prev_section_id", "next_section_id" and any other chunk-order keys used
elsewhere) before merging with metadata so File.file_metadata matches the shape
produced by async_add_documents(); remove those keys from the copied dict (same
place you already pop "_id", "vector", "page") prior to calling
update_file_metadata_in_db.
In `@openrag/routers/indexer.py`:
- Around line 275-281: The code currently deletes old Milvus chunks via
call_ray_actor_with_timeout(vectordb.delete_file_chunks.remote(...)) before the
replacement is fully created; instead, change the flow so you first call
Indexer.add_file() (and wait for it to complete and confirm chunks were
created/inserted into a staging/new partition), verify the new insert succeeded
and produced >0 chunks (or handle the zero-chunk case as a failure/abort), and
only then call vectordb.delete_file_chunks to remove the old chunks; ensure the
sequence around call_ray_actor_with_timeout, vectordb.delete_file_chunks.remote,
and Indexer.add_file() is reordered to perform insert durability/verification
before deletion so we avoid the half-replaced state.
---
Nitpick comments:
In `@openrag/components/indexer/indexer.py`:
- Around line 170-178: The replace_file_documents method currently awaits the
Vectordb actor directly (vectordb.add_documents_for_existing_file.remote(...)),
bypassing the repo's timeout/cancellation wrapper; update replace_file_documents
(and the similar call at the other location around the 211-214 region) to invoke
the actor via the centralized call_ray_actor_with_timeout utility from
components.ray_utils (calling the actor method via .remote()) and
propagate/await its result, and ensure the Indexer method catches and handles
asyncio.CancelledError, RayTaskError, and TaskCancelledError per the repo
pattern so Ray timeouts and cancellations are honored.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: bc0bff5e-1a7c-48dc-9b50-0d44a6027435
📒 Files selected for processing (4)
- openrag/components/indexer/indexer.py
- openrag/components/indexer/vectordb/utils.py
- openrag/components/indexer/vectordb/vectordb.py
- openrag/routers/indexer.py
upsert_file_metadata builds file-level metadata from a Milvus chunk row which includes per-chunk fields (section_id, prev_section_id, next_section_id). Strip them before writing to File.file_metadata so the PG record shape matches what async_add_documents produces.
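A minimal sketch of the stripping step this commit describes. `CHUNK_ONLY_KEYS` and `file_level_metadata` are illustrative names; the key set follows the review comment, not a verified repo constant.

```python
# Per-chunk/order fields that must not leak into the file-level record
CHUNK_ONLY_KEYS = {"_id", "vector", "page", "section_id", "prev_section_id", "next_section_id"}

def file_level_metadata(chunk_metadata: dict, overrides: dict) -> dict:
    """Build file-level metadata from a stored chunk row.

    Drops chunk-ordering fields so the result matches the shape that the
    normal ingestion path writes to File.file_metadata, then applies the
    caller's metadata on top.
    """
    cleaned = {k: v for k, v in chunk_metadata.items() if k not in CHUNK_ONLY_KEYS}
    return {**cleaned, **overrides}

# A chunk row as it might come back from Milvus, with per-chunk fields mixed in
row = {"filename": "a.pdf", "section_id": 3, "prev_section_id": 2, "vector": [0.1]}
meta = file_level_metadata(row, {"tag": "v2"})
```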
When file_metadata contains relationship_id or parent_id keys, update the corresponding File ORM columns too so they never diverge from the JSON blob. Uses key-presence checks so explicit None values can clear stale links.
…ition Use a sentinel _UNSET object to distinguish 'not provided' from explicit None. This lets PUT clear stale relationship_id / parent_id links when the replacement file omits those fields, instead of silently preserving the old values.
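The sentinel pattern from this commit can be shown in a few lines. `update_links` is a hypothetical stand-in for the ORM update helper; the point is that a module-level `_UNSET` object lets a caller pass an explicit `None` to clear a column while an omitted argument leaves it alone.

```python
_UNSET = object()  # sentinel: distinguishes "not provided" from explicit None

def update_links(file_row: dict, relationship_id=_UNSET, parent_id=_UNSET) -> dict:
    """Update link columns in place, treating None as a deliberate value."""
    if relationship_id is not _UNSET:
        file_row["relationship_id"] = relationship_id  # None clears the link
    if parent_id is not _UNSET:
        file_row["parent_id"] = parent_id
    return file_row

row = {"relationship_id": "rel-1", "parent_id": "p-1"}
update_links(row, relationship_id=None)  # explicit None: clear the stale link
# parent_id was omitted, so its old value is preserved
```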
Replace f-string interpolation with filter_params to prevent expression injection via crafted file_id or partition values. Follows the same pattern already used by get_file_chunks.
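The idea behind this fix can be sketched as a pure helper that keeps user-supplied values out of the filter text entirely. The `{placeholder}` template style mirrors pymilvus's `filter_params` mechanism referenced in the commit; the actual client call is omitted here, so treat the exact wire format as an assumption.

```python
def build_chunk_delete_filter(partition: str, file_id: str):
    """Build a templated Milvus filter instead of f-string interpolation.

    The values travel separately in a params dict, so quotes or brackets
    inside partition/file_id cannot alter the predicate text.
    """
    expr = "partition == {partition} and file_id == {file_id}"
    params = {"partition": partition, "file_id": file_id}
    return expr, params

# A crafted file_id that would break out of an f-string-built filter:
expr, params = build_chunk_delete_filter("p1", "x' or file_id != '")
```

With f-string interpolation the hostile value would become part of the expression; here it stays confined to the params dict.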
The PUT flow now snapshots old chunk `_id` values, inserts new chunks, then deletes old chunks by ID. If embedding or insertion fails, old chunks remain intact. If the old-chunk deletion fails, we have temporary duplicates but no data loss.

- add_documents_for_existing_file: reordered to insert → delete
- get_file_chunk_ids: new method to fetch `_id` values
- delete_chunks_by_ids: new method to delete by primary key
- Router no longer pre-deletes Milvus chunks
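The reordered flow reads roughly like this, with a tiny in-memory class standing in for Milvus. The method names follow the commit message; the store itself is hypothetical.

```python
class ChunkStore:
    """In-memory stand-in for the Milvus-facing methods named in the commit."""

    def __init__(self):
        self.rows = {}    # _id -> chunk row
        self.next_id = 0

    def get_file_chunk_ids(self, file_id):
        return [i for i, r in self.rows.items() if r["file_id"] == file_id]

    def insert(self, file_id, chunks):
        for text in chunks:
            self.rows[self.next_id] = {"file_id": file_id, "text": text}
            self.next_id += 1

    def delete_chunks_by_ids(self, ids):
        for i in ids:
            self.rows.pop(i, None)

def replace_file_chunks(store, file_id, new_chunks):
    old_ids = store.get_file_chunk_ids(file_id)  # 1. snapshot old chunk IDs
    store.insert(file_id, new_chunks)            # 2. insert replacements first
    store.delete_chunks_by_ids(old_ids)          # 3. only then delete old rows

store = ChunkStore()
store.insert("f1", ["old-a", "old-b"])
replace_file_chunks(store, "f1", ["new-a"])
texts = sorted(r["text"] for r in store.rows.values())
```

If step 2 raises, step 3 never runs and the old chunks survive; if step 3 fails, the worst case is a temporary duplicate, which matches the commit's data-loss argument.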
🧹 Nitpick comments (2)
openrag/components/indexer/vectordb/vectordb.py (2)
774-844: Insert-before-delete pattern correctly implemented.

The flow ensures no data loss: old chunks remain intact if embedding/insertion fails, and temporary duplicates (if deletion fails) are recoverable. The error handling is comprehensive.
Minor: Line 795 calls `self.logger.bind(...)` but discards the return value. Loguru's `bind()` returns a new logger instance; it doesn't mutate in place. Subsequent `self.logger` calls won't include the bound context.

♻️ Suggested fix for logger binding

```diff
- self.logger.bind(partition=partition, file_id=file_id, filename=file_metadata.get("filename"))
+ log = self.logger.bind(partition=partition, file_id=file_id, filename=file_metadata.get("filename"))
```

Then use `log` instead of `self.logger` for subsequent calls in this method.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openrag/components/indexer/vectordb/vectordb.py` around lines 774 - 844, The call to self.logger.bind(...) in add_documents_for_existing_file returns a new logger with bound context but the return value is discarded; capture it (e.g., log = self.logger.bind(...)) and then use that bound logger variable (log) for all subsequent logging within the method (info/exception calls) so the partition/file_id/filename context is included in logs; update every reference to self.logger inside add_documents_for_existing_file (including the EmbeddingError, VDBError, and generic Exception handlers and the final info) to use the bound logger variable instead.
640-664: Missing catch-all exception handler for consistency.

Other similar methods in this file (e.g., `delete_file`, `get_file_chunk_ids`) include a catch-all `except Exception` block that logs and wraps unexpected errors. This method only catches `MilvusException`, so unexpected errors would propagate without logging context.

♻️ Suggested fix to add catch-all handler

```diff
 except MilvusException as e:
     log.exception(f"Couldn't delete file chunks for file_id {file_id}", error=str(e))
     raise VDBDeleteError(
         f"Couldn't delete file chunks for file_id {file_id}: {e!s}",
         collection_name=self.collection_name,
         partition=partition,
         file_id=file_id,
     )
+except Exception as e:
+    log.exception("Unexpected error while deleting file chunks", error=str(e))
+    raise UnexpectedVDBError(
+        f"Unexpected error while deleting file chunks {file_id}: {e!s}",
+        collection_name=self.collection_name,
+        partition=partition,
+        file_id=file_id,
+    )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openrag/components/indexer/vectordb/vectordb.py` around lines 640 - 664, The delete_file_chunks method currently only catches MilvusException; add a catch-all except Exception block after the existing MilvusException handler that logs the unexpected error via the bound logger (same style as other methods like delete_file and get_file_chunk_ids) and re-raises it wrapped in VDBDeleteError including collection_name, partition and file_id so unexpected exceptions are logged and normalized consistently with the rest of the class.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@openrag/components/indexer/vectordb/vectordb.py`:
- Around line 774-844: The call to self.logger.bind(...) in
add_documents_for_existing_file returns a new logger with bound context but the
return value is discarded; capture it (e.g., log = self.logger.bind(...)) and
then use that bound logger variable (log) for all subsequent logging within the
method (info/exception calls) so the partition/file_id/filename context is
included in logs; update every reference to self.logger inside
add_documents_for_existing_file (including the EmbeddingError, VDBError, and
generic Exception handlers and the final info) to use the bound logger variable
instead.
- Around line 640-664: The delete_file_chunks method currently only catches
MilvusException; add a catch-all except Exception block after the existing
MilvusException handler that logs the unexpected error via the bound logger
(same style as other methods like delete_file and get_file_chunk_ids) and
re-raises it wrapped in VDBDeleteError including collection_name, partition and
file_id so unexpected exceptions are logged and normalized consistently with the
rest of the class.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: c4f89cb4-38dd-4bda-8af0-bf1bc57aa2b7
📒 Files selected for processing (3)
- openrag/components/indexer/vectordb/utils.py
- openrag/components/indexer/vectordb/vectordb.py
- openrag/routers/indexer.py
🚧 Files skipped from review as they are similar to previous changes (2)
- openrag/routers/indexer.py
- openrag/components/indexer/vectordb/utils.py
The self.logger.bind() call returns a new logger with bound context, but the return value was being discarded. Now captures it as 'log' and uses it for the success log message so partition/file_id/filename context is included.
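The bind-returns-a-new-logger behavior is easy to demonstrate without loguru itself. `ContextLogger` below is a minimal stand-in that models the same contract: `bind()` never mutates the receiver, so discarding its return value loses the context.

```python
class ContextLogger:
    """Minimal model of loguru's bind(): returns a NEW logger, never mutates."""

    def __init__(self, context=None):
        self.context = dict(context or {})
        self.records = []

    def bind(self, **kwargs):
        # Merge and return a fresh logger; self.context is left untouched.
        return ContextLogger({**self.context, **kwargs})

    def info(self, msg):
        record = (msg, dict(self.context))
        self.records.append(record)
        return record

logger = ContextLogger()
logger.bind(file_id="f1")            # return value discarded: context is lost
_, ctx = logger.info("no context")

log = logger.bind(file_id="f1")      # captured: subsequent calls carry context
_, bound_ctx = log.info("has context")
```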
Adds VDBError re-raise and catch-all Exception handler to match the pattern used by other similar methods like delete_file. This ensures unexpected errors are logged with context and wrapped in UnexpectedVDBError.
Actionable comments posted: 1
🧹 Nitpick comments (2)
openrag/components/indexer/vectordb/vectordb.py (2)
692-719: Add logger context binding and catch-all exception handler.

Two consistency issues:

- Missing `log = self.logger.bind(file_id=file_id, partition=partition)`: other methods like `get_file_chunks` and `delete_file_chunks` bind context before logging.
- Missing catch-all `Exception` handler: `get_file_chunks` (line 920) has one, but this method doesn't.

♻️ Proposed fix

```diff
 async def get_file_chunk_ids(self, file_id: str, partition: str) -> list[int]:
     """Return the Milvus _id values for all chunks of a file."""
+    log = self.logger.bind(file_id=file_id, partition=partition)
     try:
         results = []
         offset = 0
         limit = 100
         while True:
             response = await self._async_client.query(
                 ...
             )
             if not response:
                 break
             results.extend(r["_id"] for r in response)
             offset += len(response)
         return results
     except MilvusException as e:
-        self.logger.exception("Failed to get file chunk IDs", error=str(e))
+        log.exception("Failed to get file chunk IDs", error=str(e))
         raise VDBSearchError(
             f"Failed to get file chunk IDs for {file_id}: {e!s}",
             collection_name=self.collection_name,
             partition=partition,
             file_id=file_id,
         )
+    except Exception as e:
+        log.exception("Unexpected error while getting file chunk IDs", error=str(e))
+        raise UnexpectedVDBError(
+            f"Unexpected error while getting file chunk IDs for {file_id}: {e!s}",
+            collection_name=self.collection_name,
+            partition=partition,
+            file_id=file_id,
+        )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openrag/components/indexer/vectordb/vectordb.py` around lines 692 - 719, In get_file_chunk_ids, bind logger context at the start (e.g. log = self.logger.bind(file_id=file_id, partition=partition)) and use that bound logger for exception logging; keep the existing MilvusException block but change self.logger.exception(...) to log.exception(...). Also add a catch-all except Exception as e block after the MilvusException handler that logs via log.exception("Failed to get file chunk IDs", error=str(e)) and re-raises a VDBSearchError with the same payload style (message, collection_name, partition, file_id) as in get_file_chunks/delete_file_chunks.
675-690: Add catch-all exception handler for consistency.

Unlike other deletion methods in this file (`delete_file`, `delete_file_chunks`, `delete_partition`), this method lacks a catch-all `Exception` handler. Unexpected errors would propagate unwrapped, inconsistent with the error handling pattern established elsewhere.

♻️ Proposed fix

```diff
 except MilvusException as e:
     self.logger.exception("Failed to delete old chunks by ID", error=str(e))
     raise VDBDeleteError(
         f"Failed to delete old chunks by ID: {e!s}",
         collection_name=self.collection_name,
     )
+except Exception as e:
+    self.logger.exception("Unexpected error while deleting chunks by ID", error=str(e))
+    raise UnexpectedVDBError(
+        f"Unexpected error while deleting chunks by ID: {e!s}",
+        collection_name=self.collection_name,
+    )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@openrag/components/indexer/vectordb/vectordb.py` around lines 675 - 690, The delete_chunks_by_ids method only catches MilvusException and should also catch any other Exception to match the file's error-handling pattern; update delete_chunks_by_ids to add a broad except Exception as e block after the existing MilvusException handler that logs via self.logger.exception (including the error) and raises a VDBDeleteError with the same structure used elsewhere (include f"Failed to delete old chunks by ID: {e!s}" and collection_name=self.collection_name) so unexpected errors are wrapped consistently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@openrag/components/indexer/vectordb/vectordb.py`:
- Around line 842-853: The exception handlers in the add-chunks flow are using
self.logger and thus drop the bound context created as log (with partition and
file_id); change the three spots that currently call self.logger.exception to
use the bound logger variable log.exception instead, preserving the same message
and error=str(e) payload and then re-raising the same exceptions
(EmbeddingError, VDBError, and raising UnexpectedVDBError with the same args) so
the logs retain the partition/file_id context (references: the bound logger
variable log, exception types EmbeddingError, VDBError, UnexpectedVDBError).
---
Nitpick comments:
In `@openrag/components/indexer/vectordb/vectordb.py`:
- Around line 692-719: In get_file_chunk_ids, bind logger context at the start
(e.g. log = self.logger.bind(file_id=file_id, partition=partition)) and use that
bound logger for exception logging; keep the existing MilvusException block but
change self.logger.exception(...) to log.exception(...). Also add a catch-all
except Exception as e block after the MilvusException handler that logs via
log.exception("Failed to get file chunk IDs", error=str(e)) and re-raises a
VDBSearchError with the same payload style (message, collection_name, partition,
file_id) as in get_file_chunks/delete_file_chunks.
- Around line 675-690: The delete_chunks_by_ids method only catches
MilvusException and should also catch any other Exception to match the file's
error-handling pattern; update delete_chunks_by_ids to add a broad except
Exception as e block after the existing MilvusException handler that logs via
self.logger.exception (including the error) and raises a VDBDeleteError with the
same structure used elsewhere (include f"Failed to delete old chunks by ID:
{e!s}" and collection_name=self.collection_name) so unexpected errors are
wrapped consistently.
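The offset/limit drain loop that `get_file_chunk_ids` uses can be isolated as a small helper. `query_page` below is a stand-in for the Milvus query call; the termination condition (an empty page) and the offset advance are the parts worth getting right.

```python
def fetch_all_chunk_ids(query_page, page_size=100):
    """Drain a paginated query: advance offset until an empty page comes back."""
    ids, offset = [], 0
    while True:
        page = query_page(offset=offset, limit=page_size)
        if not page:
            break                                # empty page: nothing left
        ids.extend(row["_id"] for row in page)
        offset += len(page)                      # advance by rows actually seen
    return ids

# Simulated backing store with 250 rows (so the last page is partial)
rows = [{"_id": i} for i in range(250)]

def query_page(offset, limit):
    return rows[offset:offset + limit]

all_ids = fetch_all_chunk_ids(query_page)
```

Advancing by `len(page)` rather than by `page_size` keeps the loop correct even when a backend returns short pages.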
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 9d9f4c4b-c736-49b2-8c4e-eaaa9b390e77
📒 Files selected for processing (1)
openrag/components/indexer/vectordb/vectordb.py
Exception handlers now use the bound 'log' variable instead of 'self.logger' to preserve partition/file_id/filename context in error logs. The log variable is initialized to self.logger as fallback before the try block, then rebound with context after metadata extraction succeeds.
- get_file_chunk_ids: add logger binding and catch-all Exception handler
- delete_chunks_by_ids: add catch-all Exception handler

These changes make the error handling consistent with other methods in this class (delete_file, delete_file_chunks, get_file_chunks).
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@openrag/components/indexer/vectordb/vectordb.py`:
- Around line 848-856: The call to
partition_file_manager.update_file_in_partition may return False when the
Postgres row was removed concurrently, but the current code ignores this and
proceeds after inserting new Milvus chunks and deleting old ones; modify the
code around the call to update_file_in_partition (the block that passes file_id,
partition, file_metadata, relationship_id, parent_id) to check its boolean
return and, on False, roll back or remediate the Milvus changes (e.g., delete
the newly inserted chunks or re-insert the old ones) and raise/log an error to
avoid leaving orphaned Milvus vectors; ensure you reference
update_file_in_partition and file_id/partition in the error/rollback path so
it's clear which file failed to update.
- Around line 776-778: The call to
partition_file_manager.update_file_metadata_in_db(file_id, partition,
file_metadata) can return False when the PG row is missing; update the caller
(the code performing the Milvus upsert around lines where the Milvus upsert
occurs) to check that return value and treat it as a failure: if
update_file_metadata_in_db returns False, either rollback the Milvus upsert
(invoke the corresponding Milvus delete/remove for the inserted vectors) or
raise/log an error and abort the operation so you don't leave Milvus
in-sync-only state; ensure you reference
partition_file_manager.update_file_metadata_in_db and the surrounding Milvus
upsert routine when implementing the check and failure handling.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a4f80e02-882a-47de-b854-0899da345c05
📒 Files selected for processing (1)
openrag/components/indexer/vectordb/vectordb.py
Log warnings when update_file_metadata_in_db or update_file_in_partition return False, indicating the PG row was concurrently deleted. This handles the edge case where Milvus operations succeed but the PG row no longer exists.
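The remediation path the review asked for can be sketched with plain dicts standing in for Milvus rows and the PG table. `finalize_replacement` is a hypothetical name; the check it models is the boolean return of `update_file_in_partition` when the PG row vanished concurrently.

```python
def finalize_replacement(milvus_rows: dict, pg_rows: dict, file_id: str, new_ids: list):
    """After inserting new Milvus chunks, confirm the PG row still exists.

    If it was deleted concurrently (the update helper would return False),
    remove the just-inserted vectors instead of leaving them orphaned.
    """
    pg_row_updated = file_id in pg_rows   # stands in for update_file_in_partition()
    if not pg_row_updated:
        for _id in new_ids:               # roll back the orphaned vectors
            milvus_rows.pop(_id, None)
        return False
    return True

# Concurrent delete: the PG table no longer has the row for "f1"
milvus = {1: {"file_id": "f1"}, 2: {"file_id": "f1"}}
ok = finalize_replacement(milvus, pg_rows={}, file_id="f1", new_ids=[1, 2])
```

The PR chose to log a warning in this case rather than roll back; the sketch shows the stricter alternative the reviewer proposed.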
Summary
Closes #286.
The PostgreSQL `File` row is preserved in place, so workspace FK references survive without snapshot/restore.

Motivation
The previous delete-then-insert pattern caused:

- Workspace FK references and `file_count` to be disturbed on every update
- Workspace associations to be lost on PUT, requiring the snapshot/restore workaround from PR #280

Changes
openrag/components/indexer/vectordb/utils.py

- `update_file_metadata_in_db()`: new; `UPDATE files SET file_metadata = ...` in place
- `update_file_in_partition()`: new; updates metadata + `relationship_id` + `parent_id` in place (for PUT)

openrag/components/indexer/vectordb/vectordb.py

- `get_file_chunks()`: added `include_vectors` param; passes `output_fields=["*", "vector"]` to Milvus when True
- `delete_file_chunks()`: new; deletes Milvus vectors only, PG row untouched
- `upsert_file_metadata()`: new; fetches chunks with `_id` + vector, merges metadata, upserts to Milvus, updates PG
- `add_documents_for_existing_file()`: new; embeds + inserts chunks for a file that already exists in PG (no duplicate check, updates PG row)

openrag/components/indexer/indexer.py

- `update_file_metadata()`: rewritten; single call to `vectordb.upsert_file_metadata()`, no delete/restore
- `add_file()`: added `replace: bool` param; when True, uses `replace_file_documents` and skips workspace association
- `replace_file_documents()`: new; routes to `add_documents_for_existing_file`

openrag/routers/indexer.py

- `delete_file` → `delete_file_chunks` (Milvus-only); `add_file(replace=True)` instead of workspace snapshot/restore