Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
751c4c2
fix(01-01): fix nested httpx.Timeout objects in app_front.py
paultranvan Feb 10, 2026
2cc5c30
feat(01-03): add FileMetadataSchema for metadata validation
paultranvan Feb 10, 2026
3a61a48
fix(01-quick-security-fixes): replace unsafe database URL in vectordb.py
paultranvan Feb 10, 2026
b3ba235
fix(01-quick-security-fixes): replace unsafe database URL in alembic …
paultranvan Feb 10, 2026
1dbf7ec
feat(01-03): integrate FileMetadataSchema validation in utils
paultranvan Feb 10, 2026
56dee51
style: apply ruff formatting
paultranvan Feb 11, 2026
403b99f
feat(phase-2): centralized exception handling for API layer
paultranvan Feb 17, 2026
371b78f
feat(phase-2): centralized exception handling for API layer
paultranvan Feb 12, 2026
42a14a6
refactor: move exception subclasses to phase-3 where they are used
paultranvan Feb 17, 2026
1a5f1c9
feat(03-04): add typed exception handling to pipeline and map-reduce
paultranvan Feb 10, 2026
fdd46da
refactor(03-03): replace exception handlers in base, image, media, pp…
paultranvan Feb 10, 2026
2b7427d
feat(03-04): add typed exception handling to reranker and LLM streaming
paultranvan Feb 10, 2026
b49ae9c
refactor(03-01): replace exception handlers in vectordb.py with typed…
paultranvan Feb 10, 2026
43f2456
feat(03-02): replace 5 exception handlers in indexer.py with typed ex…
paultranvan Feb 10, 2026
b4b04e9
refactor(03-03): replace exception handlers in eml, serializer, marke…
paultranvan Feb 10, 2026
51e919c
feat(03-02): replace exception handlers in embeddings and chunker
paultranvan Feb 10, 2026
05ec9fa
refactor(03-01): replace exception handlers in utils.py with typed VD…
paultranvan Feb 10, 2026
6e1005b
refactor(phase-3): replace RuntimeError with typed OpenRAGError subcl…
paultranvan Feb 12, 2026
253fcc8
refactor: add exception subclasses used by core services
paultranvan Feb 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions openrag/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ async def openrag_exception_handler(request: Request, exc: OpenRAGError):
return JSONResponse(status_code=exc.status_code, content=exc.to_dict())


@app.exception_handler(Exception)
async def unhandled_exception_handler(request: Request, exc: Exception):
logger = get_logger()
logger.exception("Unhandled exception", error=str(exc))
return JSONResponse(
status_code=500,
content={"detail": "[UNEXPECTED_ERROR]: An unexpected error occurred", "extra": {}},
)


