diff --git a/openrag/components/indexer/indexer.py b/openrag/components/indexer/indexer.py
index 18fd67e4..4064e1d7 100644
--- a/openrag/components/indexer/indexer.py
+++ b/openrag/components/indexer/indexer.py
@@ -72,6 +72,7 @@ async def add_file(
         partition: str | None = None,
         user: dict | None = None,
         workspace_ids: list[str] | None = None,
+        replace: bool = False,
     ):
         task_state_manager = ray.get_actor("TaskStateManager", namespace="openrag")
         task_id = ray.get_runtime_context().get_task_id()
@@ -110,7 +111,12 @@ async def add_file(
         if self.enable_insertion:
             if chunks:
                 await task_state_manager.set_state.remote(task_id, "INSERTING")
-                await self.handle.insert_documents.remote(chunks, user=user)
+                if replace:
+                    # PUT flow: PG File row already exists; insert new Milvus chunks
+                    # and update PG metadata in-place (no File row creation).
+                    await self.handle.replace_file_documents.remote(chunks, user=user)
+                else:
+                    await self.handle.insert_documents.remote(chunks, user=user)
                 log.info(f"Document {path} indexed successfully")
             else:
                 log.debug("No chunks to insert !!! Potentially the uploaded file is empty")
@@ -121,8 +127,9 @@ async def add_file(
         # record exists in the DB before we reference it from workspace_files.
         await task_state_manager.set_state.remote(task_id, "COMPLETED")
 
-        # Associate with workspaces only after successful indexing (best-effort)
-        if workspace_ids:
+        # Associate with workspaces only after successful indexing (best-effort).
+        # Not needed for replace=True since the PG row (and its workspace FKs) is preserved.
+        if workspace_ids and not replace:
             vectordb = ray.get_actor("Vectordb", namespace="openrag")
             try:
                 await asyncio.gather(
@@ -160,6 +167,16 @@ async def insert_documents(self, chunks, user):
         vectordb = ray.get_actor("Vectordb", namespace="openrag")
         await vectordb.async_add_documents.remote(chunks, user)
 
+    @ray.method(concurrency_group="insert")
+    async def replace_file_documents(self, chunks, user):
+        """Insert chunks for an existing file after its old Milvus chunks have been deleted.
+
+        Unlike insert_documents, this calls add_documents_for_existing_file which
+        updates the PostgreSQL File row in-place instead of creating a new one.
+        """
+        vectordb = ray.get_actor("Vectordb", namespace="openrag")
+        await vectordb.add_documents_for_existing_file.remote(chunks, user)
+
     @ray.method(concurrency_group="delete")
     async def delete_file(self, file_id: str, partition: str) -> bool:
         log = self.logger.bind(file_id=file_id, partition=partition)
@@ -191,30 +208,10 @@ async def update_file_metadata(
             return
 
         try:
-            docs = await vectordb.get_file_chunks.remote(file_id, partition)
-            for doc in docs:
-                doc.metadata.update(metadata)
-
-            # Snapshot workspace memberships before deletion so they can be restored.
-            workspace_ids = await vectordb.get_file_workspaces.remote(file_id, partition)
-
-            await self.delete_file(file_id, partition)
-            await vectordb.async_add_documents.remote(docs, user=user)
-
-            # Restore workspace memberships that existed before the delete.
-            if workspace_ids:
-                try:
-                    await asyncio.gather(
-                        *[vectordb.add_files_to_workspace.remote(ws_id, [file_id]) for ws_id in workspace_ids]
-                    )
-                    log.debug("Restored workspace memberships after metadata update.", workspace_ids=workspace_ids)
-                except Exception as ws_err:
-                    log.warning(
-                        "Failed to restore workspace memberships; file is indexed but workspace links may be incomplete.",
-                        error=str(ws_err),
-                        workspace_ids=workspace_ids,
-                    )
-
+            # Upsert metadata in-place: updates Milvus chunks (preserving vectors,
+            # no re-embedding) and the PostgreSQL file record. No delete step, so
+            # workspace FK references and file_count are never disturbed.
+            await vectordb.upsert_file_metadata.remote(file_id, partition, metadata)
             log.info("Metadata updated for file.")
         except Exception as e:
             log.error("Error in update_file_metadata", error=str(e))
diff --git a/openrag/components/indexer/vectordb/utils.py b/openrag/components/indexer/vectordb/utils.py
index 5bc582e0..0b188fbd 100644
--- a/openrag/components/indexer/vectordb/utils.py
+++ b/openrag/components/indexer/vectordb/utils.py
@@ -175,6 +175,79 @@ def remove_file_from_partition(self, file_id: str, partition: str):
             log.error(f"Error removing file: {e}")
             raise e
 
+    def update_file_metadata_in_db(self, file_id: str, partition: str, file_metadata: dict) -> bool:
+        """Update the file_metadata JSON column and structured fields for an existing file.
+
+        Returns True if the file was found and updated, False otherwise.
+        Unlike remove_file_from_partition + add_file_to_partition, this preserves
+        the files.id primary key so that workspace FK references remain valid.
+
+        If file_metadata contains keys that correspond to structured File columns
+        (relationship_id, parent_id), those columns are updated too so they stay
+        in sync with the JSON blob.
+        """
+        log = self.logger.bind(file_id=file_id, partition=partition)
+        with self.Session() as session:
+            try:
+                file = session.query(File).filter(File.file_id == file_id, File.partition_name == partition).first()
+                if not file:
+                    log.warning("File not found for metadata update")
+                    return False
+                file.file_metadata = file_metadata
+                # Sync structured columns when the corresponding keys are present
+                # in the metadata payload, so PG columns never diverge from the JSON.
+                if "relationship_id" in file_metadata:
+                    file.relationship_id = file_metadata["relationship_id"]
+                if "parent_id" in file_metadata:
+                    file.parent_id = file_metadata["parent_id"]
+                session.commit()
+                log.info("Updated file metadata in-place")
+                return True
+            except Exception:
+                session.rollback()
+                log.exception("Error updating file metadata")
+                raise
+
+    # Sentinel object to distinguish "not provided" from explicit None.
+    _UNSET = object()
+
+    def update_file_in_partition(
+        self,
+        file_id: str,
+        partition: str,
+        file_metadata: dict | None = None,
+        relationship_id: str | None | object = _UNSET,
+        parent_id: str | None | object = _UNSET,
+    ) -> bool:
+        """Update an existing file record in-place (for PUT: new content, same file_id).
+
+        Preserves files.id so workspace FK references stay intact.
+        Unlike delete+re-add, this never touches file_count or created_by.
+
+        Pass relationship_id=None or parent_id=None explicitly to clear a stale
+        link. Omit the argument entirely to leave the column unchanged.
+        """
+        log = self.logger.bind(file_id=file_id, partition=partition)
+        with self.Session() as session:
+            try:
+                file = session.query(File).filter(File.file_id == file_id, File.partition_name == partition).first()
+                if not file:
+                    log.warning("File not found for update")
+                    return False
+                if file_metadata is not None:
+                    file.file_metadata = file_metadata
+                if relationship_id is not self._UNSET:
+                    file.relationship_id = relationship_id
+                if parent_id is not self._UNSET:
+                    file.parent_id = parent_id
+                session.commit()
+                log.info("Updated file record in-place")
+                return True
+            except Exception:
+                session.rollback()
+                log.exception("Error updating file in partition")
+                raise
+
     def delete_partition(self, partition: str):
         """Delete a partition and all its files"""
         with self.Session() as session:
diff --git a/openrag/components/indexer/vectordb/vectordb.py b/openrag/components/indexer/vectordb/vectordb.py
index b060826a..5756cc07 100644
--- a/openrag/components/indexer/vectordb/vectordb.py
+++ b/openrag/components/indexer/vectordb/vectordb.py
@@ -637,7 +637,253 @@ async def delete_file(self, file_id: str, partition: str):
                 file_id=file_id,
             )
 
-    async def get_file_chunks(self, file_id: str, partition: str, include_id: bool = False, limit: int = 100):
+    async def delete_file_chunks(self, file_id: str, partition: str):
+        """Delete file chunks from Milvus only — does NOT touch PostgreSQL or workspace associations.
+
+        Used by PUT (file replace) to remove old Milvus vectors while keeping the
+        PostgreSQL File row intact, preserving workspace FK references.
+        """
+        log = self.logger.bind(file_id=file_id, partition=partition)
+        try:
+            # Use parameterized filter to avoid expression injection via
+            # crafted file_id or partition values.
+            res = await self._async_client.delete(
+                collection_name=self.collection_name,
+                filter="partition == {partition} and file_id == {file_id}",
+                filter_params={"partition": partition, "file_id": file_id},
+            )
+            log.info("Deleted file chunks from Milvus (PG row preserved).", count=res.get("delete_count", 0))
+        except MilvusException as e:
+            log.exception(f"Couldn't delete file chunks for file_id {file_id}", error=str(e))
+            raise VDBDeleteError(
+                f"Couldn't delete file chunks for file_id {file_id}: {e!s}",
+                collection_name=self.collection_name,
+                partition=partition,
+                file_id=file_id,
+            )
+        except VDBError:
+            raise
+        except Exception as e:
+            log.exception("Unexpected error while deleting file chunks", error=str(e))
+            raise UnexpectedVDBError(
+                f"Unexpected error while deleting file chunks {file_id}: {e!s}",
+                collection_name=self.collection_name,
+                partition=partition,
+                file_id=file_id,
+            )
+
+    async def delete_chunks_by_ids(self, chunk_ids: list[int]):
+        """Delete specific Milvus chunks by their _id primary keys."""
+        if not chunk_ids:
+            return
+        try:
+            await self._async_client.delete(
+                collection_name=self.collection_name,
+                ids=chunk_ids,
+            )
+            self.logger.info("Deleted old chunks by ID.", count=len(chunk_ids))
+        except MilvusException as e:
+            self.logger.exception("Failed to delete old chunks by ID", error=str(e))
+            raise VDBDeleteError(
+                f"Failed to delete old chunks by ID: {e!s}",
+                collection_name=self.collection_name,
+            )
+        except Exception as e:
+            self.logger.exception("Unexpected error while deleting chunks by ID", error=str(e))
+            raise UnexpectedVDBError(
+                f"Unexpected error while deleting chunks by ID: {e!s}",
+                collection_name=self.collection_name,
+            )
+
+    async def get_file_chunk_ids(self, file_id: str, partition: str) -> list[int]:
+        """Return the Milvus _id values for all chunks of a file."""
+        log = self.logger.bind(file_id=file_id, partition=partition)
+        try:
+            results = []
+            offset = 0
+            limit = 100
+            while True:
+                response = await self._async_client.query(
+                    collection_name=self.collection_name,
+                    filter="partition == {partition} and file_id == {file_id}",
+                    filter_params={"partition": partition, "file_id": file_id},
+                    output_fields=["_id"],
+                    limit=limit,
+                    offset=offset,
+                )
+                if not response:
+                    break
+                results.extend(r["_id"] for r in response)
+                offset += len(response)
+            return results
+        except MilvusException as e:
+            log.exception("Failed to get file chunk IDs", error=str(e))
+            raise VDBSearchError(
+                f"Failed to get file chunk IDs for {file_id}: {e!s}",
+                collection_name=self.collection_name,
+                partition=partition,
+                file_id=file_id,
+            )
+        except Exception as e:
+            log.exception("Unexpected error while getting file chunk IDs", error=str(e))
+            raise UnexpectedVDBError(
+                f"Unexpected error while getting file chunk IDs for {file_id}: {e!s}",
+                collection_name=self.collection_name,
+                partition=partition,
+                file_id=file_id,
+            )
+
+    async def upsert_file_metadata(self, file_id: str, partition: str, metadata: dict):
+        """Update metadata on all chunks of a file in-place via Milvus upsert.
+
+        Fetches existing chunks (with _id and vectors), merges new metadata,
+        then upserts back into Milvus. No re-embedding is performed.
+        Also updates the PostgreSQL file record metadata in-place.
+        """
+        log = self.logger.bind(file_id=file_id, partition=partition)
+        try:
+            # Fetch all chunks with their _id and vector so we can upsert without re-embedding.
+            docs = await self.get_file_chunks(file_id, partition, include_id=True, include_vectors=True)
+            if not docs:
+                log.warning("No chunks found for metadata upsert")
+                return
+
+            entities = []
+            for doc in docs:
+                chunk_metadata = dict(doc.metadata)
+                milvus_id = chunk_metadata.pop("_id")
+                vector = chunk_metadata.pop("vector")
+                # Merge new metadata into the chunk metadata
+                chunk_metadata.update(metadata)
+                entities.append(
+                    {
+                        "_id": milvus_id,
+                        "text": doc.page_content,
+                        "vector": vector,
+                        **chunk_metadata,
+                    }
+                )
+
+            await self._async_client.upsert(
+                collection_name=self.collection_name,
+                data=entities,
+            )
+
+            # Build file-level metadata from the first chunk (same as async_add_documents).
+            # Strip per-chunk fields that don't belong in the file-level PG record.
+            file_metadata = dict(docs[0].metadata)
+            for key in ("_id", "vector", "page", "section_id", "prev_section_id", "next_section_id"):
+                file_metadata.pop(key, None)
+            file_metadata.update(metadata)
+            if not self.partition_file_manager.update_file_metadata_in_db(file_id, partition, file_metadata):
+                # PG row was concurrently deleted; Milvus upsert already succeeded.
+                # Log warning but don't fail — Milvus data will be orphaned until
+                # next cleanup, but the user-facing operation should still succeed.
+                log.warning("PG file row not found during metadata upsert; Milvus updated but PG skipped")
+
+            log.info("Upserted file metadata in-place.", chunk_count=len(entities))
+
+        except MilvusException as e:
+            log.exception("Milvus upsert failed", error=str(e))
+            raise VDBInsertError(
+                f"Couldn't upsert metadata for file {file_id}: {e!s}",
+                collection_name=self.collection_name,
+                partition=partition,
+                file_id=file_id,
+            )
+        except VDBError:
+            raise
+        except Exception as e:
+            log.exception("Unexpected error during metadata upsert", error=str(e))
+            raise UnexpectedVDBError(
+                f"Unexpected error during metadata upsert for {file_id}: {e!s}",
+                collection_name=self.collection_name,
+            )
+
+    async def add_documents_for_existing_file(self, chunks: list[Document], user: dict) -> None:
+        """Replace Milvus chunks for a file that already exists in PostgreSQL.
+
+        Used by PUT (file replace). The flow is insert-before-delete so the file
+        is never left in a half-replaced state:
+        1. Snapshot old chunk _id values
+        2. Embed and insert new chunks
+        3. Delete old chunks by _id
+        4. Update the PostgreSQL File row in-place
+
+        If step 2 fails, old chunks remain intact. If step 3 fails, we have
+        duplicates temporarily but no data loss — a retry or manual cleanup
+        can resolve it.
+        """
+        log = self.logger  # Fallback; rebound with context below
+        try:
+            file_metadata = dict(chunks[0].metadata)
+            file_metadata.pop("page")
+            file_id, partition = file_metadata.get("file_id"), file_metadata.get("partition")
+            relationship_id = file_metadata.get("relationship_id")
+            parent_id = file_metadata.get("parent_id")
+
+            log = self.logger.bind(partition=partition, file_id=file_id, filename=file_metadata.get("filename"))
+
+            # 1. Snapshot old chunk _id values before inserting new ones.
+            old_chunk_ids = await self.get_file_chunk_ids(file_id, partition)
+
+            # 2. Embed and insert new chunks.
+            entities = []
+            vectors = await self.embedder.aembed_documents(chunks)
+            order_metadata_l: list[dict] = _gen_chunk_order_metadata(n=len(chunks))
+            for chunk, vector, order_metadata in zip(chunks, vectors, order_metadata_l):
+                entities.append(
+                    {
+                        "text": chunk.page_content,
+                        "vector": vector,
+                        **order_metadata,
+                        **chunk.metadata,
+                    }
+                )
+
+            await self._async_client.insert(
+                collection_name=self.collection_name,
+                data=entities,
+            )
+
+            # 3. Delete old chunks by _id (new ones are already durable).
+            await self.delete_chunks_by_ids(old_chunk_ids)
+
+            # 4. Update existing PostgreSQL file record in-place (preserves files.id PK)
+            if not self.partition_file_manager.update_file_in_partition(
+                file_id=file_id,
+                partition=partition,
+                file_metadata=file_metadata,
+                relationship_id=relationship_id,
+                parent_id=parent_id,
+            ):
+                # PG row was concurrently deleted after we inserted new Milvus chunks.
+                # Log warning — Milvus has new orphaned chunks but data is consistent
+                # (old chunks deleted, new chunks inserted, no PG record).
+                log.warning("PG file row not found during replace; Milvus updated but PG skipped")
+            log.info(f"File '{file_id}' chunks replaced in partition '{partition}'")
+
+        except EmbeddingError as e:
+            log.exception("Embedding failed", error=str(e))
+            raise
+        except VDBError as e:
+            log.exception("VectorDB operation failed", error=str(e))
+            raise
+        except Exception as e:
+            log.exception("Unexpected error while adding chunks for existing file", error=str(e))
+            raise UnexpectedVDBError(
+                f"Unexpected error while adding chunks for existing file: {e!s}",
+                collection_name=self.collection_name,
+            )
+
+    async def get_file_chunks(
+        self,
+        file_id: str,
+        partition: str,
+        include_id: bool = False,
+        include_vectors: bool = False,
+        limit: int = 100,
+    ):
         log = self.logger.bind(file_id=file_id, partition=partition)
         try:
             self._check_file_exists(file_id, partition)
@@ -648,7 +894,15 @@ async def get_file_chunks(self, file_id: str, partition: str, include_id: bool =
             # Pagination parameters
             offset = 0
             results = []
-            excluded_keys = ["text", "vector", "_id"] if not include_id else ["text", "vector"]
+            excluded_keys = {"text"}
+            if not include_id:
+                excluded_keys.add("_id")
+            if not include_vectors:
+                excluded_keys.add("vector")
+
+            # Milvus query with output_fields=["*"] returns all scalar fields
+            # but excludes vector fields. To include vectors, request them explicitly.
+            output_fields = ["*", "vector"] if include_vectors else ["*"]
 
             while True:
                 response = await self._async_client.query(
@@ -657,6 +911,7 @@ async def get_file_chunks(self, file_id: str, partition: str, include_id: bool =
                     filter_params=filter_params,
                     limit=limit,
                     offset=offset,
+                    output_fields=output_fields,
                 )
 
                 if not response:
diff --git a/openrag/routers/indexer.py b/openrag/routers/indexer.py
index 367f34da..8d724789 100644
--- a/openrag/routers/indexer.py
+++ b/openrag/routers/indexer.py
@@ -272,15 +272,9 @@ async def put_file(
             detail=f"'{file_id}' not found in partition '{partition}'",
         )
 
-    # Snapshot workspace memberships before deletion so they can be restored on the new version.
-    existing_workspace_ids = await call_ray_actor_with_timeout(
-        future=vectordb.get_file_workspaces.remote(file_id, partition),
-        timeout=VECTORDB_TIMEOUT,
-        task_description=f"get_file_workspaces({file_id}, {partition})",
-    )
-
-    # Delete the existing file from the vector database
-    await indexer.delete_file.remote(file_id, partition)
+    # No Milvus deletion here. The Indexer's add_file(replace=True) flow uses
+    # insert-before-delete: it snapshots old chunk IDs, inserts new chunks,
+    # then deletes old ones — so the file is never left in a half-replaced state.
 
     save_dir = Path(DATA_DIR)
     original_filename = file.filename
@@ -302,13 +296,14 @@ async def put_file(
     metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime).isoformat()
     metadata["file_id"] = file_id
 
-    # Indexing the file — restore pre-existing workspace memberships on the new version.
+    # Re-index: serialize → chunk → embed → insert into Milvus + update PG row in-place.
+    # replace=True tells add_file to update the existing PG File row rather than creating a new one.
     task = indexer.add_file.remote(
        path=file_path,
        metadata=metadata,
        partition=partition,
        user=user,
-       workspace_ids=existing_workspace_ids or None,
+       replace=True,
     )
     await task_state_manager.set_state.remote(task.task_id().hex(), "QUEUED")
    await task_state_manager.set_object_ref.remote(task.task_id().hex(), {"ref": task})