Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions openrag/components/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ async def add_file(
partition: str | None = None,
user: dict | None = None,
workspace_ids: list[str] | None = None,
replace: bool = False,
):
task_state_manager = ray.get_actor("TaskStateManager", namespace="openrag")
task_id = ray.get_runtime_context().get_task_id()
Expand Down Expand Up @@ -110,7 +111,12 @@ async def add_file(
if self.enable_insertion:
if chunks:
await task_state_manager.set_state.remote(task_id, "INSERTING")
await self.handle.insert_documents.remote(chunks, user=user)
if replace:
# PUT flow: PG File row already exists; insert new Milvus chunks
# and update PG metadata in-place (no File row creation).
await self.handle.replace_file_documents.remote(chunks, user=user)
else:
await self.handle.insert_documents.remote(chunks, user=user)
log.info(f"Document {path} indexed successfully")
else:
log.debug("No chunks to insert !!! Potentially the uploaded file is empty")
Expand All @@ -121,8 +127,9 @@ async def add_file(
# record exists in the DB before we reference it from workspace_files.
await task_state_manager.set_state.remote(task_id, "COMPLETED")

# Associate with workspaces only after successful indexing (best-effort)
if workspace_ids:
# Associate with workspaces only after successful indexing (best-effort).
# Not needed for replace=True since the PG row (and its workspace FKs) is preserved.
if workspace_ids and not replace:
vectordb = ray.get_actor("Vectordb", namespace="openrag")
try:
await asyncio.gather(
Expand Down Expand Up @@ -160,6 +167,16 @@ async def insert_documents(self, chunks, user):
vectordb = ray.get_actor("Vectordb", namespace="openrag")
await vectordb.async_add_documents.remote(chunks, user)

@ray.method(concurrency_group="insert")
async def replace_file_documents(self, chunks, user):
    """Re-index chunks for a file that already exists in PostgreSQL.

    PUT/replace counterpart of ``insert_documents``: the caller is expected
    to have deleted the file's old Milvus chunks already, and the Vectordb
    actor's ``add_documents_for_existing_file`` updates the existing
    PostgreSQL File row in-place instead of creating a new one.
    """
    vdb_actor = ray.get_actor("Vectordb", namespace="openrag")
    await vdb_actor.add_documents_for_existing_file.remote(chunks, user)

@ray.method(concurrency_group="delete")
async def delete_file(self, file_id: str, partition: str) -> bool:
log = self.logger.bind(file_id=file_id, partition=partition)
Expand Down Expand Up @@ -191,30 +208,10 @@ async def update_file_metadata(
return

try:
docs = await vectordb.get_file_chunks.remote(file_id, partition)
for doc in docs:
doc.metadata.update(metadata)

# Snapshot workspace memberships before deletion so they can be restored.
workspace_ids = await vectordb.get_file_workspaces.remote(file_id, partition)

await self.delete_file(file_id, partition)
await vectordb.async_add_documents.remote(docs, user=user)

# Restore workspace memberships that existed before the delete.
if workspace_ids:
try:
await asyncio.gather(
*[vectordb.add_files_to_workspace.remote(ws_id, [file_id]) for ws_id in workspace_ids]
)
log.debug("Restored workspace memberships after metadata update.", workspace_ids=workspace_ids)
except Exception as ws_err:
log.warning(
"Failed to restore workspace memberships; file is indexed but workspace links may be incomplete.",
error=str(ws_err),
workspace_ids=workspace_ids,
)

# Upsert metadata in-place: updates Milvus chunks (preserving vectors,
# no re-embedding) and the PostgreSQL file record. No delete step, so
# workspace FK references and file_count are never disturbed.
await vectordb.upsert_file_metadata.remote(file_id, partition, metadata)
log.info("Metadata updated for file.")
except Exception as e:
log.error("Error in update_file_metadata", error=str(e))
Expand Down
73 changes: 73 additions & 0 deletions openrag/components/indexer/vectordb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,79 @@ def remove_file_from_partition(self, file_id: str, partition: str):
log.error(f"Error removing file: {e}")
raise e

def update_file_metadata_in_db(self, file_id: str, partition: str, file_metadata: dict) -> bool:
    """Overwrite the stored metadata of an existing file record in-place.

    Writes ``file_metadata`` to the JSON column and, when the payload
    carries keys matching structured File columns (``relationship_id``,
    ``parent_id``), mirrors those values into the columns so they never
    diverge from the JSON blob.

    Because the row is updated rather than deleted and re-created (as a
    remove + add cycle would do), the ``files.id`` primary key is preserved
    and workspace FK references remain valid throughout.

    Returns:
        True when a matching file was found and updated, False otherwise.

    Raises:
        Re-raises any database error after rolling back the session.
    """
    log = self.logger.bind(file_id=file_id, partition=partition)
    with self.Session() as session:
        try:
            record = (
                session.query(File)
                .filter(File.file_id == file_id, File.partition_name == partition)
                .first()
            )
            if record is None:
                log.warning("File not found for metadata update")
                return False

            record.file_metadata = file_metadata
            # Mirror structured columns only when their keys are present in
            # the payload; absent keys leave the columns untouched.
            for column in ("relationship_id", "parent_id"):
                if column in file_metadata:
                    setattr(record, column, file_metadata[column])

            session.commit()
            log.info("Updated file metadata in-place")
            return True
        except Exception:
            session.rollback()
            log.exception("Error updating file metadata")
            raise

# Sentinel object to distinguish "not provided" from explicit None.
_UNSET = object()

def update_file_in_partition(
    self,
    file_id: str,
    partition: str,
    file_metadata: dict | None = None,
    relationship_id: str | None | object = _UNSET,
    parent_id: str | None | object = _UNSET,
) -> bool:
    """Update an existing file record in-place (PUT: new content, same file_id).

    ``files.id`` is preserved so workspace FK references stay intact; unlike
    a delete + re-add cycle, this never touches file_count or created_by.

    Pass ``relationship_id=None`` or ``parent_id=None`` explicitly to clear
    a stale link. Omit the argument entirely to leave the column unchanged.

    Returns:
        True when a matching file was found and updated, False otherwise.

    Raises:
        Re-raises any database error after rolling back the session.
    """
    log = self.logger.bind(file_id=file_id, partition=partition)
    with self.Session() as session:
        try:
            record = (
                session.query(File)
                .filter(File.file_id == file_id, File.partition_name == partition)
                .first()
            )
            if record is None:
                log.warning("File not found for update")
                return False

            if file_metadata is not None:
                record.file_metadata = file_metadata
            # _UNSET marks "argument omitted"; an explicit None is a real
            # value that deliberately clears the column.
            if relationship_id is not self._UNSET:
                record.relationship_id = relationship_id
            if parent_id is not self._UNSET:
                record.parent_id = parent_id

            session.commit()
            log.info("Updated file record in-place")
            return True
        except Exception:
            session.rollback()
            log.exception("Error updating file in partition")
            raise

def delete_partition(self, partition: str):
"""Delete a partition and all its files"""
with self.Session() as session:
Expand Down
Loading