Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions openrag/components/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ async def add_file(
partition: str | None = None,
user: dict | None = None,
workspace_ids: list[str] | None = None,
replace: bool = False,
):
task_state_manager = ray.get_actor("TaskStateManager", namespace="openrag")
task_id = ray.get_runtime_context().get_task_id()
Expand Down Expand Up @@ -110,7 +111,12 @@ async def add_file(
if self.enable_insertion:
if chunks:
await task_state_manager.set_state.remote(task_id, "INSERTING")
await self.handle.insert_documents.remote(chunks, user=user)
if replace:
# PUT flow: PG File row already exists; insert new Milvus chunks
# and update PG metadata in-place (no File row creation).
await self.handle.replace_file_documents.remote(chunks, user=user)
else:
await self.handle.insert_documents.remote(chunks, user=user)
log.info(f"Document {path} indexed successfully")
else:
log.debug("No chunks to insert !!! Potentially the uploaded file is empty")
Expand All @@ -121,8 +127,9 @@ async def add_file(
# record exists in the DB before we reference it from workspace_files.
await task_state_manager.set_state.remote(task_id, "COMPLETED")

# Associate with workspaces only after successful indexing (best-effort)
if workspace_ids:
# Associate with workspaces only after successful indexing (best-effort).
# Not needed for replace=True since the PG row (and its workspace FKs) is preserved.
if workspace_ids and not replace:
vectordb = ray.get_actor("Vectordb", namespace="openrag")
try:
await asyncio.gather(
Expand Down Expand Up @@ -160,6 +167,16 @@ async def insert_documents(self, chunks, user):
vectordb = ray.get_actor("Vectordb", namespace="openrag")
await vectordb.async_add_documents.remote(chunks, user)

@ray.method(concurrency_group="insert")
async def replace_file_documents(self, chunks, user):
"""Insert chunks for an existing file after its old Milvus chunks have been deleted.

Unlike insert_documents, this calls add_documents_for_existing_file which
updates the PostgreSQL File row in-place instead of creating a new one.
"""
vectordb = ray.get_actor("Vectordb", namespace="openrag")
await vectordb.add_documents_for_existing_file.remote(chunks, user)

@ray.method(concurrency_group="delete")
async def delete_file(self, file_id: str, partition: str) -> bool:
log = self.logger.bind(file_id=file_id, partition=partition)
Expand Down Expand Up @@ -191,30 +208,10 @@ async def update_file_metadata(
return

try:
docs = await vectordb.get_file_chunks.remote(file_id, partition)
for doc in docs:
doc.metadata.update(metadata)

# Snapshot workspace memberships before deletion so they can be restored.
workspace_ids = await vectordb.get_file_workspaces.remote(file_id, partition)

await self.delete_file(file_id, partition)
await vectordb.async_add_documents.remote(docs, user=user)

# Restore workspace memberships that existed before the delete.
if workspace_ids:
try:
await asyncio.gather(
*[vectordb.add_files_to_workspace.remote(ws_id, [file_id]) for ws_id in workspace_ids]
)
log.debug("Restored workspace memberships after metadata update.", workspace_ids=workspace_ids)
except Exception as ws_err:
log.warning(
"Failed to restore workspace memberships; file is indexed but workspace links may be incomplete.",
error=str(ws_err),
workspace_ids=workspace_ids,
)

# Upsert metadata in-place: updates Milvus chunks (preserving vectors,
# no re-embedding) and the PostgreSQL file record. No delete step, so
# workspace FK references and file_count are never disturbed.
await vectordb.upsert_file_metadata.remote(file_id, partition, metadata)
log.info("Metadata updated for file.")
except Exception as e:
log.error("Error in update_file_metadata", error=str(e))
Expand Down
57 changes: 57 additions & 0 deletions openrag/components/indexer/vectordb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,63 @@ def remove_file_from_partition(self, file_id: str, partition: str):
log.error(f"Error removing file: {e}")
raise e

def update_file_metadata_in_db(self, file_id: str, partition: str, file_metadata: dict) -> bool:
"""Update the file_metadata JSON column for an existing file in-place.

Returns True if the file was found and updated, False otherwise.
Unlike remove_file_from_partition + add_file_to_partition, this preserves
the files.id primary key so that workspace FK references remain valid.
"""
log = self.logger.bind(file_id=file_id, partition=partition)
with self.Session() as session:
try:
file = session.query(File).filter(File.file_id == file_id, File.partition_name == partition).first()
if not file:
log.warning("File not found for metadata update")
return False
file.file_metadata = file_metadata
session.commit()
log.info("Updated file metadata in-place")
return True
except Exception:
session.rollback()
log.exception("Error updating file metadata")
raise

def update_file_in_partition(
self,
file_id: str,
partition: str,
file_metadata: dict | None = None,
relationship_id: str | None = None,
parent_id: str | None = None,
) -> bool:
"""Update an existing file record in-place (for PUT: new content, same file_id).

Preserves files.id so workspace FK references stay intact.
Unlike delete+re-add, this never touches file_count or created_by.
"""
log = self.logger.bind(file_id=file_id, partition=partition)
with self.Session() as session:
try:
file = session.query(File).filter(File.file_id == file_id, File.partition_name == partition).first()
if not file:
log.warning("File not found for update")
return False
if file_metadata is not None:
file.file_metadata = file_metadata
if relationship_id is not None:
file.relationship_id = relationship_id
if parent_id is not None:
file.parent_id = parent_id
session.commit()
log.info("Updated file record in-place")
return True
except Exception:
session.rollback()
log.exception("Error updating file in partition")
raise

def delete_partition(self, partition: str):
"""Delete a partition and all its files"""
with self.Session() as session:
Expand Down
162 changes: 160 additions & 2 deletions openrag/components/indexer/vectordb/vectordb.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,156 @@ async def delete_file(self, file_id: str, partition: str):
file_id=file_id,
)

async def get_file_chunks(self, file_id: str, partition: str, include_id: bool = False, limit: int = 100):
async def delete_file_chunks(self, file_id: str, partition: str):
"""Delete file chunks from Milvus only — does NOT touch PostgreSQL or workspace associations.

Used by PUT (file replace) to remove old Milvus vectors while keeping the
PostgreSQL File row intact, preserving workspace FK references.
"""
log = self.logger.bind(file_id=file_id, partition=partition)
try:
res = await self._async_client.delete(
collection_name=self.collection_name,
filter=f"partition == '{partition}' and file_id == '{file_id}'",
)
log.info("Deleted file chunks from Milvus (PG row preserved).", count=res.get("delete_count", 0))
except MilvusException as e:
log.exception(f"Couldn't delete file chunks for file_id {file_id}", error=str(e))
raise VDBDeleteError(
f"Couldn't delete file chunks for file_id {file_id}: {e!s}",
collection_name=self.collection_name,
partition=partition,
file_id=file_id,
)

async def upsert_file_metadata(self, file_id: str, partition: str, metadata: dict):
"""Update metadata on all chunks of a file in-place via Milvus upsert.

Fetches existing chunks (with _id and vectors), merges new metadata,
then upserts back into Milvus. No re-embedding is performed.
Also updates the PostgreSQL file record metadata in-place.
"""
log = self.logger.bind(file_id=file_id, partition=partition)
try:
# Fetch all chunks with their _id and vector so we can upsert without re-embedding.
docs = await self.get_file_chunks(file_id, partition, include_id=True, include_vectors=True)
if not docs:
log.warning("No chunks found for metadata upsert")
return

entities = []
for doc in docs:
chunk_metadata = dict(doc.metadata)
milvus_id = chunk_metadata.pop("_id")
vector = chunk_metadata.pop("vector")
# Merge new metadata into the chunk metadata
chunk_metadata.update(metadata)
entities.append(
{
"_id": milvus_id,
"text": doc.page_content,
"vector": vector,
**chunk_metadata,
}
)

await self._async_client.upsert(
collection_name=self.collection_name,
data=entities,
)

# Build file-level metadata from the first chunk (same as async_add_documents)
file_metadata = dict(docs[0].metadata)
file_metadata.pop("_id", None)
file_metadata.pop("vector", None)
file_metadata.pop("page", None)
file_metadata.update(metadata)
self.partition_file_manager.update_file_metadata_in_db(file_id, partition, file_metadata)

log.info("Upserted file metadata in-place.", chunk_count=len(entities))

except MilvusException as e:
log.exception("Milvus upsert failed", error=str(e))
raise VDBInsertError(
f"Couldn't upsert metadata for file {file_id}: {e!s}",
collection_name=self.collection_name,
partition=partition,
file_id=file_id,
)
except VDBError:
raise
except Exception as e:
log.exception("Unexpected error during metadata upsert", error=str(e))
raise UnexpectedVDBError(
f"Unexpected error during metadata upsert for {file_id}: {e!s}",
collection_name=self.collection_name,
)

async def add_documents_for_existing_file(self, chunks: list[Document], user: dict) -> None:
"""Insert new chunks into Milvus for a file that already exists in PostgreSQL.

Used by PUT (file replace): the PostgreSQL File row is preserved and updated
in-place, so this method only handles Milvus insertion + PG metadata update.
Unlike async_add_documents, it does NOT create a new File row or check for duplicates.
"""
try:
file_metadata = dict(chunks[0].metadata)
file_metadata.pop("page")
file_id, partition = file_metadata.get("file_id"), file_metadata.get("partition")
relationship_id = file_metadata.get("relationship_id")
parent_id = file_metadata.get("parent_id")

self.logger.bind(partition=partition, file_id=file_id, filename=file_metadata.get("filename"))

entities = []
vectors = await self.embedder.aembed_documents(chunks)
order_metadata_l: list[dict] = _gen_chunk_order_metadata(n=len(chunks))
for chunk, vector, order_metadata in zip(chunks, vectors, order_metadata_l):
entities.append(
{
"text": chunk.page_content,
"vector": vector,
**order_metadata,
**chunk.metadata,
}
)

await self._async_client.insert(
collection_name=self.collection_name,
data=entities,
)

# Update existing PostgreSQL file record in-place (preserves files.id PK)
self.partition_file_manager.update_file_in_partition(
file_id=file_id,
partition=partition,
file_metadata=file_metadata,
relationship_id=relationship_id,
parent_id=parent_id,
)
self.logger.info(f"File '{file_id}' chunks replaced in partition '{partition}'")

except EmbeddingError as e:
self.logger.exception("Embedding failed", error=str(e))
raise
except VDBError as e:
self.logger.exception("VectorDB operation failed", error=str(e))
raise
except Exception as e:
self.logger.exception("Unexpected error while adding chunks for existing file", error=str(e))
raise UnexpectedVDBError(
f"Unexpected error while adding chunks for existing file: {e!s}",
collection_name=self.collection_name,
)

async def get_file_chunks(
self,
file_id: str,
partition: str,
include_id: bool = False,
include_vectors: bool = False,
limit: int = 100,
):
log = self.logger.bind(file_id=file_id, partition=partition)
try:
self._check_file_exists(file_id, partition)
Expand All @@ -648,7 +797,15 @@ async def get_file_chunks(self, file_id: str, partition: str, include_id: bool =
# Pagination parameters
offset = 0
results = []
excluded_keys = ["text", "vector", "_id"] if not include_id else ["text", "vector"]
excluded_keys = {"text"}
if not include_id:
excluded_keys.add("_id")
if not include_vectors:
excluded_keys.add("vector")

# Milvus query with output_fields=["*"] returns all scalar fields
# but excludes vector fields. To include vectors, request them explicitly.
output_fields = ["*", "vector"] if include_vectors else ["*"]

while True:
response = await self._async_client.query(
Expand All @@ -657,6 +814,7 @@ async def get_file_chunks(self, file_id: str, partition: str, include_id: bool =
filter_params=filter_params,
limit=limit,
offset=offset,
output_fields=output_fields,
)

if not response:
Expand Down
17 changes: 8 additions & 9 deletions openrag/routers/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,14 @@ async def put_file(
detail=f"'{file_id}' not found in partition '{partition}'",
)

# Snapshot workspace memberships before deletion so they can be restored on the new version.
existing_workspace_ids = await call_ray_actor_with_timeout(
future=vectordb.get_file_workspaces.remote(file_id, partition),
# Delete only the Milvus chunks — the PostgreSQL File row (and its workspace
# FK references) is preserved so workspace memberships survive the replace.
await call_ray_actor_with_timeout(
future=vectordb.delete_file_chunks.remote(file_id, partition),
timeout=VECTORDB_TIMEOUT,
task_description=f"get_file_workspaces({file_id}, {partition})",
task_description=f"delete_file_chunks({file_id}, {partition})",
)

# Delete the existing file from the vector database
await indexer.delete_file.remote(file_id, partition)

save_dir = Path(DATA_DIR)
original_filename = file.filename
file.filename = sanitize_filename(file.filename)
Expand All @@ -302,13 +300,14 @@ async def put_file(
metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime).isoformat()
metadata["file_id"] = file_id

# Indexing the file — restore pre-existing workspace memberships on the new version.
# Re-index: serialize → chunk → embed → insert into Milvus + update PG row in-place.
# replace=True tells add_file to update the existing PG File row rather than creating a new one.
task = indexer.add_file.remote(
path=file_path,
metadata=metadata,
partition=partition,
user=user,
workspace_ids=existing_workspace_ids or None,
replace=True,
)
await task_state_manager.set_state.remote(task.task_id().hex(), "QUEUED")
await task_state_manager.set_object_ref.remote(task.task_id().hex(), {"ref": task})
Expand Down
Loading