Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
28dc26e
feat(phase-2): centralized exception handling for API layer
paultranvan Feb 12, 2026
84bc386
feat(03-04): add typed exception handling to pipeline and map-reduce
paultranvan Feb 10, 2026
013c98c
refactor(03-03): replace exception handlers in base, image, media, pp…
paultranvan Feb 10, 2026
19a79fc
feat(03-04): add typed exception handling to reranker and LLM streaming
paultranvan Feb 10, 2026
4886c37
refactor(03-01): replace exception handlers in vectordb.py with typed…
paultranvan Feb 10, 2026
0706080
feat(03-02): replace 5 exception handlers in indexer.py with typed ex…
paultranvan Feb 10, 2026
dc083d0
refactor(03-03): replace exception handlers in eml, serializer, marke…
paultranvan Feb 10, 2026
9ecc162
feat(03-02): replace exception handlers in embeddings and chunker
paultranvan Feb 10, 2026
83574f0
refactor(03-01): replace exception handlers in utils.py with typed VD…
paultranvan Feb 10, 2026
7a43dfb
refactor(phase-3): replace RuntimeError with typed OpenRAGError subcl…
paultranvan Feb 12, 2026
f68bfb9
feat(04-async-infrastructure): convert BaseLoader.save_content to async
paultranvan Feb 11, 2026
4d31294
feat(04-async-infrastructure): convert restore script to async Ray calls
paultranvan Feb 11, 2026
2f04e94
feat(04-02): convert EmlLoader, DocxLoader, and DocLoader to non-bloc…
paultranvan Feb 11, 2026
88487be
feat(04-02): convert ImageLoader, PyMuPDF4LLMLoader, and PPTXLoader t…
paultranvan Feb 11, 2026
f535e4b
fix: restore binascii import removed by ruff auto-fix
paultranvan Feb 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions openrag/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ async def openrag_exception_handler(request: Request, exc: OpenRAGError):
return JSONResponse(status_code=exc.status_code, content=exc.to_dict())


@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
logger = get_logger()
logger.exception("Unhandled exception", error=str(exc))
return JSONResponse(
status_code=500,
content={"detail": "[UNEXPECTED_ERROR]: An unexpected error occurred", "extra": {}},
)


