Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .hydra_config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ reranker:
model_name: ${oc.env:RERANKER_MODEL, Alibaba-NLP/gte-multilingual-reranker-base}
  top_k: ${oc.decode:${oc.env:RERANKER_TOP_K, 5}} # Number of documents to return after reranking. Increase to 8 for better results if your LLM has a wider context window.
base_url: ${oc.env:RERANKER_BASE_URL, http://reranker:${oc.env:RERANKER_PORT, 7997}}
# Temporal scoring parameters
temporal_weight: ${oc.decode:${oc.env:RERANKER_TEMPORAL_WEIGHT, 0.3}} # Weight for temporal scoring (0.0-1.0), 0.3 means 30% temporal, 70% relevance
temporal_decay_days: ${oc.decode:${oc.env:RERANKER_TEMPORAL_DECAY_DAYS, 365}} # Days for temporal score to decay to near zero

map_reduce:
# Number of documents to process in the initial mapping phase
Expand Down
5 changes: 2 additions & 3 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ include:
- extern/infinity.yaml

x-openrag: &openrag_template
image: ghcr.io/linagora/openrag:dev-latest
# image: linagoraai/openrag:latest
image: linagoraai/openrag:latest
build:
context: .
dockerfile: Dockerfile
Expand Down Expand Up @@ -58,7 +57,7 @@ x-vllm: &vllm_template
services:
# OpenRAG Indexer UI
indexer-ui:
image: linagoraai/indexer-ui:v1.1
image: linagoraai/indexer-ui:latest
build:
context: ./extern/indexer-ui
dockerfile: Dockerfile
Expand Down
37 changes: 19 additions & 18 deletions openrag/components/indexer/chunker/chunker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional
from datetime import datetime, timezone

from components.prompts import CHUNK_CONTEXTUALIZER
from components.utils import get_llm_semaphore, load_config
Expand Down Expand Up @@ -235,12 +236,12 @@ async def split_document(self, doc: Document, task_id: str = None):
start_page = page_info["start_page"]
end_page = page_info["end_page"]
prev_page_num = end_page
filtered_chunks.append(
Document(
page_content=chunk_w_context,
metadata={**metadata, "page": start_page},
)
)
chunk_meta = {
**metadata,
"page": start_page,
"indexed_at": datetime.now(timezone.utc).isoformat(),
}
filtered_chunks.append(Document(page_content=chunk_w_context, metadata=chunk_meta))
log.info("Document chunking completed")
return filtered_chunks

Expand Down Expand Up @@ -352,12 +353,12 @@ async def split_document(self, doc: Document, task_id: str = None):
start_page = page_info["start_page"]
end_page = page_info["end_page"]
prev_page_num = end_page
filtered_chunks.append(
Document(
page_content=chunk_w_context,
metadata={**metadata, "page": start_page},
)
)
chunk_meta = {
**metadata,
"page": start_page,
"indexed_at": datetime.now(timezone.utc).isoformat(),
}
filtered_chunks.append(Document(page_content=chunk_w_context, metadata=chunk_meta))
log.info("Document chunking completed")
return filtered_chunks

Expand Down Expand Up @@ -457,12 +458,12 @@ async def split_document(self, doc: Document, task_id: str = None):
start_page = page_info["start_page"]
end_page = page_info["end_page"]
prev_page_num = end_page
filtered_chunks.append(
Document(
page_content=chunk_w_context,
metadata={**metadata, "page": start_page},
)
)
chunk_meta = {
**metadata,
"page": start_page,
"indexed_at": datetime.now(timezone.utc).isoformat(),
}
filtered_chunks.append(Document(page_content=chunk_w_context, metadata=chunk_meta))
log.info("Document chunking completed")
return filtered_chunks

Expand Down
94 changes: 93 additions & 1 deletion openrag/components/indexer/vectordb/vectordb.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,32 @@ def _create_schema(self):
max_length=MAX_LENGTH,
)

# Add temporal fields for temporal awareness (priority: datetime > modified_at > created_at > indexed_at)
# datetime: user-provided primary temporal field (highest priority)
schema.add_field(
field_name="datetime",
datatype=DataType.VARCHAR,
max_length=64,
)

schema.add_field(
field_name="created_at",
datatype=DataType.VARCHAR,
max_length=64,
)

schema.add_field(
field_name="modified_at",
datatype=DataType.VARCHAR,
max_length=64,
)

schema.add_field(
field_name="indexed_at",
datatype=DataType.VARCHAR,
max_length=64,
)

schema.add_field(
field_name="vector",
datatype=DataType.FLOAT_VECTOR,
Expand Down Expand Up @@ -303,6 +329,31 @@ def _create_index(self):
field_name="partition", index_type="INVERTED", index_name="partition_idx"
)

# Add indexes for temporal fields to enable efficient filtering
index_params.add_index(
field_name="datetime",
index_type="INVERTED",
index_name="datetime_idx",
)

index_params.add_index(
field_name="created_at",
index_type="INVERTED",
index_name="created_at_idx",
)

index_params.add_index(
field_name="modified_at",
index_type="INVERTED",
index_name="modified_at_idx",
)

index_params.add_index(
field_name="indexed_at",
index_type="INVERTED",
index_name="indexed_at_idx",
)

