diff --git a/.hydra_config/config.yaml b/.hydra_config/config.yaml index 69d170c6..e6169bed 100644 --- a/.hydra_config/config.yaml +++ b/.hydra_config/config.yaml @@ -51,6 +51,9 @@ reranker: model_name: ${oc.env:RERANKER_MODEL, Alibaba-NLP/gte-multilingual-reranker-base} top_k: ${oc.decode:${oc.env:RERANKER_TOP_K, 5}} # Number of documents to return after reranking. upgrade to 8 for better results if your llm has a wider context window base_url: ${oc.env:RERANKER_BASE_URL, http://reranker:${oc.env:RERANKER_PORT, 7997}} + # Temporal scoring parameters + temporal_weight: ${oc.decode:${oc.env:RERANKER_TEMPORAL_WEIGHT, 0.3}} # Weight for temporal scoring (0.0-1.0), 0.3 means 30% temporal, 70% relevance + temporal_decay_days: ${oc.decode:${oc.env:RERANKER_TEMPORAL_DECAY_DAYS, 365}} # Days for temporal score to decay to near zero map_reduce: # Number of documents to process in the initial mapping phase diff --git a/docker-compose.yaml b/docker-compose.yaml index a69ec3d5..2f473d9f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -4,8 +4,7 @@ include: - extern/infinity.yaml x-openrag: &openrag_template - image: ghcr.io/linagora/openrag:dev-latest - # image: linagoraai/openrag:latest + image: linagoraai/openrag:latest build: context: . 
dockerfile: Dockerfile @@ -58,7 +57,7 @@ x-vllm: &vllm_template services: # OpenRAG Indexer UI indexer-ui: - image: linagoraai/indexer-ui:v1.1 + image: linagoraai/indexer-ui:latest build: context: ./extern/indexer-ui dockerfile: Dockerfile diff --git a/docs/content/docs/documentation/API.mdx b/docs/content/docs/documentation/API.mdx index 04fd150a..5114da46 100644 --- a/docs/content/docs/documentation/API.mdx +++ b/docs/content/docs/documentation/API.mdx @@ -219,10 +219,45 @@ curl -X POST http://localhost:8080/v1/chat/completions \ } ], "temperature": 0.7, - "stream": false + "stream": false, + "metadata": { + "use_map_reduce": false, + "temporal_filter": { + "created_after": "2024-01-01T00:00:00Z", + "created_before": "2024-12-31T23:59:59Z" + } + } }' ``` +**Temporal Filtering:** + +OpenRAG supports temporal filtering to retrieve documents from specific time periods. You can include a `temporal_filter` object in the `metadata` field: + +- **Automatic extraction**: If no `temporal_filter` is provided, OpenRAG automatically extracts temporal expressions from your query (e.g., "documents from 2024", "last week's updates") +- **Manual filtering**: Explicitly specify date ranges using the following fields: + - `datetime_after` / `datetime_before`: Filter by document content datetime (highest priority) + - `modified_after` / `modified_before`: Filter by document modification date + - `created_after` / `created_before`: Filter by document creation date + - `indexed_after` / `indexed_before`: Filter by document indexing date + +All dates should be in ISO 8601 format (e.g., `2024-01-15T00:00:00Z`). 
+ +**Example with temporal filter:** +```python +# Query documents from 2024 only +response = client.chat.completions.create( + model="openrag-my_partition", + messages=[{"role": "user", "content": "What are the latest updates?"}], + metadata={ + "temporal_filter": { + "created_after": "2024-01-01T00:00:00Z", + "created_before": "2024-12-31T23:59:59Z" + } + } +) +``` + * Text Completions ```http POST /v1/completions @@ -263,6 +298,51 @@ response = client.chat.completions.create( ) ``` +#### Example with Temporal Filtering + +```python +from openai import OpenAI +from datetime import datetime, timedelta, timezone + +client = OpenAI(api_key='your-auth-token', base_url="http://localhost:8080/v1") + +# Example 1: Query recent documents (last 30 days) +thirty_days_ago = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat() +response = client.chat.completions.create( + model="openrag-my_partition", + messages=[{"role": "user", "content": "What are the latest developments?"}], + extra_body={ + "metadata": { + "temporal_filter": { + "indexed_after": thirty_days_ago + } + } + } +) + +# Example 2: Query documents from a specific year +response = client.chat.completions.create( + model="openrag-my_partition", + messages=[{"role": "user", "content": "What happened in 2024?"}], + extra_body={ + "metadata": { + "temporal_filter": { + "created_after": "2024-01-01T00:00:00Z", + "created_before": "2024-12-31T23:59:59Z" + } + } + } +) + +# Example 3: Let OpenRAG automatically extract temporal context from query +# Queries like "last week", "documents from 2024", "recent changes" +# will automatically be filtered without explicit temporal_filter +response = client.chat.completions.create( + model="openrag-my_partition", + messages=[{"role": "user", "content": "Show me documents from last week"}] +) +``` + --- ## ⚠️ Error Handling diff --git a/docs/content/docs/documentation/data_model.md b/docs/content/docs/documentation/data_model.md index 9edb9881..3bbcb8ac 100644 --- 
a/docs/content/docs/documentation/data_model.md +++ b/docs/content/docs/documentation/data_model.md @@ -106,3 +106,45 @@ Defines the many-to-many relationship between **users** and **partitions**, incl --- +## **Vector Database Schema (Milvus)** + +In addition to the relational database, OpenRAG uses Milvus to store document chunks with embeddings and metadata. + +### Document Chunk Fields + +| Field Name | Type | Description | +|------------|------|-------------| +| `embedding` | FloatVector | Dense vector embedding of the chunk content | +| `sparse_vector` | SparseFloatVector | Sparse BM25 vector for hybrid search | +| `content` | VarChar | The actual text content of the chunk | +| `partition` | VarChar | Partition name this chunk belongs to | +| `filename` | VarChar | Source filename | +| `page` | Int64 | Page number in the source document | +| `_id` | VarChar (PK) | Unique chunk identifier | +| `datetime` | VarChar | Primary timestamp from document content (user-provided) | +| `modified_at` | VarChar | Document modification timestamp (ISO 8601 format) | +| `created_at` | VarChar | Document creation timestamp (ISO 8601 format) | +| `indexed_at` | VarChar | When the chunk was indexed into OpenRAG (ISO 8601 format) | + +### Temporal Fields Priority + +When filtering or scoring documents by time, OpenRAG uses the following priority: +1. **`datetime`** - Highest priority, user-provided timestamp from document content +2. **`modified_at`** - Document modification date +3. **`created_at`** - Document creation date +4. **`indexed_at`** - Fallback, when the document was indexed + +All temporal fields are stored in ISO 8601 format (e.g., `2024-01-15T10:30:00Z`). 
+ +### Temporal Filtering + +Vector database queries support temporal filtering with the following parameters: +- `datetime_after` / `datetime_before` +- `modified_after` / `modified_before` +- `created_after` / `created_before` +- `indexed_after` / `indexed_before` + +These filters use **OR logic** between the different date fields to ensure maximum flexibility in retrieving time-relevant documents. + +--- + diff --git a/docs/content/docs/documentation/features_in_details.md b/docs/content/docs/documentation/features_in_details.md index 9dd70133..363f0c36 100644 --- a/docs/content/docs/documentation/features_in_details.md +++ b/docs/content/docs/documentation/features_in_details.md @@ -84,4 +84,49 @@ See the section on [distributed deployment in a ray cluster](#5-distributed-depl * **Contextual retrieval** - Anthropic's technique for enhanced chunk relevance * **Multilingual reranking** - using `Alibaba-NLP/gte-multilingual-reranker-base` + + +### 🕐 Temporal Awareness +OpenRAG includes intelligent temporal understanding to deliver more relevant, time-aware responses. + +
+ +Temporal Features + +* **Automatic date extraction** - Detects temporal expressions in queries across multiple languages + * ISO dates: `2024-01-15`, `2024-01-15T10:30:00` + * Numeric formats: `15/01/2024`, `01/15/2024`, `15.01.2024` + * Month-year: `01/2024`, `2024/01` + * Year only: `2024`, `2023` + * Relative time: "last 30 days", "últimos 7 días", "derniers 30 jours" + * Keywords: "today", "yesterday", "recent" (and multilingual equivalents) + +* **Document timestamp metadata** - Tracks temporal information for each document + * `datetime` - Primary timestamp from document content (user-provided) + * `modified_at` - Document modification timestamp + * `created_at` - Document creation timestamp + * `indexed_at` - When the document was indexed into OpenRAG + +* **Temporal filtering** - Automatically filters search results based on detected time ranges + * Queries like "documents from 2024" only retrieve relevant documents + * "Last week's updates" focuses on recent content + * Works across all retrieval methods (base, multi-query, HyDE) + +* **Temporal scoring in reranking** - Balances relevance with recency + * Combines semantic relevance score with temporal score + * Configurable temporal weight (default: 30% temporal, 70% relevance) + * Linear decay formula favors more recent documents + * Configurable decay period (default: 365 days) + * Priority hierarchy: `datetime` > `modified_at` > `created_at` > `indexed_at` + +* **Temporal-aware prompts** - LLM receives temporal context + * Current date/time injected into system prompt + * Document timestamps included in retrieved chunks + * LLM instructed to consider recency when answering + * Prioritizes newer information for time-sensitive queries + +* **Configuration options** via environment variables: + * `RERANKER_TEMPORAL_WEIGHT` - Weight for temporal scoring (0.0-1.0, default: 0.3) + * `RERANKER_TEMPORAL_DECAY_DAYS` - Days for temporal score decay (default: 365) +
\ No newline at end of file diff --git a/openrag/components/indexer/chunker/chunker.py b/openrag/components/indexer/chunker/chunker.py index 945cb9e2..d1a27992 100644 --- a/openrag/components/indexer/chunker/chunker.py +++ b/openrag/components/indexer/chunker/chunker.py @@ -2,6 +2,7 @@ from abc import ABC, abstractmethod from pathlib import Path from typing import Optional +from datetime import datetime, timezone from components.prompts import CHUNK_CONTEXTUALIZER from components.utils import get_llm_semaphore, load_config @@ -235,12 +236,12 @@ async def split_document(self, doc: Document, task_id: str = None): start_page = page_info["start_page"] end_page = page_info["end_page"] prev_page_num = end_page - filtered_chunks.append( - Document( - page_content=chunk_w_context, - metadata={**metadata, "page": start_page}, - ) - ) + chunk_meta = { + **metadata, + "page": start_page, + "indexed_at": datetime.now(timezone.utc).isoformat(), + } + filtered_chunks.append(Document(page_content=chunk_w_context, metadata=chunk_meta)) log.info("Document chunking completed") return filtered_chunks @@ -352,12 +353,12 @@ async def split_document(self, doc: Document, task_id: str = None): start_page = page_info["start_page"] end_page = page_info["end_page"] prev_page_num = end_page - filtered_chunks.append( - Document( - page_content=chunk_w_context, - metadata={**metadata, "page": start_page}, - ) - ) + chunk_meta = { + **metadata, + "page": start_page, + "indexed_at": datetime.now(timezone.utc).isoformat(), + } + filtered_chunks.append(Document(page_content=chunk_w_context, metadata=chunk_meta)) log.info("Document chunking completed") return filtered_chunks @@ -457,12 +458,12 @@ async def split_document(self, doc: Document, task_id: str = None): start_page = page_info["start_page"] end_page = page_info["end_page"] prev_page_num = end_page - filtered_chunks.append( - Document( - page_content=chunk_w_context, - metadata={**metadata, "page": start_page}, - ) - ) + chunk_meta = { + 
**metadata, + "page": start_page, + "indexed_at": datetime.now(timezone.utc).isoformat(), + } + filtered_chunks.append(Document(page_content=chunk_w_context, metadata=chunk_meta)) log.info("Document chunking completed") return filtered_chunks diff --git a/openrag/components/indexer/vectordb/vectordb.py b/openrag/components/indexer/vectordb/vectordb.py index f3f0fe39..9f186a9c 100644 --- a/openrag/components/indexer/vectordb/vectordb.py +++ b/openrag/components/indexer/vectordb/vectordb.py @@ -262,6 +262,32 @@ def _create_schema(self): max_length=MAX_LENGTH, ) + # Add temporal fields for temporal awareness (priority: datetime > modified_at > created_at > indexed_at) + # datetime: user-provided primary temporal field (highest priority) + schema.add_field( + field_name="datetime", + datatype=DataType.VARCHAR, + max_length=64, + ) + + schema.add_field( + field_name="created_at", + datatype=DataType.VARCHAR, + max_length=64, + ) + + schema.add_field( + field_name="modified_at", + datatype=DataType.VARCHAR, + max_length=64, + ) + + schema.add_field( + field_name="indexed_at", + datatype=DataType.VARCHAR, + max_length=64, + ) + schema.add_field( field_name="vector", datatype=DataType.FLOAT_VECTOR, @@ -303,6 +329,31 @@ def _create_index(self): field_name="partition", index_type="INVERTED", index_name="partition_idx" ) + # Add indexes for temporal fields to enable efficient filtering + index_params.add_index( + field_name="datetime", + index_type="INVERTED", + index_name="datetime_idx", + ) + + index_params.add_index( + field_name="created_at", + index_type="INVERTED", + index_name="created_at_idx", + ) + + index_params.add_index( + field_name="modified_at", + index_type="INVERTED", + index_name="modified_at_idx", + ) + + index_params.add_index( + field_name="indexed_at", + index_type="INVERTED", + index_name="indexed_at_idx", + ) + # Add index for vector field index_params.add_index( field_name="vector", @@ -360,6 +411,23 @@ async def async_add_documents(self, chunks: 
list[Document], user: dict) -> None: ) entities = await self.embedder.embed_documents(chunks) + + # Milvus text/VARCHAR fields should not be null. Ensure temporal + # and other text fields are present; use empty string when missing. + for ent, chunk in zip(entities, chunks): + md = dict(getattr(chunk, "metadata", {}) or {}) + + # Temporal text fields: prefer metadata value, otherwise empty string + for tf in ("datetime", "created_at", "modified_at", "indexed_at"): + if ent.get(tf) is None: + ent[tf] = md.get(tf, "") + + # Ensure partition and file_id are always present as strings + if ent.get("partition") is None: + ent["partition"] = md.get("partition", "") + if ent.get("file_id") is None: + ent["file_id"] = md.get("file_id", "") + await self._async_client.insert( collection_name=self.collection_name, data=entities, @@ -430,8 +498,49 @@ async def async_search( expr_parts.append(f"partition in {partition}") if filter: + # Handle temporal filters with OR logic across different timestamp fields + # This allows documents to match if ANY of their timestamp fields (datetime, modified_at, created_at, indexed_at) fall within the range + temporal_keys = [ + "datetime_after", "datetime_before", + "created_after", "created_before", + "modified_after", "modified_before", + "indexed_after", "indexed_before" + ] + + # Build temporal conditions with OR logic + temporal_conditions = [] + + # Extract the generic after/before values from created_after/created_before + after_value = filter.get("created_after") or filter.get("datetime_after") or filter.get("modified_after") or filter.get("indexed_after") + before_value = filter.get("created_before") or filter.get("datetime_before") or filter.get("modified_before") or filter.get("indexed_before") + + if after_value or before_value: + # For each timestamp field, create conditions + field_conditions = [] + for field in ["datetime", "modified_at", "created_at", "indexed_at"]: + field_parts = [] + if after_value: + 
field_parts.append(f"{field} >= '{after_value}'") + if before_value: + field_parts.append(f"{field} <= '{before_value}'") + + if field_parts: + # Each field condition: (field >= after AND field <= before) + field_condition = " and ".join(field_parts) + field_conditions.append(f"({field_condition})") + + # Combine all field conditions with OR + if field_conditions: + temporal_expr = " or ".join(field_conditions) + temporal_conditions.append(f"({temporal_expr})") + + if temporal_conditions: + expr_parts.extend(temporal_conditions) + + # Handle other filters (exact match) for key, value in filter.items(): - expr_parts.append(f"{key} == '{value}'") + if key not in temporal_keys: + expr_parts.append(f"{key} == '{value}'") # Join all parts with " and " only if there are multiple conditions expr = " and ".join(expr_parts) if expr_parts else "" diff --git a/openrag/components/pipeline.py b/openrag/components/pipeline.py index 48bf6f24..a27bedb6 100644 --- a/openrag/components/pipeline.py +++ b/openrag/components/pipeline.py @@ -1,10 +1,13 @@ import copy +import json +from datetime import datetime, timezone from enum import Enum from components.prompts import QUERY_CONTEXTUALIZER_PROMPT, SYS_PROMPT_TMPLT from langchain_core.documents.base import Document from openai import AsyncOpenAI from utils.logger import get_logger +from utils.temporal import TemporalQueryNormalizer from .llm import LLM from .map_reduce import RAGMapReduce @@ -41,9 +44,11 @@ def __init__(self, config) -> None: self.reranker = Reranker(logger, config) async def retrieve_docs( - self, partition: list[str], query: str, use_map_reduce: bool = False + self, partition: list[str], query: str, use_map_reduce: bool = False, temporal_filter: dict = None ) -> list[Document]: - docs = await self.retriever.retrieve(partition=partition, query=query) + docs = await self.retriever.retrieve( + partition=partition, query=query, temporal_filter=temporal_filter + ) top_k = ( max(self.map_reduce_max_docs, 
self.reranker_top_k) if use_map_reduce @@ -79,6 +84,9 @@ def __init__(self, config) -> None: # map reduce self.map_reduce: RAGMapReduce = RAGMapReduce(config=config) + + # temporal query normalizer + self.temporal_normalizer = TemporalQueryNormalizer() async def generate_query(self, messages: list[dict]) -> str: match RAGMODE(self.rag_mode): @@ -124,11 +132,29 @@ async def _prepare_for_chat_completion(self, partition: list[str], payload: dict metadata = payload.get("metadata", {}) use_map_reduce = metadata.get("use_map_reduce", False) - logger.info("Metadata parameters", use_map_reduce=use_map_reduce) + + # Extract temporal filter from query if not provided in metadata + temporal_filter = metadata.get("temporal_filter", None) + if not temporal_filter: + temporal_filter = self.temporal_normalizer.extract_temporal_filter(query) + if temporal_filter: + logger.info( + "Extracted temporal filter from query", + created_after=temporal_filter.get("created_after"), + created_before=temporal_filter.get("created_before"), + modified_after=temporal_filter.get("modified_after"), + modified_before=temporal_filter.get("modified_before"), + datetime_after=temporal_filter.get("datetime_after"), + datetime_before=temporal_filter.get("datetime_before"), + ) - # 2. get docs + logger.info( + "Metadata parameters", + use_map_reduce=use_map_reduce, + temporal_filter_present=temporal_filter is not None, + ) # 2. get docs docs = await self.retriever_pipeline.retrieve_docs( - partition=partition, query=query, use_map_reduce=use_map_reduce + partition=partition, query=query, use_map_reduce=use_map_reduce, temporal_filter=temporal_filter ) if use_map_reduce and docs: @@ -140,14 +166,25 @@ async def _prepare_for_chat_completion(self, partition: list[str], payload: dict # 4. 
prepare the output messages: list = copy.deepcopy(messages) + # Get current datetime in UTC for temporal awareness + current_datetime = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") + # prepend the messages with the system prompt + system_prompt_content = SYS_PROMPT_TMPLT.format( + context=context, current_datetime=current_datetime + ) messages.insert( 0, { "role": "system", - "content": SYS_PROMPT_TMPLT.format(context=context), + "content": system_prompt_content, }, ) + + # Debug: log the formatted system prompt + logger.debug("System prompt with context", prompt_length=len(system_prompt_content), doc_count=len(docs)) + logger.debug("Full system prompt", content=system_prompt_content) + payload["messages"] = messages return payload, docs diff --git a/openrag/components/reranker.py b/openrag/components/reranker.py index 474dfd57..faffffdc 100644 --- a/openrag/components/reranker.py +++ b/openrag/components/reranker.py @@ -1,9 +1,11 @@ import asyncio +from datetime import datetime, timezone from infinity_client import Client from infinity_client.api.default import rerank from infinity_client.models import RerankInput, ReRankResult from langchain_core.documents.base import Document + class Reranker: def __init__(self, logger, config): self.model_name = config.reranker["model_name"] @@ -12,7 +14,62 @@ def __init__(self, logger, config): self.semaphore = asyncio.Semaphore( 5 ) # Only allow 5 reranking operation at a time - self.logger.debug("Reranker initialized", model_name=self.model_name) + + # Temporal scoring parameters + self.temporal_weight = config.reranker.get("temporal_weight", 0.3) # Weight for temporal scoring + self.temporal_decay_days = config.reranker.get("temporal_decay_days", 365) # Days for full decay + + self.logger.debug( + "Reranker initialized", + model_name=self.model_name, + temporal_weight=self.temporal_weight, + temporal_decay_days=self.temporal_decay_days, + ) + + def _calculate_temporal_score(self, doc: Document) -> float: + 
""" + Calculate temporal score based on document recency. + More recent documents get higher scores (0.0 to 1.0). + + Uses linear decay based on document age. + Priority: datetime > modified_at > created_at > indexed_at + """ + try: + # Try datetime first (user-provided), then modified_at, then created_at, then indexed_at + date_str = ( + doc.metadata.get("datetime") or + doc.metadata.get("modified_at") or + doc.metadata.get("created_at") or + doc.metadata.get("indexed_at") + ) + + if not date_str: + # No temporal information, return neutral score + return 0.5 + + # Parse the date and ensure it's UTC-aware + if 'T' in date_str: + doc_date = datetime.fromisoformat(date_str.replace('Z', '+00:00')) + else: + doc_date = datetime.fromisoformat(date_str) + + # Ensure timezone awareness - assume UTC if naive + if doc_date.tzinfo is None: + doc_date = doc_date.replace(tzinfo=timezone.utc) + + # Calculate age in days using UTC now + now = datetime.now(timezone.utc) + days_old = (now - doc_date).total_seconds() / 86400 + + # Linear decay formula + # Score decreases linearly from 1.0 (today) to 0.0 (temporal_decay_days ago) + temporal_score = max(0.0, min(1.0, (1.0 - days_old / self.temporal_decay_days))) + + return temporal_score + + except Exception as e: + self.logger.warning(f"Error calculating temporal score: {e}") + return 0.5 # Neutral score on error async def rerank( self, query: str, documents: list[Document], top_k: int @@ -39,8 +96,34 @@ async def rerank( output = [] for rerank_res in rerank_result.results: doc = documents[rerank_res.index] - doc.metadata["relevance_score"] = rerank_res.relevance_score + relevance_score = rerank_res.relevance_score + + # Calculate temporal score + temporal_score = self._calculate_temporal_score(doc) + + # Combine relevance and temporal scores + # Final score = (1 - temporal_weight) * relevance + temporal_weight * temporal + combined_score = ( + (1 - self.temporal_weight) * relevance_score + + self.temporal_weight * temporal_score + 
) + + # Store all scores in metadata + doc.metadata["relevance_score"] = relevance_score + doc.metadata["temporal_score"] = temporal_score + doc.metadata["combined_score"] = combined_score + output.append(doc) + + # Re-sort by combined score (descending) + output.sort(key=lambda d: d.metadata.get("combined_score", 0), reverse=True) + + self.logger.debug( + "Reranking complete with temporal scoring", + documents_returned=len(output), + temporal_weight=self.temporal_weight, + ) + return output except Exception as e: diff --git a/openrag/components/retriever.py b/openrag/components/retriever.py index 1b675e50..69ba5ae8 100644 --- a/openrag/components/retriever.py +++ b/openrag/components/retriever.py @@ -1,5 +1,6 @@ # Import necessary modules and classes from abc import ABC, abstractmethod +from typing import Optional from components.prompts import HYDE_PROMPT, MULTI_QUERY_PROMPT from langchain_core.documents.base import Document @@ -26,7 +27,12 @@ def __init__( pass @abstractmethod - async def retrieve(self, partition: list[str], query: str) -> list[Document]: + async def retrieve( + self, + partition: list[str], + query: str, + temporal_filter: Optional[dict] = None, + ) -> list[Document]: pass @@ -41,13 +47,37 @@ async def retrieve( self, partition: list[str], query: str, + temporal_filter: Optional[dict] = None, ) -> list[Document]: db = get_vectordb() + + # Build filter with temporal constraints if provided + # Priority: datetime > modified_at > created_at > indexed_at + filter_dict = {} + if temporal_filter: + if "datetime_after" in temporal_filter: + filter_dict["datetime_after"] = temporal_filter["datetime_after"] + if "datetime_before" in temporal_filter: + filter_dict["datetime_before"] = temporal_filter["datetime_before"] + if "created_after" in temporal_filter: + filter_dict["created_after"] = temporal_filter["created_after"] + if "created_before" in temporal_filter: + filter_dict["created_before"] = temporal_filter["created_before"] + if "modified_after" 
in temporal_filter: + filter_dict["modified_after"] = temporal_filter["modified_after"] + if "modified_before" in temporal_filter: + filter_dict["modified_before"] = temporal_filter["modified_before"] + if "indexed_after" in temporal_filter: + filter_dict["indexed_after"] = temporal_filter["indexed_after"] + if "indexed_before" in temporal_filter: + filter_dict["indexed_before"] = temporal_filter["indexed_before"] + chunks = await db.async_search.remote( query=query, partition=partition, top_k=self.top_k, similarity_threshold=self.similarity_threshold, + filter=filter_dict if filter_dict else None, ) return chunks @@ -79,7 +109,9 @@ def __init__( prompt | llm | StrOutputParser() | (lambda x: x.split("[SEP]")) ) - async def retrieve(self, partition: list[str], query: str) -> list[Document]: + async def retrieve( + self, partition: list[str], query: str, temporal_filter: Optional[dict] = None + ) -> list[Document]: db = get_vectordb() logger.debug("Generating multiple queries", k_queries=self.k_queries) generated_queries = await self.generate_queries.ainvoke( @@ -88,11 +120,34 @@ async def retrieve(self, partition: list[str], query: str) -> list[Document]: "k_queries": self.k_queries, } ) + + # Build filter with temporal constraints if provided + # Priority: datetime > modified_at > created_at > indexed_at + filter_dict = {} + if temporal_filter: + if "datetime_after" in temporal_filter: + filter_dict["datetime_after"] = temporal_filter["datetime_after"] + if "datetime_before" in temporal_filter: + filter_dict["datetime_before"] = temporal_filter["datetime_before"] + if "created_after" in temporal_filter: + filter_dict["created_after"] = temporal_filter["created_after"] + if "created_before" in temporal_filter: + filter_dict["created_before"] = temporal_filter["created_before"] + if "modified_after" in temporal_filter: + filter_dict["modified_after"] = temporal_filter["modified_after"] + if "modified_before" in temporal_filter: + filter_dict["modified_before"] = 
temporal_filter["modified_before"] + if "indexed_after" in temporal_filter: + filter_dict["indexed_after"] = temporal_filter["indexed_after"] + if "indexed_before" in temporal_filter: + filter_dict["indexed_before"] = temporal_filter["indexed_before"] + chunks = await db.async_multi_query_search.remote( queries=generated_queries, partition=partition, top_k_per_query=self.top_k, similarity_threshold=self.similarity_threshold, + filter=filter_dict if filter_dict else None, ) return chunks @@ -121,18 +176,40 @@ async def get_hyde(self, query: str): hyde_document = await self.hyde_generator.ainvoke({"query": query}) return hyde_document - async def retrieve(self, partition: list[str], query: str) -> list[Document]: + async def retrieve(self, partition: list[str], query: str, temporal_filter: Optional[dict] = None) -> list[Document]: db = get_vectordb() hyde = await self.get_hyde(query) queries = [hyde] if self.combine: queries.append(query) + # Build filter with temporal constraints if provided + # Priority: datetime > modified_at > created_at > indexed_at + filter_dict = {} + if temporal_filter: + if "datetime_after" in temporal_filter: + filter_dict["datetime_after"] = temporal_filter["datetime_after"] + if "datetime_before" in temporal_filter: + filter_dict["datetime_before"] = temporal_filter["datetime_before"] + if "created_after" in temporal_filter: + filter_dict["created_after"] = temporal_filter["created_after"] + if "created_before" in temporal_filter: + filter_dict["created_before"] = temporal_filter["created_before"] + if "modified_after" in temporal_filter: + filter_dict["modified_after"] = temporal_filter["modified_after"] + if "modified_before" in temporal_filter: + filter_dict["modified_before"] = temporal_filter["modified_before"] + if "indexed_after" in temporal_filter: + filter_dict["indexed_after"] = temporal_filter["indexed_after"] + if "indexed_before" in temporal_filter: + filter_dict["indexed_before"] = temporal_filter["indexed_before"] + return 
await db.async_multi_query_search.remote( queries=queries, partition=partition, top_k_per_query=self.top_k, similarity_threshold=self.similarity_threshold, + filter=filter_dict if filter_dict else None, ) diff --git a/openrag/components/utils.py b/openrag/components/utils.py index 35510e19..e92c9c7e 100644 --- a/openrag/components/utils.py +++ b/openrag/components/utils.py @@ -122,16 +122,36 @@ def format_context(docs: list[Document]) -> str: if not docs: return "No document found from the database" - context = "Extracted documents:\n" + def format_date(iso_date: str) -> str: + """Convert ISO date to readable format: 2025-11-02 14:30""" + try: + from datetime import datetime + dt = datetime.fromisoformat(iso_date.replace('Z', '+00:00')) + return dt.strftime("%Y-%m-%d %H:%M") + except: + return iso_date + + context = "Extracted documents:\n\n" for i, doc in enumerate(docs, start=1): - # doc_id = f"[doc_{i}]" - # document = f""" - # *source*: {doc_id} - # content: \n{doc.page_content.strip()}\n - # """ - document = f"""content: \n{doc.page_content.strip()}\n""" + # Build temporal metadata in a readable format + temporal_parts = [] + if doc.metadata.get("datetime"): + temporal_parts.append(f"Document date: {format_date(doc.metadata['datetime'])}") + elif doc.metadata.get("modified_at"): + temporal_parts.append(f"Last modified: {format_date(doc.metadata['modified_at'])}") + elif doc.metadata.get("created_at"): + temporal_parts.append(f"Created: {format_date(doc.metadata['created_at'])}") + elif doc.metadata.get("indexed_at"): + temporal_parts.append(f"Indexed: {format_date(doc.metadata['indexed_at'])}") + + # Format document with temporal metadata in a natural way + document = f"Document {i}:\n" + if temporal_parts: + document += f"{temporal_parts[0]}\n" + document += f"{doc.page_content.strip()}\n" + context += document - context += "-" * 40 + "\n\n" + context += "-" * 60 + "\n\n" return context diff --git a/openrag/routers/indexer.py b/openrag/routers/indexer.py 
index cf60d276..f69f6152 100644 --- a/openrag/routers/indexer.py +++ b/openrag/routers/indexer.py @@ -1,5 +1,5 @@ import json -from datetime import datetime +from datetime import datetime, timezone from pathlib import Path from typing import Any, Optional @@ -137,7 +137,14 @@ async def add_file( # Append extra metadata metadata["file_size"] = human_readable_size(file_stat.st_size) - metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime).isoformat() + + # Use provided created_at if available, otherwise extract from file system + if "created_at" not in metadata or not metadata["created_at"]: + metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc).isoformat() + + # Extract file modification time (always from file system) + metadata["modified_at"] = datetime.fromtimestamp(file_stat.st_mtime, tz=timezone.utc).isoformat() + metadata["file_id"] = file_id # Indexing the file @@ -215,7 +222,14 @@ async def put_file( # Append extra metadata metadata["file_size"] = human_readable_size(file_stat.st_size) - metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime).isoformat() + + # Use provided created_at if available, otherwise extract from file system + if "created_at" not in metadata or not metadata["created_at"]: + metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime, tz=timezone.utc).isoformat() + + # Extract file modification time (always from file system) + metadata["modified_at"] = datetime.fromtimestamp(file_stat.st_mtime, tz=timezone.utc).isoformat() + metadata["file_id"] = file_id # Indexing the file diff --git a/openrag/utils/README.md b/openrag/utils/README.md new file mode 100644 index 00000000..40239f32 --- /dev/null +++ b/openrag/utils/README.md @@ -0,0 +1,74 @@ +# TemporalQueryNormalizer + +`TemporalQueryNormalizer` enables **extracting temporal expressions** from user queries and normalizing them into precise UTC date ranges. 
It supports **multilingual keywords**, universal numeric formats, and ISO dates. + +--- + +## Features + +- Extracts **ISO, numeric, month-year, and year-only dates** globally. +- Supports **relative time expressions** with explicit multilingual units: days, weeks, months, years. +- Recognizes **low-ambiguity multilingual keywords** like "today" and "yesterday". +- Provides **full-day UTC-aligned ranges** for extracted temporal expressions. +- Allows **query augmentation** with temporal context for search or retrieval systems. + +--- + +## Supported Languages + +- English +- Spanish +- French +- German +- Italian +- Portuguese + +> Custom units can be added in `self.time_units` for additional languages. + +--- + +## Usage + +```python +from temporal_query_normalizer import TemporalQueryNormalizer +normalizer = TemporalQueryNormalizer() +query = "Show sales reports from 15/03/2025 to 20/04/2025" +temporal_filter = normalizer.extract_temporal_filter(query) +print(temporal_filter) +#Output: +#{ +#'created_after': '2025-03-15T00:00:00+00:00', +#'created_before': '2025-04-20T23:59:59.999999+00:00' +#} + +augmented_query = normalizer.augment_query(query, temporal_filter) +print(augmented_query) + +#Output: +#"Show sales reports from 15/03/2025 to 20/04/2025 (from 15 March 2025 until 20 April 2025)" + +``` + + +--- + +## Supported Formats + +1. **ISO dates**: `2025-03-15`, `2025-03-15T10:30` +2. **Numeric dates**: `15/03/2025`, `03-15-2025`, `15.03.2025` +3. **Month-Year**: `03/2025`, `03-2025` +4. **Year-Month ISO**: `2025/03`, `2025-03` +5. **Year only**: `2025` +6. **Relative units with multilingual support**: + - `7 days`, `7 días`, `7 jours`, `7 tage`, `7 giorni`, `7 dias` + - `2 weeks`, `3 months`, `1 year` +7.
**Keywords**: `today`, `yesterday`, `aujourd'hui`, `hier`, `hoy`, `ieri`, `hoje`, `ontem` + +--- + +## Adding Custom Units or Keywords + +```python +normalizer.time_units['fortnight'] = 14 # adds a new unit +normalizer.keyword_ranges['day before yesterday'] = 2 +``` \ No newline at end of file diff --git a/openrag/utils/temporal.py b/openrag/utils/temporal.py new file mode 100644 index 00000000..24d9c414 --- /dev/null +++ b/openrag/utils/temporal.py @@ -0,0 +1,670 @@ +import re +import unicodedata +import calendar +from datetime import datetime, timedelta, timezone +from typing import Optional, Dict, Tuple, Callable, List, Pattern +import dateparser + +class TemporalQueryNormalizer: + """ + Extracts temporal expressions from queries and normalizes them to date ranges. + + Design principles: + - Deterministic (no guessing from bare numbers) + - Multilingual via explicit unit mappings + - Language-agnostic for numeric / ISO dates + - Full-day UTC-aligned ranges + """ + + def __init__(self, prefer_dd_mm: bool = True): + # Ordered: higher precision first. Compile regexes once to avoid repeated + # compilation on every query which can be costly for long queries. + self.universal_patterns: List[Tuple[Pattern, Callable]] = [ + # ISO date: 2024-01-15 or 2024-01-15T10:30 + (re.compile(r'(\d{4})-(\d{2})-(\d{2})(?:T[\d:]+)?'), self._parse_iso_date), + + # Day + month name, e.g. 
'5 Jan' or '5 janvier' (month name parsing uses dateparser if available) + (re.compile(r"\b(\d{1,2})\s+([A-Za-zÀ-ÖØ-öø-ÿ\-']+)(?:\s+(\d{4}))?\b"), self._parse_day_month_name), + + # Numeric dates: 15/01/2024, 01-15-2024, 15.01.2024 + (re.compile(r'\b(\d{1,2})[\/\.\-](\d{1,2})[\/\.\-](\d{4})\b'), self._parse_numeric_date), + + # Month-year: 01/2024 or 01-2024 + (re.compile(r'\b(\d{1,2})[\/\-](\d{4})\b'), self._parse_month_year), + + # Year-month: 2024/01 or 2024-01 + (re.compile(r'\b(\d{4})[\/\-](\d{1,2})\b'), self._parse_year_month), + + # Year only + (re.compile(r'\b(20\d{2})\b'), self._parse_year_only), + ] + + # Explicit multilingual time units (no guessing) + self.time_units = { + # days + "day": 1, "days": 1, + "jour": 1, "jours": 1, + "día": 1, "días": 1, + "tag": 1, "tage": 1, + "giorno": 1, "giorni": 1, + "dia": 1, "dias": 1, + + # weeks + "week": 7, "weeks": 7, + "semaine": 7, "semaines": 7, + "semana": 7, "semanas": 7, + "woche": 7, "wochen": 7, + + # months (approximate) + "month": 30, "months": 30, + "mois": 30, + "mes": 30, "meses": 30, + "monat": 30, "monate": 30, + + # years + "year": 365, "years": 365, + "an": 365, "ans": 365, + "año": 365, "años": 365, + "jahr": 365, "jahre": 365, + } + + self.relative_prefix_words = [ + # English + "last", "past", "previous", "in the last", "in the past", "over the past", + "within the past", "during the past", "for the past", "over the last", "in last", + + # Spanish + "hace", "últimos", "últimas", "último", "última", + + # French + "il y a", "derniers", "dernières", "dernier", "dernière", + + # German + "vor", "letzten", "letzte", "letztes", + + # Portuguese + "há", "ultimos", "últimos", "ultimas", "últimas", + + # Italian + "fa", "ultimi", "ultime", "ultimo", "ultima", + ] + + self.relative_suffix_words = [ + # English + "ago", + + # Spanish/Portuguese + "atrás", + + # Italian + "fa", + ] + # Build regexes from folded (accent-stripped) forms so accents are optional + def _fold_list(words): + folded = [] + for w 
in words: + f = unicodedata.normalize('NFD', w) + f = ''.join(ch for ch in f if not unicodedata.combining(ch)) + folded.append(f) + return folded + + folded_prefixes = _fold_list(self.relative_prefix_words) + folded_suffixes = _fold_list(self.relative_suffix_words) + + # Accept either digits or simple hyphenated/word number tokens (e.g. "two", "twenty-one") + number_token = r'([0-9]+|[A-Za-z-]+)' + unit_token = r'(\w+)' + + prefix_re = r"\b(?:" + "|".join(re.escape(p) for p in folded_prefixes) + r")\b\s*" + number_token + r"\s*" + unit_token + suffix_re = r"\b" + number_token + r"\s*" + unit_token + r"\s*(?:" + "|".join(re.escape(s) for s in folded_suffixes) + r")\b" + + self.relative_prefix_pattern = re.compile(prefix_re, re.IGNORECASE) + self.relative_suffix_pattern = re.compile(suffix_re, re.IGNORECASE) + + # Build a normalized time_units mapping (folded, lowercase) and add simple + # plural/singular variants to handle common pluralization across languages. + self.normalized_time_units = {} + for k, v in self.time_units.items(): + fk = unicodedata.normalize('NFD', k) + fk = ''.join(ch for ch in fk if not unicodedata.combining(ch)).lower() + # base form + self.normalized_time_units[fk] = v + # add simple plural (append 's') if not already + if not fk.endswith('s'): + self.normalized_time_units[fk + 's'] = v + # if it ends with 's', add singular by removing trailing 's' + else: + singular = fk[:-1] + if singular: + self.normalized_time_units[singular] = v + # handle common suffixes: 'es' -> remove, 'ies' -> y + if fk.endswith('es') and len(fk) > 2: + self.normalized_time_units[fk[:-2]] = v + if fk.endswith('ies') and len(fk) > 3: + self.normalized_time_units[fk[:-3] + 'y'] = v + + # Low-ambiguity multilingual keywords + self.keyword_ranges = { + "today": 0, + "aujourd'hui": 0, + "heute": 0, + "hoy": 0, + "oggi": 0, + "hoje": 0, + + "yesterday": 1, + "hier": 1, + "ayer": 1, + "ieri": 1, + "ontem": 1, + } + # Preference for interpreting ambiguous numeric dates like 
01/02/2024: + # - True: prefer DD/MM (day=01, month=02) + # - False: prefer MM/DD (month=01, day=02) + self.prefer_dd_mm = prefer_dd_mm + + # -------------------- Parsing helpers -------------------- + + def _parse_iso_date(self, match): + y, m, d = map(int, match.groups()[:3]) + return self._specific_date(y, m, d) + + def _parse_numeric_date(self, match): + a, b, y = map(int, match.groups()) + + # Disambiguate ambiguous numeric dates according to preference. + # If prefer_dd_mm is True, interpret as DD/MM/YYYY when possible. + if self.prefer_dd_mm: + # Prefer DD/MM if the second group is a valid month + if 1 <= b <= 12 and 1 <= a <= 31: + return self._specific_date(y, b, a) + # Fallback to MM/DD if the first group looks like a month + if 1 <= a <= 12 and 1 <= b <= 31: + return self._specific_date(y, a, b) + else: + # Prefer MM/DD if the first group is a valid month + if 1 <= a <= 12 and 1 <= b <= 31: + return self._specific_date(y, a, b) + # Fallback to DD/MM + if 1 <= b <= 12 and 1 <= a <= 31: + return self._specific_date(y, b, a) + + raise ValueError + + def _parse_month_year(self, match): + m, y = map(int, match.groups()) + return self._month_range(y, m) + + def _parse_year_month(self, match): + y, m = map(int, match.groups()) + return self._month_range(y, m) + + def _parse_year_only(self, match): + return self._year_range(int(match.group(1))) + + def _parse_day_month_name(self, match): + # match groups: day, monthname, optional year + day = int(match.group(1)) + month_token = match.group(2) + year = None + if match.group(3): + year = int(match.group(3)) + else: + year = datetime.now(timezone.utc).year + + month = self._month_name_to_int(month_token) + if month is None: + raise ValueError + return self._specific_date(year, month, day) + + def _month_name_to_int(self, token: str) -> Optional[int]: + """Convert a month name (multilingual) to its month number (1-12). 
+ + Tries `dateparser` if available, otherwise falls back to a small + multilingual lookup using folded month names. + """ + if not token: + return None + folded = unicodedata.normalize('NFD', token) + folded = ''.join(ch for ch in folded if not unicodedata.combining(ch)).lower() + + try: + dt = dateparser.parse(f"1 {token} 2000") + if dt and isinstance(dt, datetime): + return dt.month + except Exception: + pass + + # fallback small multilingual map (folded forms) + months = { + 'january': 1, 'february': 2, 'march': 3, 'april': 4, 'may': 5, 'june': 6, + 'july': 7, 'august': 8, 'september': 9, 'october': 10, 'november': 11, 'december': 12, + # Spanish + 'enero': 1, 'febrero': 2, 'marzo': 3, 'abril': 4, 'mayo': 5, 'junio': 6, + 'julio': 7, 'agosto': 8, 'septiembre': 9, 'octubre': 10, 'noviembre': 11, 'diciembre': 12, + # French (folded) + 'janvier': 1, 'fevrier': 2, 'mars': 3, 'avril': 4, 'mai': 5, 'juin': 6, + 'juillet': 7, 'aout': 8, 'septembre': 9, 'octobre': 10, 'novembre': 11, 'decembre': 12, + # German (folded) + 'januar': 1, 'februar': 2, 'marz': 3, 'april': 4, 'mai': 5, 'juni': 6, + 'juli': 7, 'august': 8, 'september': 9, 'oktober': 10, 'november': 11, 'dezember': 12, + # Italian + 'gennaio': 1, 'febbraio': 2, 'marzo': 3, 'aprile': 4, 'maggio': 5, 'giugno': 6, + 'luglio': 7, 'agosto': 8, 'settembre': 9, 'ottobre': 10, 'novembre': 11, 'dicembre': 12, + # Portuguese + 'janeiro': 1, 'fevereiro': 2, 'marco': 3, 'abril': 4, 'maio': 5, 'junho': 6, + 'julho': 7, 'agosto': 8, 'setembro': 9, 'outubro': 10, 'novembro': 11, 'dezembro': 12, + } + + # accept short forms + if len(folded) <= 4: + # try month abbreviations in english and folded languages + abbrev_map = {k[:3]: v for k, v in months.items()} + if folded in abbrev_map: + return abbrev_map[folded] + + return months.get(folded) + + # -------------------- Range builders -------------------- + + def _specific_date(self, year: int, month: int, day: int): + start = datetime(year, month, day, tzinfo=timezone.utc) + 
end = start.replace(hour=23, minute=59, second=59, microsecond=999999) + return start, end + + def _month_range(self, year: int, month: int): + start = datetime(year, month, 1, tzinfo=timezone.utc) + # use calendar.monthrange to get the last day of the month reliably + last_day = calendar.monthrange(year, month)[1] + end = datetime(year, month, last_day, 23, 59, 59, 999999, tzinfo=timezone.utc) + return start, end + + def _year_range(self, year: int): + start = datetime(year, 1, 1, tzinfo=timezone.utc) + end = datetime(year, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc) + return start, end + + def _last_n_days(self, days: int): + end = datetime.now(timezone.utc) + start = end - timedelta(days=days) + return start, end + + # -------------------- Extraction logic -------------------- + + def _extract_relative(self, query: str): + # Fold accents in the query so accents are optional in user input + folded_query = unicodedata.normalize('NFD', query) + folded_query = ''.join(ch for ch in folded_query if not unicodedata.combining(ch)) + + # Prefer explicit contextual prefixes like "last", "past", etc. 
+ m = self.relative_prefix_pattern.search(folded_query) + if m: + raw_value = m.group(1) + unit = m.group(2) + # fold unit + uf = unicodedata.normalize('NFD', unit) + uf = ''.join(ch for ch in uf if not unicodedata.combining(ch)).lower() + value = None + if raw_value.isdigit(): + value = int(raw_value) + else: + value = self._word_to_int(raw_value) + if value is None: + return None + if uf in self.normalized_time_units: + days = value * self.normalized_time_units[uf] + return self._last_n_days(days) + + # Also accept suffixes like "5 years ago" + m = self.relative_suffix_pattern.search(folded_query) + if m: + raw_value = m.group(1) + unit = m.group(2) + uf = unicodedata.normalize('NFD', unit) + uf = ''.join(ch for ch in uf if not unicodedata.combining(ch)).lower() + value = None + if raw_value.isdigit(): + value = int(raw_value) + else: + value = self._word_to_int(raw_value) + if value is None: + return None + if uf in self.normalized_time_units: + days = value * self.normalized_time_units[uf] + return self._last_n_days(days) + + return None + + def _extract_keywords(self, query: str): + q = unicodedata.normalize('NFD', query) + q = ''.join(ch for ch in q if not unicodedata.combining(ch)).lower() + for word, offset in self.keyword_ranges.items(): + # fold keyword before searching + kw = unicodedata.normalize('NFD', word) + kw = ''.join(ch for ch in kw if not unicodedata.combining(ch)).lower() + if kw in q: + day = datetime.now(timezone.utc) - timedelta(days=offset) + start = day.replace(hour=0, minute=0, second=0, microsecond=0) + end = day.replace(hour=23, minute=59, second=59, microsecond=999999) + return start, end + return None + + def _word_to_int(self, token: str) -> Optional[int]: + """Convert a (folded) number-word or hyphenated number-word to an int. + + Supports small numbers in several languages (common words up to 20 and tens). + Returns None if unknown. 
+ """ + t = token.lower() + # normalize hyphens/spaces + t = t.replace('\u2013', '-') + t = t.replace('\u2014', '-') + t = t.replace('–', '-') + t = t.replace('—', '-') + + # simple vocab mapping (folded words) + mapping = { + # English + 'zero': 0, 'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5, + 'six': 6, 'seven': 7, 'eight': 8, 'nine': 9, 'ten': 10, + 'eleven': 11, 'twelve': 12, 'thirteen': 13, 'fourteen': 14, 'fifteen': 15, + 'sixteen': 16, 'seventeen': 17, 'eighteen': 18, 'nineteen': 19, 'twenty': 20, + 'thirty': 30, 'forty': 40, 'fifty': 50, 'sixty': 60, 'seventy': 70, 'eighty': 80, 'ninety': 90, + 'a': 1, 'an': 1, 'couple': 2, + + # Spanish + 'uno': 1, 'dos': 2, 'tres': 3, 'cuatro': 4, 'cinco': 5, 'seis': 6, 'siete': 7, 'ocho': 8, 'nueve': 9, 'diez': 10, + 'once': 11, 'doce': 12, 'trece': 13, 'catorce': 14, 'quince': 15, 'dieciseis': 16, 'diecisiete': 17, 'dieciocho': 18, 'diecinueve': 19, 'veinte': 20, + + # French (folded) + 'un': 1, 'deux': 2, 'trois': 3, 'quatre': 4, 'cinq': 5, 'six': 6, 'sept': 7, 'huit': 8, 'neuf': 9, 'dix': 10, + 'onze': 11, 'douze': 12, 'treize': 13, 'quatorze': 14, 'quinze': 15, 'seize': 16, 'vingt': 20, + + # German + 'eins': 1, 'zwei': 2, 'drei': 3, 'vier': 4, 'funf': 5, 'sechs': 6, 'sieben': 7, 'acht': 8, 'neun': 9, 'zehn': 10, + 'elf': 11, 'zwolf': 12, 'dreizehn': 13, 'vierzehn': 14, 'funfzehn': 15, 'sechzehn': 16, 'siebzehn': 17, 'achtzehn': 18, 'neunzehn': 19, 'zwanzig': 20, + + # Italian + 'uno': 1, 'due': 2, 'tre': 3, 'quattro': 4, 'cinque': 5, 'sei': 6, 'sette': 7, 'otto': 8, 'nove': 9, 'dieci': 10, + + # Portuguese + 'um': 1, 'dois': 2, 'tres': 3, 'quatro': 4, 'cinco': 5, 'seis': 6, 'sete': 7, 'oito': 8, 'nove': 9, 'dez': 10, + } + + # scale words mapping (folded) + scales = { + # English + 'hundred': 100, 'thousand': 1000, 'million': 1000000, + # Spanish + 'cien': 100, 'ciento': 100, 'mil': 1000, 'millon': 1000000, 'millones': 1000000, + # French + 'cent': 100, 'mille': 1000, 'million': 1000000, 'millions': 
1000000, + # German + 'hundert': 100, 'tausend': 1000, 'million': 1000000, + # Italian + 'cento': 100, 'mille': 1000, 'milione': 1000000, 'milioni': 1000000, + # Portuguese + 'cem': 100, 'cento': 100, 'mil': 1000, 'milhao': 1000000, 'milhoes': 1000000, + } + + # quick exact match + if t in mapping: + return mapping[t] + + # split into tokens (spaces and hyphens) + parts = re.split('[-\s]+', t) + if not parts: + return None + + total = 0 + current = 0 + any_matched = False + + for p in parts: + if not p or p == 'and': + continue + any_matched = True + if p in mapping: + current += mapping[p] + continue + if p in scales: + scale = scales[p] + if current == 0: + current = 1 + current *= scale + # for thousand/million we add to total and reset current + if scale >= 1000: + total += current + current = 0 + continue + # unknown token -> cannot parse + return None + + result = total + current + if any_matched and result > 0: + return result + return None + + def _extract_explicit_date_range(self, query: str) -> Optional[Tuple[datetime, datetime]]: + """Find two explicit dates in the query and return them as a range if + they are separated by a connector like 'to'/'until' or preceded by 'from'. + Uses the compiled universal patterns to find date tokens. 
+ """ + q = query + # collect matches from all universal patterns + matches = [] # list of (start_idx, end_idx, (start_dt, end_dt)) + for pattern, handler in self.universal_patterns: + for m in pattern.finditer(q): + try: + start_dt, end_dt = handler(m) + except Exception: + continue + matches.append((m.start(), m.end(), start_dt, end_dt)) + + if not matches: + return None + + # sort and filter overlapping, keep earliest non-overlapping matches + matches.sort(key=lambda x: x[0]) + filtered = [] + last_end = -1 + for sidx, eidx, sd, ed in matches: + if sidx >= last_end: + filtered.append((sidx, eidx, sd, ed)) + last_end = eidx + + if len(filtered) < 2: + return None + + # check pairwise for connectors between consecutive date matches + connectors = ["to", "until", "through", "-", "–", "and"] + folded = unicodedata.normalize('NFD', q) + folded = ''.join(ch for ch in folded if not unicodedata.combining(ch)).lower() + + ranges = [] + for i in range(len(filtered) - 1): + s1, e1, sd1, ed1 = filtered[i] + s2, e2, sd2, ed2 = filtered[i + 1] + + between = folded[e1:s2].strip() + # quick connector check + has_connector = any(c in between for c in connectors) + # or 'from' before first date + before = folded[max(0, s1 - 10):s1] + has_from = 'from' in before or 'between' in before + + if has_connector or has_from: + ranges.append((sd1, ed2)) + + if not ranges: + return None + + # return the first range (backwards-compatible single-range helper) + return ranges[0] + + def _extract_explicit_date_ranges(self, query: str) -> List[Tuple[datetime, datetime]]: + """Return all explicit date ranges found in the query as a list of + (start_dt, end_dt) tuples. This scans for adjacent date matches with + connectors and returns every pair that forms a range. 
+ """ + q = query + matches = [] + for pattern, handler in self.universal_patterns: + for m in pattern.finditer(q): + try: + start_dt, end_dt = handler(m) + except Exception: + continue + matches.append((m.start(), m.end(), start_dt, end_dt)) + + if not matches: + return [] + + matches.sort(key=lambda x: x[0]) + filtered = [] + last_end = -1 + for sidx, eidx, sd, ed in matches: + if sidx >= last_end: + filtered.append((sidx, eidx, sd, ed)) + last_end = eidx + + if len(filtered) < 2: + return [] + + connectors = ["to", "until", "through", "-", "–", "and"] + folded = unicodedata.normalize('NFD', q) + folded = ''.join(ch for ch in folded if not unicodedata.combining(ch)).lower() + + ranges = [] + for i in range(len(filtered) - 1): + s1, e1, sd1, ed1 = filtered[i] + s2, e2, sd2, ed2 = filtered[i + 1] + between = folded[e1:s2].strip() + has_connector = any(c in between for c in connectors) + before = folded[max(0, s1 - 10):s1] + has_from = 'from' in before or 'between' in before + if has_connector or has_from: + ranges.append((sd1, ed2)) + + return ranges + + # -------------------- Public API -------------------- + + def extract_temporal_filter(self, query: str) -> Optional[Dict[str, str]]: + # Prefer explicit ranges; collect all ranges but keep backward compatibility + # by returning only the first range from this function. Use + # `extract_temporal_filters` to get all ranges. + explicit_ranges = self._extract_explicit_date_ranges(query) + if explicit_ranges: + start, end = explicit_ranges[0] + return { + "created_after": start.isoformat(), + "created_before": end.isoformat(), + } + + for pattern, handler in self.universal_patterns: + # `pattern` is a compiled regex Pattern; use its search() method. 
+ match = pattern.search(query) + if match: + try: + start, end = handler(match) + return { + "created_after": start.isoformat(), + "created_before": end.isoformat(), + } + except ValueError: + pass + + relative = self._extract_relative(query) + if relative: + start, end = relative + return { + "created_after": start.isoformat(), + "created_before": end.isoformat(), + } + + keyword = self._extract_keywords(query) + if keyword: + start, end = keyword + return { + "created_after": start.isoformat(), + "created_before": end.isoformat(), + } + + return None + + def extract_temporal_filters(self, query: str) -> List[Dict[str, str]]: + """Return all temporal ranges found in `query` as a list of dicts with + `created_after`/`created_before` ISO strings. This includes explicit + date ranges and relative/keyword ranges where appropriate. + """ + results: List[Dict[str, str]] = [] + + # explicit ranges first + explicit_ranges = self._extract_explicit_date_ranges(query) + for s, e in explicit_ranges: + results.append({"created_after": s.isoformat(), "created_before": e.isoformat()}) + + # keep previous single-match behavior for numeric/universal patterns + # only if no explicit ranges found + if not results: + for pattern, handler in self.universal_patterns: + match = pattern.search(query) + if match: + try: + s, e = handler(match) + results.append({"created_after": s.isoformat(), "created_before": e.isoformat()}) + break + except ValueError: + pass + + # relative and keyword ranges (only add if none found) + if not results: + relative = self._extract_relative(query) + if relative: + s, e = relative + results.append({"created_after": s.isoformat(), "created_before": e.isoformat()}) + else: + keyword = self._extract_keywords(query) + if keyword: + s, e = keyword + results.append({"created_after": s.isoformat(), "created_before": e.isoformat()}) + + return results + + def augment_query(self, query: str, temporal_filter: Optional[Dict[str, str]] = None) -> str: + if 
temporal_filter is None: + temporal_filter = self.extract_temporal_filter(query) + + if not temporal_filter: + return query + + try: + parts = [] + after = before = None + + if "created_after" in temporal_filter: + after = datetime.fromisoformat(temporal_filter["created_after"]) + if "created_before" in temporal_filter: + before = datetime.fromisoformat(temporal_filter["created_before"]) + + # Check if it's a single day (start and end on same date) + # Use portable day formatting: avoid %-d (not supported on Windows) + def _fmt(dt: datetime) -> str: + return dt.strftime('%d %B %Y').lstrip('0') + + if after and before and after.date() == before.date(): + parts.append(f"on {_fmt(after)}") + else: + if after: + parts.append(f"from {_fmt(after)}") + if before: + parts.append(f"until {_fmt(before)}") + + return f"{query} ({' '.join(parts)})" if parts else query + except (ValueError, KeyError): + # Invalid temporal filter, return original query + return query + + + return f"{query} ({' '.join(parts)})" diff --git a/prompts/example1/sys_prompt_tmpl.txt b/prompts/example1/sys_prompt_tmpl.txt index 81d472bb..3b40a8a1 100644 --- a/prompts/example1/sys_prompt_tmpl.txt +++ b/prompts/example1/sys_prompt_tmpl.txt @@ -2,6 +2,8 @@ You are an AI conversational assistant specialized in **information retrieval an Your goal is to provide **precise, reliable, and well-structured answers** using **only the retrieved documents** (`Context`). Prioritize **clarity, accuracy, and completeness** in your responses. +**Current Date and Time:** {current_datetime} + ## Rules 1. Use only the provided Context @@ -10,10 +12,17 @@ Prioritize **clarity, accuracy, and completeness** in your responses. * If the context is **insufficient**, **invite the user** to clarify their query or provide additional keywords. 2. Language Consistency - * Always respond **in the same language** as the user’s query. + * Always respond **in the same language** as the user's query. 3. 
Structure and Readability * Use **headings**, **bullet points**, **numbered lists**, or **tables** to organize information clearly. * Ensure responses are **concise yet complete**, avoiding omission of key details. +4. Temporal Awareness + * Pay attention to the **temporal context** of both the query and the retrieved documents. + * Each document may include a date line ("Document date", "Last modified", "Created", or "Indexed") indicating when its content was dated, modified, created, or indexed. + * When the query includes temporal references (e.g., "last week", "yesterday", "this year"), prioritize documents with **more recent dates**. + * If documents contain conflicting information, prefer **newer documents** unless the user explicitly requests historical information. + * When appropriate, mention the **date or timeframe** of the information you're presenting to help users understand its recency. + Here are the retrieved documents: `{context}` \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index e097ba0c..e8f8b95f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -45,6 +45,7 @@ dependencies = [ "pytest-env>=1.1.5", "markitdown[docx]>=0.1.3", "html-to-markdown>=2.4.0", + "dateparser>=1.2.2", ] [dependency-groups]