# Add CORS middleware
allow_origins = [
"http://localhost:3042",
Expand Down
4 changes: 2 additions & 2 deletions openrag/app_front.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def on_chat_resume(thread):
@cl.password_auth_callback
async def auth_callback(username: str, password: str):
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(timeout=httpx.Timeout(4 * 60.0))) as client:
async with httpx.AsyncClient(timeout=httpx.Timeout(4 * 60.0)) as client:
response = await client.get(
url=f"{INTERNAL_BASE_URL}/users/info",
headers=get_headers(password),
Expand Down Expand Up @@ -131,7 +131,7 @@ async def on_chat_start():
api_key = user.metadata.get("api_key", "sk-1234") if user else "sk-1234"
logger.debug("New Chat Started", internal_base_url=INTERNAL_BASE_URL)
try:
async with httpx.AsyncClient(timeout=httpx.Timeout(timeout=httpx.Timeout(4 * 60.0))) as client:
async with httpx.AsyncClient(timeout=httpx.Timeout(4 * 60.0)) as client:
response = await client.get(
url=f"{INTERNAL_BASE_URL}/health_check",
headers=get_headers(api_key),
Expand Down
29 changes: 17 additions & 12 deletions openrag/components/indexer/chunker/chunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import openai
from components.indexer.utils.text_sanitizer import sanitize_text
from utils.exceptions.base import OpenRAGError
from components.prompts import CHUNK_CONTEXTUALIZER_PROMPT
from components.utils import detect_language, get_vlm_semaphore, load_config
from langchain_core.documents.base import Document
Expand Down Expand Up @@ -68,18 +69,20 @@ async def _generate_context(
]
output = await self.context_generator.ainvoke(messages)
return output.content

except openai.APITimeoutError:
logger.warning(
f"OpenAI API timeout contextualizing chunk after {CONTEXTUALIZATION_TIMEOUT}s",
filename=filename,
)
# VLM timeout - graceful degradation
logger.warning("VLM context generation timeout", timeout=CONTEXTUALIZATION_TIMEOUT)
return ""
except Exception as e:
logger.warning(
"Error contextualizing chunk of document",
filename=filename,
error=str(e),
)

except openai.APIError as e:
# Other VLM API errors - log but don't fail chunking
logger.error("VLM context generation failed", error=str(e))
return ""

except Exception:
# Unexpected errors - log but still gracefully degrade
logger.exception("Unexpected error during context generation")
return ""

async def contextualize_chunks(
Expand Down Expand Up @@ -130,8 +133,10 @@ async def contextualize_chunks(
for chunk, context in zip(chunks, contexts, strict=True)
]

except Exception as e:
logger.warning(f"Error contextualizing chunks from `{filename}`: {e}")
except OpenRAGError:
raise
except Exception:
logger.exception("Error contextualizing chunks", filename=filename)
return chunks


Expand Down
14 changes: 12 additions & 2 deletions openrag/components/indexer/embeddings/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,18 @@ def embedding_dimension(self) -> int:
# Test call to get embedding dimension
output = self.embed_documents([Document(page_content="test")])
return len(output[0])

except openai.APIError as e:
logger.error("Failed to get embedding dimension", error=str(e))
raise EmbeddingAPIError(f"API error: {e}", model_name=self.embedding_model)

except (IndexError, AttributeError) as e:
logger.error("Invalid embedding response format", error=str(e))
raise EmbeddingResponseError("Unexpected response format", error=str(e))

except Exception:
raise
logger.exception("Unexpected error getting embedding dimension")
raise UnexpectedEmbeddingError("An unexpected error occurred")

def embed_documents(self, texts: list[str | Document]) -> list[list[float]]:
"""
Expand Down Expand Up @@ -62,7 +72,7 @@ def embed_documents(self, texts: list[str | Document]) -> list[list[float]]:
except Exception as e:
logger.exception("Unexpected error while embedding documents", error=str(e))
raise UnexpectedEmbeddingError(
f"Failed to embed documents: {e!s}",
"An unexpected error occurred during document embedding",
model_name=self.embedding_model,
base_url=self.base_url,
error=str(e),
Expand Down
44 changes: 33 additions & 11 deletions openrag/components/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import torch
from config import load_config
from langchain_core.documents.base import Document
from utils.exceptions.base import OpenRAGError
from utils.exceptions.common import UnexpectedError

from .chunker import BaseChunker, ChunkerFactory
from .utils import serialize_file
Expand Down Expand Up @@ -119,25 +121,34 @@ async def add_file(
# Mark task as completed
await task_state_manager.set_state.remote(task_id, "COMPLETED")

except Exception as e:
log.exception(f"Task {task_id} failed in add_file")
except OpenRAGError as e:
log.error("Operation failed during file ingestion", code=e.code, error=e.message)
tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))
await task_state_manager.set_state.remote(task_id, "FAILED")
await task_state_manager.set_error.remote(task_id, tb)
raise

except Exception as e:
# Truly unexpected errors
log.exception("Unexpected error during file ingestion", task_id=task_id)
tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))
await task_state_manager.set_state.remote(task_id, "FAILED")
await task_state_manager.set_error.remote(task_id, tb)
raise UnexpectedError("An unexpected error occurred during file processing") from e

finally:
# GPU cleanup
if torch.cuda.is_available():
gc.collect()
torch.cuda.empty_cache()
torch.cuda.ipc_collect()

# File cleanup - nest try/except per Phase 2 decision
try:
# Cleanup input file
if not save_uploaded_files:
Path(path).unlink(missing_ok=True)
log.debug(f"Deleted input file: {path}")
except Exception as cleanup_err:
log.warning(f"Failed to delete input file {path}: {cleanup_err}")
log.warning("Failed to delete input file", path=path, error=str(cleanup_err))
return True

@ray.method(concurrency_group="insert")
Expand All @@ -157,10 +168,13 @@ async def delete_file(self, file_id: str, partition: str) -> bool:
await vectordb.delete_file.remote(file_id, partition)
log.info("Deleted file from partition.", file_id=file_id, partition=partition)

except Exception as e:
log.exception("Error in delete_file", error=str(e))
except OpenRAGError:
raise

except Exception as e:
log.exception("Unexpected error in delete_file")
raise UnexpectedError("An unexpected error occurred during file deletion") from e

@ray.method(concurrency_group="update")
async def update_file_metadata(
self,
Expand All @@ -184,10 +198,14 @@ async def update_file_metadata(
await vectordb.async_add_documents.remote(docs, user=user)

log.info("Metadata updated for file.")
except Exception as e:
log.exception("Error in update_file_metadata", error=str(e))

except OpenRAGError:
raise

except Exception as e:
log.exception("Unexpected error in update_file_metadata")
raise UnexpectedError("An unexpected error occurred during metadata update") from e

@ray.method(concurrency_group="update")
async def copy_file(
self,
Expand Down Expand Up @@ -216,10 +234,14 @@ async def copy_file(
new_file_id=metadata.get("file_id"),
new_partition=metadata.get("partition"),
)
except Exception as e:
log.exception("Error in copy_file", error=str(e))

except OpenRAGError:
raise

except Exception as e:
log.exception("Unexpected error in copy_file")
raise UnexpectedError("An unexpected error occurred during file copy") from e

@ray.method(concurrency_group="search")
async def asearch(
self,
Expand Down
9 changes: 7 additions & 2 deletions openrag/components/indexer/loaders/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,13 @@ async def get_image_description(
base64.b64decode(image_data)
image_url = f"data:image/png;base64,{image_data}"
logger.debug("Processing raw base64 string")
except Exception:
logger.error(f"Invalid image data type or format: {type(image_data)}")
except (ValueError, base64.binascii.Error) as e:
# Invalid base64 data
logger.warning("Failed to decode base64 image", error=str(e)[:100])
return """\n<image_description>\nInvalid image data format\n</image_description>\n"""
except Exception as e:
# PIL image opening errors or other unexpected issues
logger.warning("Failed to process image data", error=str(e)[:100])
return """\n<image_description>\nInvalid image data format\n</image_description>\n"""
else:
logger.error(f"Unsupported image data type: {type(image_data)}")
Expand Down
47 changes: 41 additions & 6 deletions openrag/components/indexer/loaders/eml_loader.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import datetime
import email
import email.errors
import io
import os
import tempfile
from email.utils import parsedate_to_datetime
from pathlib import Path

from langchain_core.documents.base import Document
from PIL import Image
from PIL import Image, UnidentifiedImageError
from utils.exceptions.common import FileStorageError, UnexpectedError

from . import get_loader_classes
from .base import BaseLoader
Expand Down Expand Up @@ -52,7 +54,8 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
if email_data["header"]["date"]:
try:
email_data["header"]["date"] = parsedate_to_datetime(email_data["header"]["date"]).isoformat()
except Exception:
except (ValueError, TypeError, email.errors.MessageError):
# Invalid date format - keep original string
pass

# Extract body content and attachments
Expand Down Expand Up @@ -98,7 +101,8 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
# Use plain text as primary body content
if content_type == "text/plain" or not body_content:
body_content = text_content
except Exception as e:
except (UnicodeDecodeError, email.errors.MessageError) as e:
# Failed to decode email text part - skip this part
print(f"Failed to decode text content: {e}")

# Extract body content
Expand Down Expand Up @@ -140,7 +144,11 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
metadata={"source": f"attachment:{filename}"},
)
attachments_text += f"Content:\n{attachment_doc.page_content}\n"
except OSError as e:
# File I/O error - skip this attachment
attachments_text += f"Cannot read attachment file: {str(e)[:200]}...\n"
except Exception as e:
# Loader-specific errors - try fallback loaders
attachments_text += f"Failed to process attachment with loader ({loader_cls.__name__}): {str(e)[:200]}...\n"

# Special fallback handling for PDFs with alternative loaders
Expand Down Expand Up @@ -179,7 +187,11 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
attachments_text += f"Content (via {fallback_loader_name}):\n{attachment_doc.page_content}\n"
fallback_success = True
break
except OSError as fallback_e:
# File I/O error - skip fallback
attachments_text += f"Fallback {fallback_loader_name} file error: {str(fallback_e)[:100]}...\n"
except Exception as fallback_e:
# Fallback loader failed - try next
attachments_text += f"Fallback {fallback_loader_name} also failed: {str(fallback_e)[:100]}...\n"

if not fallback_success:
Expand All @@ -197,7 +209,11 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
attachments_text += (
"Image attachment present but image captioning disabled\n"
)
except (OSError, UnidentifiedImageError) as img_e:
# Invalid or unreadable image
attachments_text += f"Image fallback failed (invalid image): {str(img_e)[:100]}...\n"
except Exception as img_e:
# Unexpected image processing error
attachments_text += f"Image fallback also failed: {str(img_e)[:100]}...\n"

# Try text fallback for other text-based formats
Expand All @@ -213,7 +229,11 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
)
else:
attachments_text += "No readable text found in attachment\n"
except UnicodeDecodeError as text_e:
# Text decoding failed
attachments_text += f"Text fallback failed (encoding error): {str(text_e)[:100]}...\n"
except Exception as text_e:
# Unexpected text extraction error
attachments_text += f"Text fallback failed: {str(text_e)[:100]}...\n"
finally:
# Clean up temporary file
Expand All @@ -237,8 +257,9 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
# Generate caption using the base loader's method
caption = await self.get_image_description(image_data=image)
attachments_text += f"Image Description:\n{caption}\n"
except Exception as e:
attachments_text += f"Failed to generate image caption: {str(e)[:200]}...\n"
except (OSError, UnidentifiedImageError) as e:
# Invalid or corrupted image
attachments_text += f"Failed to generate image caption (invalid image): {str(e)[:200]}...\n"
# Try to show basic image info if available
try:
size_info = f"Image size: {len(attachment['raw'])} bytes"
Expand All @@ -247,6 +268,9 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
)
except Exception:
attachments_text += "Image attachment present but corrupted or unreadable\n"
except Exception as e:
# Unexpected image captioning error (VLM errors handled in base.py)
attachments_text += f"Failed to generate image caption: {str(e)[:200]}...\n"

elif content_type.startswith("text/"):
# For text attachments, decode directly
Expand All @@ -255,7 +279,11 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
else:
# For other binary content, just show metadata
attachments_text += f"Binary content (size: {len(attachment['raw'])} bytes)\n"
except OSError as e:
# File I/O error creating temp file
attachments_text += f"Cannot create temp file for attachment: {e}\n"
except Exception as e:
# Unexpected error processing attachment
attachments_text += f"Content could not be processed: {e}\n"
attachments_text += "---\n"

Expand Down Expand Up @@ -298,8 +326,15 @@ async def aload_document(self, file_path, metadata: dict | None = None, save_mar
with open(markdown_path, "w", encoding="utf-8") as md_file:
md_file.write(content_body)
metadata["markdown_path"] = str(markdown_path)
except OSError as e:
# File I/O error reading email file
raise FileStorageError(f"Cannot read email file: {e}") from e
except email.errors.MessageError as e:
# Email parsing error
raise UnexpectedError(f"Invalid email format: {e}") from e
except Exception as e:
raise ValueError(f"Failed to parse the EML file {file_path}: {e}")
# Unexpected error
raise UnexpectedError(f"Failed to parse the EML file {file_path}: {e}") from e

document = Document(page_content=content_body, metadata=metadata)
return document
Loading