# Add index for vector field
index_params.add_index(
field_name="vector",
Expand Down Expand Up @@ -430,8 +481,49 @@ async def async_search(
expr_parts.append(f"partition in {partition}")

if filter:
# Handle temporal filters with OR logic across different timestamp fields
# This allows documents to match if ANY of their timestamp fields (datetime, modified_at, created_at, indexed_at) fall within the range
temporal_keys = [
"datetime_after", "datetime_before",
"created_after", "created_before",
"modified_after", "modified_before",
"indexed_after", "indexed_before"
]

# Build temporal conditions with OR logic
temporal_conditions = []

            # Extract generic after/before bounds: take the first value present among the
            # created_/datetime_/modified_/indexed_ filter keys (one shared range is applied to all fields)
after_value = filter.get("created_after") or filter.get("datetime_after") or filter.get("modified_after") or filter.get("indexed_after")
before_value = filter.get("created_before") or filter.get("datetime_before") or filter.get("modified_before") or filter.get("indexed_before")

if after_value or before_value:
# For each timestamp field, create conditions
field_conditions = []
for field in ["datetime", "modified_at", "created_at", "indexed_at"]:
field_parts = []
if after_value:
field_parts.append(f"{field} >= '{after_value}'")
if before_value:
field_parts.append(f"{field} <= '{before_value}'")

if field_parts:
# Each field condition: (field >= after AND field <= before)
field_condition = " and ".join(field_parts)
field_conditions.append(f"({field_condition})")

# Combine all field conditions with OR
if field_conditions:
temporal_expr = " or ".join(field_conditions)
temporal_conditions.append(f"({temporal_expr})")

if temporal_conditions:
expr_parts.extend(temporal_conditions)

# Handle other filters (exact match)
for key, value in filter.items():
expr_parts.append(f"{key} == '{value}'")
if key not in temporal_keys:
expr_parts.append(f"{key} == '{value}'")

        # Join all parts with " and "; fall back to an empty expression when no conditions were built
expr = " and ".join(expr_parts) if expr_parts else ""
Expand Down
49 changes: 43 additions & 6 deletions openrag/components/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import copy
import json
from datetime import datetime, timezone
from enum import Enum

from components.prompts import QUERY_CONTEXTUALIZER_PROMPT, SYS_PROMPT_TMPLT
from langchain_core.documents.base import Document
from openai import AsyncOpenAI
from utils.logger import get_logger
from utils.temporal import TemporalQueryNormalizer

from .llm import LLM
from .map_reduce import RAGMapReduce
Expand Down Expand Up @@ -41,9 +44,11 @@ def __init__(self, config) -> None:
self.reranker = Reranker(logger, config)

async def retrieve_docs(
self, partition: list[str], query: str, use_map_reduce: bool = False
self, partition: list[str], query: str, use_map_reduce: bool = False, temporal_filter: dict = None
) -> list[Document]:
docs = await self.retriever.retrieve(partition=partition, query=query)
docs = await self.retriever.retrieve(
partition=partition, query=query, temporal_filter=temporal_filter
)
top_k = (
max(self.map_reduce_max_docs, self.reranker_top_k)
if use_map_reduce
Expand Down Expand Up @@ -79,6 +84,9 @@ def __init__(self, config) -> None:

# map reduce
self.map_reduce: RAGMapReduce = RAGMapReduce(config=config)

# temporal query normalizer
self.temporal_normalizer = TemporalQueryNormalizer()

async def generate_query(self, messages: list[dict]) -> str:
match RAGMODE(self.rag_mode):
Expand Down Expand Up @@ -124,11 +132,29 @@ async def _prepare_for_chat_completion(self, partition: list[str], payload: dict

metadata = payload.get("metadata", {})
use_map_reduce = metadata.get("use_map_reduce", False)
logger.info("Metadata parameters", use_map_reduce=use_map_reduce)

# Extract temporal filter from query if not provided in metadata
temporal_filter = metadata.get("temporal_filter", None)
if not temporal_filter:
temporal_filter = self.temporal_normalizer.extract_temporal_filter(query)
if temporal_filter:
logger.info(
"Extracted temporal filter from query",
created_after=temporal_filter.get("created_after"),
created_before=temporal_filter.get("created_before"),
modified_after=temporal_filter.get("modified_after"),
modified_before=temporal_filter.get("modified_before"),
datetime_after=temporal_filter.get("datetime_after"),
datetime_before=temporal_filter.get("datetime_before"),
)

# 2. get docs
logger.info(
"Metadata parameters",
use_map_reduce=use_map_reduce,
temporal_filter_present=temporal_filter is not None,
) # 2. get docs
docs = await self.retriever_pipeline.retrieve_docs(
partition=partition, query=query, use_map_reduce=use_map_reduce
partition=partition, query=query, use_map_reduce=use_map_reduce, temporal_filter=temporal_filter
)

if use_map_reduce and docs:
Expand All @@ -140,14 +166,25 @@ async def _prepare_for_chat_completion(self, partition: list[str], payload: dict
# 4. prepare the output
messages: list = copy.deepcopy(messages)

# Get current datetime in UTC for temporal awareness
current_datetime = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

# prepend the messages with the system prompt
system_prompt_content = SYS_PROMPT_TMPLT.format(
context=context, current_datetime=current_datetime
)
messages.insert(
0,
{
"role": "system",
"content": SYS_PROMPT_TMPLT.format(context=context),
"content": system_prompt_content,
},
)

# Debug: log the formatted system prompt
logger.debug("System prompt with context", prompt_length=len(system_prompt_content), doc_count=len(docs))
logger.debug("Full system prompt", content=system_prompt_content)

payload["messages"] = messages
return payload, docs

Expand Down
Loading
Loading