# Add CORS middleware
allow_origins = [
"http://localhost:3042",
Expand Down
26 changes: 14 additions & 12 deletions openrag/components/indexer/chunker/chunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,20 @@ async def _generate_context(
]
output = await self.context_generator.ainvoke(messages)
return output.content

except openai.APITimeoutError:
logger.warning(
f"OpenAI API timeout contextualizing chunk after {CONTEXTUALIZATION_TIMEOUT}s",
filename=filename,
)
# VLM timeout - graceful degradation
logger.warning("VLM context generation timeout", timeout=CONTEXTUALIZATION_TIMEOUT)
return ""
except Exception as e:
logger.warning(
"Error contextualizing chunk of document",
filename=filename,
error=str(e),
)

except openai.APIError as e:
# Other VLM API errors - log but don't fail chunking
logger.error("VLM context generation failed", error=str(e))
return ""

except Exception:
# Unexpected errors - log but still gracefully degrade
logger.exception("Unexpected error during context generation")
return ""

async def contextualize_chunks(
Expand Down Expand Up @@ -130,8 +132,8 @@ async def contextualize_chunks(
for chunk, context in zip(chunks, contexts, strict=True)
]

except Exception as e:
logger.warning(f"Error contextualizing chunks from `{filename}`: {e}")
except Exception:
logger.exception("Error contextualizing chunks", filename=filename)
return chunks


Expand Down
14 changes: 12 additions & 2 deletions openrag/components/indexer/embeddings/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,18 @@ def embedding_dimension(self) -> int:
# Test call to get embedding dimension
output = self.embed_documents([Document(page_content="test")])
return len(output[0])

except openai.APIError as e:
logger.error("Failed to get embedding dimension", error=str(e))
raise EmbeddingAPIError(f"API error: {e}", model_name=self.embedding_model)

except (IndexError, AttributeError) as e:
logger.error("Invalid embedding response format", error=str(e))
raise EmbeddingResponseError("Unexpected response format", error=str(e))

except Exception:
raise
logger.exception("Unexpected error getting embedding dimension")
raise UnexpectedEmbeddingError("An unexpected error occurred")

def embed_documents(self, texts: list[str | Document]) -> list[list[float]]:
"""
Expand Down Expand Up @@ -62,7 +72,7 @@ def embed_documents(self, texts: list[str | Document]) -> list[list[float]]:
except Exception as e:
logger.exception("Unexpected error while embedding documents", error=str(e))
raise UnexpectedEmbeddingError(
f"Failed to embed documents: {e!s}",
"An unexpected error occurred during document embedding",
model_name=self.embedding_model,
base_url=self.base_url,
error=str(e),
Expand Down
44 changes: 33 additions & 11 deletions openrag/components/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import torch
from config import load_config
from langchain_core.documents.base import Document
from utils.exceptions.base import OpenRAGError
from utils.exceptions.common import UnexpectedError

from .chunker import BaseChunker, ChunkerFactory
from .utils import serialize_file
Expand Down Expand Up @@ -119,25 +121,34 @@ async def add_file(
# Mark task as completed
await task_state_manager.set_state.remote(task_id, "COMPLETED")

except Exception as e:
log.exception(f"Task {task_id} failed in add_file")
except OpenRAGError as e:
log.error("Operation failed during file ingestion", code=e.code, error=e.message)
tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))
await task_state_manager.set_state.remote(task_id, "FAILED")
await task_state_manager.set_error.remote(task_id, tb)
raise

except Exception as e:
# Truly unexpected errors
log.exception("Unexpected error during file ingestion", task_id=task_id)
tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))
await task_state_manager.set_state.remote(task_id, "FAILED")
await task_state_manager.set_error.remote(task_id, tb)
raise UnexpectedError("An unexpected error occurred during file processing") from e

finally:
# GPU cleanup
if torch.cuda.is_available():
gc.collect()
torch.cuda.empty_cache()
torch.cuda.ipc_collect()

# File cleanup - nest try/except per Phase 2 decision
try:
# Cleanup input file
if not save_uploaded_files:
Path(path).unlink(missing_ok=True)
log.debug(f"Deleted input file: {path}")
except Exception as cleanup_err:
log.warning(f"Failed to delete input file {path}: {cleanup_err}")
log.warning("Failed to delete input file", path=path, error=str(cleanup_err))
return True

@ray.method(concurrency_group="insert")
Expand All @@ -157,10 +168,13 @@ async def delete_file(self, file_id: str, partition: str) -> bool:
await vectordb.delete_file.remote(file_id, partition)
log.info("Deleted file from partition.", file_id=file_id, partition=partition)

except Exception as e:
log.exception("Error in delete_file", error=str(e))
except OpenRAGError:
raise

except Exception as e:
log.exception("Unexpected error in delete_file")
raise UnexpectedError("An unexpected error occurred during file deletion") from e

@ray.method(concurrency_group="update")
async def update_file_metadata(
self,
Expand All @@ -184,10 +198,14 @@ async def update_file_metadata(
await vectordb.async_add_documents.remote(docs, user=user)

log.info("Metadata updated for file.")
except Exception as e:
log.exception("Error in update_file_metadata", error=str(e))

except OpenRAGError:
raise

except Exception as e:
log.exception("Unexpected error in update_file_metadata")
raise UnexpectedError("An unexpected error occurred during metadata update") from e

@ray.method(concurrency_group="update")
async def copy_file(
self,
Expand Down Expand Up @@ -216,10 +234,14 @@ async def copy_file(
new_file_id=metadata.get("file_id"),
new_partition=metadata.get("partition"),
)
except Exception as e:
log.exception("Error in copy_file", error=str(e))

except OpenRAGError:
raise

except Exception as e:
log.exception("Unexpected error in copy_file")
raise UnexpectedError("An unexpected error occurred during file copy") from e

@ray.method(concurrency_group="search")
async def asearch(
self,
Expand Down
20 changes: 15 additions & 5 deletions openrag/components/indexer/loaders/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import base64
import binascii
import re
from abc import ABC, abstractmethod
from io import BytesIO
Expand Down Expand Up @@ -50,10 +51,14 @@ async def aload_document(
):
pass

def save_content(self, text_content: str, path: str):
path = re.sub(r"\..*", ".md", path)
def _write_file(self, path: str, content: str):
"""Synchronous helper for file writing."""
with open(path, "w", encoding="utf-8") as f:
f.write(text_content)
f.write(content)

async def save_content(self, text_content: str, path: str):
path = re.sub(r"\..*", ".md", path)
await asyncio.to_thread(self._write_file, path, text_content)
logger.debug(f"Document saved to {path}")

def _pil_image_to_base64(self, image: Image.Image) -> str:
Expand Down Expand Up @@ -116,8 +121,13 @@ async def get_image_description(
base64.b64decode(image_data)
image_url = f"data:image/png;base64,{image_data}"
logger.debug("Processing raw base64 string")
except Exception:
logger.error(f"Invalid image data type or format: {type(image_data)}")
except (ValueError, binascii.Error) as e:
# Invalid base64 data
logger.warning("Failed to decode base64 image", error=str(e)[:100])
return """\n<image_description>\nInvalid image data format\n</image_description>\n"""
except Exception as e:
# PIL image opening errors or other unexpected issues
logger.warning("Failed to process image data", error=str(e)[:100])
return """\n<image_description>\nInvalid image data format\n</image_description>\n"""
else:
logger.error(f"Unsupported image data type: {type(image_data)}")
Expand Down
25 changes: 16 additions & 9 deletions openrag/components/indexer/loaders/doc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
import tempfile

Expand All @@ -14,16 +15,22 @@ def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.MDLoader = DocxLoader(**kwargs)

async def aload_document(self, file_path, metadata, save_markdown=False):
"""Here we convert the document to docx format, save it in local and then use the MarkItDownLoader
to convert it to markdown."""
def _convert_doc_to_docx(self, file_path):
"""Convert .doc to .docx using Spire.Doc (blocking operations in thread pool)."""
document = Document()
document.LoadFromFile(str(file_path))
# file_path = "converted/sample2.docx"
with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as temp_file:
file_path = temp_file.name
document.SaveToFile(file_path, FileFormat.Docx2016)
result_string = await self.MDLoader.aload_document(file_path, metadata, save_markdown)
os.remove(file_path)
document.Close()
temp_path = temp_file.name
document.SaveToFile(temp_path, FileFormat.Docx2016)
return document, temp_path

async def aload_document(self, file_path, metadata, save_markdown=False):
"""Here we convert the document to docx format, save it in local and then use the MarkItDownLoader
to convert it to markdown."""
document, temp_path = await asyncio.to_thread(self._convert_doc_to_docx, file_path)
try:
result_string = await self.MDLoader.aload_document(temp_path, metadata, save_markdown)
finally:
await asyncio.to_thread(os.remove, temp_path)
document.Close()
return result_string
8 changes: 5 additions & 3 deletions openrag/components/indexer/loaders/docx.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import re
import zipfile
from io import BytesIO
Expand Down Expand Up @@ -28,11 +29,12 @@ def __init__(self, **kwargs):
self.converter = MarkItDown()

async def aload_document(self, file_path, metadata, save_markdown=False):
result = self.converter.convert(file_path).text_content
convert_result = await asyncio.to_thread(self.converter.convert, file_path)
result = convert_result.text_content

if self.image_captioning:
# Handle embedded images (extracted from docx zip)
images = self.get_images_from_zip(file_path)
images = await asyncio.to_thread(self.get_images_from_zip, file_path)
captions = await self.caption_images(images, desc="Captioning embedded images")
for caption in captions:
result = re.sub(
Expand All @@ -54,7 +56,7 @@ async def aload_document(self, file_path, metadata, save_markdown=False):

doc = Document(page_content=result, metadata=metadata)
if save_markdown:
self.save_content(result, str(file_path))
await self.save_content(result, str(file_path))
return doc

def get_images_from_zip(self, input_file):
Expand Down
Loading