diff --git a/.hydra_config/config.yaml b/.hydra_config/config.yaml index 1cb693e5..6b975bf6 100644 --- a/.hydra_config/config.yaml +++ b/.hydra_config/config.yaml @@ -40,6 +40,7 @@ vectordb: collection_name: ${oc.env:VDB_COLLECTION_NAME, vdb_test} hybrid_search: ${oc.env:VDB_HYBRID_SEARCH, true} enable: true + schema_version: 1 # Increment when the collection schema changes and a migration is required rdb: host: ${oc.env:POSTGRES_HOST, rdb} diff --git a/docs/content/docs/documentation/API.mdx b/docs/content/docs/documentation/API.mdx index 7174560a..2a88c1f2 100644 --- a/docs/content/docs/documentation/API.mdx +++ b/docs/content/docs/documentation/API.mdx @@ -78,6 +78,18 @@ Upload a new file to a specific partition for indexing. - `201 Created`: Returns task status URL - `409 Conflict`: File already exists in partition +##### Temporal Filtering +OpenRAG supports temporal filtering to retrieve documents from specific time periods. +The client can include the temporal field to allow temporal-aware search in search endpoints. + +* `created_at`: ISO 8601 format date of when the file was created + +:::info +`created_at` is provided by the client in the metadata of the file during upload. +This is a first iteration — additional temporal fields (e.g. `updated_at`) may be added in future releases as needed. +::: + + ##### Upload files while modeling relations between them OpenRAG supports document relationships to enable context-aware retrieval. @@ -202,6 +214,8 @@ Perform semantic search across specified partitions. 
| `include_related` (optional) | boolean | `false` | Include chunks from files with same `relationship_id` | | `include_ancestors` (optional) | boolean | `false` | Include chunks from ancestor files (via `parent_id` chain) | | `related_limit` (optional) | integer | 20 | Max related/ancestor chunks to fetch per result (used when `include_related` or `include_ancestors` is true) | +| `filter` (optional) | string | None | Milvus filter expression string for additional filtering (optional). Supports comparison, range, and logical operators. | +| `filter_params` (optional) | string (JSON) | None | JSON-encoded dictionary of parameter values for templated filters (URL-encode the JSON). | **Responses:** - `200 OK`: JSON list of document links (HATEOAS format) @@ -223,6 +237,8 @@ Search within a specific partition only. | `include_related` (optional) | boolean | `false` | Include chunks from files with same `relationship_id` | | `include_ancestors` (optional) | boolean | `false` | Include chunks from ancestor files (via `parent_id` chain) | | `related_limit` (optional) | integer | 20 | Max related/ancestor chunks to fetch per result (used when `include_related` or `include_ancestors` is true) | +| `filter` (optional) | string | None | Milvus filter expression string for additional filtering (optional). Supports comparison, range, and logical operators. | +| `filter_params` (optional) | string (JSON) | None | JSON-encoded dictionary of parameter values for templated filters (URL-encode the JSON). | **Response:** Same as multi-partition search @@ -233,7 +249,7 @@ GET /search/partition/{partition}/file/{file_id} Search within a particular file in a partition. -**Query Parameters:** Same as partition search +**Query Parameters:** Same as partition search, including `filter` and `filter_params`. 
**Response:** Same as other search endpoints --- diff --git a/docs/content/docs/documentation/milvus_migration.md b/docs/content/docs/documentation/milvus_migration.md new file mode 100644 index 00000000..b5015f60 --- /dev/null +++ b/docs/content/docs/documentation/milvus_migration.md @@ -0,0 +1,176 @@ +--- +title: Milvus Migrations +--- + +# Milvus Upgrade +OpenRAG has been upgraded from Milvus **2.5.4** to **2.6.11** to leverage the enhancements introduced in the latest releases, particularly the new temporal querying capabilities added in version **2.6.6+**. + +## What's New in 2.6.x + +Milvus 2.6.6+ introduced the **`TIMESTAMPTZ`** field type, which enables: + +- **Comparison and range filtering** using standard operators (`=`, `!=`, `<`, `>`, etc.) +- **Interval arithmetic** — add or subtract durations (days, hours, minutes) directly in filter expressions +- **Time-based indexing** for faster temporal queries +- **Combined filtering** — pair timestamp conditions with vector similarity search + +**Example — basic comparison:** +```python +expr = "tsz != ISO '2025-01-03T00:00:00+08:00'" +results = client.query( + collection_name, + filter=expr, + output_fields=["id", "tsz"], + limit=10 +) +``` + +**Example — interval arithmetic:** +```python +expr = "tsz + INTERVAL 'P1D' > ISO '2025-01-03T00:00:00+08:00'" +results = client.query( + collection_name, + filter=expr, + output_fields=["id", "tsz"], + limit=10 +) +``` + +> `INTERVAL` values follow [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) syntax: +> * `P1D` = 1 day +> * `PT3H` = 3 hours +> * `P2DT6H` = 2 days and 6 hours. + +## Milvus version upgrade Steps +:::danger[Before running Milvus Version Migration] +These steps must be performed on a deployment running OpenRAG **prior to version 1.1.6** (Milvus 2.5.4) before switching to the newest version of OpenRAG. 
+:::
+
+> For the full official reference, see the [Milvus upgrade guide](https://milvus.io/docs/upgrade_milvus_standalone-docker.md#Upgrade-process).
+
+### Step 1 — Upgrade to Milvus 2.5.16 first
+
+Milvus requires an intermediate upgrade to **v2.5.16** before jumping to 2.6.x.
+
+Edit `vdb/milvus.yaml` and set the Milvus image tag:
+
+```diff lang=yaml
+# vdb/milvus.yaml
+milvus:
+- image: milvusdb/milvus:v2.5.4
++ image: milvusdb/milvus:v2.5.16 # Migrate to Milvus 2.5.16
+```
+
+Then restart the stack:
+
+```bash
+docker compose down
+docker compose up --build milvus -d
+```
+
+Wait for all services to be healthy before continuing.
+
+### Step 2 — Upgrade to Milvus 2.6.11
+
+Update `vdb/milvus.yaml` with the target versions (MinIO must also be updated for compatibility):
+
+```diff lang=yaml
+# vdb/milvus.yaml
+minio:
+- image: minio/minio:RELEASE.2023-03-20T20-16-18Z
++ image: minio/minio:RELEASE.2024-12-18T13-15-44Z
+
+...
+milvus:
+- image: milvusdb/milvus:v2.5.16
++ image: milvusdb/milvus:v2.6.11
+```
+
+### Step 3 — Stop all services
+
+```bash
+docker compose down
+```
+
+Verify that all containers are stopped before proceeding:
+
+```bash
+docker ps | grep milvus
+```
+
+### Step 4 — Start with the new image
+
+```bash
+docker compose up -d
+```
+
+Once healthy, confirm the running version:
+
+```bash
+docker inspect milvus-standalone --format '{{ .Config.Image }}'
+# Expected: milvusdb/milvus:v2.6.11
+```
+
+You can now switch to the newest release of OpenRAG.
+
+## Schema Migration — Add Temporal Fields
+
+:::info
+This migration adds a `TIMESTAMPTZ` field `created_at` and an `STL_SORT` index to an existing collection.
+
+Existing documents will have `null` for that field; new documents will have it populated at index time.
+:::
+
+:::danger[OpenRAG must be stopped]
+Stop the OpenRAG application before running this migration. 
+::: + +### Step 1 — Start only the Milvus container + +```bash +docker compose up -d milvus +``` + +Wait until Milvus is healthy: + +```bash +docker compose ps milvus +``` + +### Step 2 — Dry-run (inspect, no changes) + +```bash +docker compose run --no-deps --rm --build --entrypoint "" openrag \ + uv run python scripts/migrations/milvus/1.add_temporal_fields.py --dry-run +``` + +Review the output to confirm which fields and indexes are missing. + +### Step 3 — Apply the migration + +```bash +docker compose run --no-deps --rm --build --entrypoint "" openrag \ + uv run python scripts/migrations/milvus/1.add_temporal_fields.py +``` + +The script will: +1. Add any missing `TIMESTAMPTZ` fields (nullable) +2. Create `STL_SORT` indexes for each field +3. Stamp the collection with `schema_version=1` so OpenRAG no longer reports a migration error on startup + +### Step 4 — Restart OpenRAG + +```bash +docker compose up --build -d +``` + +### Rollback + +Milvus does not yet support dropping fields. The rollback only removes the indexes and resets the version stamp — the fields remain in the schema but are unused: + +```bash +docker compose run --no-deps --rm --build --entrypoint "" openrag \ + uv run python scripts/migrations/milvus/1.add_temporal_fields.py --downgrade +``` + +To fully remove the fields you would need to recreate the collection from scratch. \ No newline at end of file diff --git a/docs/content/docs/documentation/temporality.md b/docs/content/docs/documentation/temporality.md new file mode 100644 index 00000000..6a749629 --- /dev/null +++ b/docs/content/docs/documentation/temporality.md @@ -0,0 +1,138 @@ +--- +title: Temporality +--- + +# Milvus representation + +* As scalar field + +Scalar fields store primitive, structured values—commonly referred to as metadata—such as numbers, strings, or dates. + +They allow you to narrow search results based on specific attributes, like limiting documents to a particular category or a defined **time range**. 
+ + * You can set nullable=True for TIMESTAMPTZ fields to allow missing values. + * You can specify a default timestamp value using the default_value attribute in ISO 8601 format. + +* format: timestamp (ISO 8601 format) + * All temporal fields are stored in ISO 8601 format + +* **Automatic date extraction** + +# Operation +## Add a TIMESTAMPTZ field that allows null values +* schema.add_field("tsz", DataType.TIMESTAMPTZ, nullable=True) +* You can specify a default timestamp value using the **`default_value`** attribute in **`ISO 8601` format**. + + +## Filtering operations + +Compatible with milvus 2.6.6 + +* **`TIMESTAMPTZ`** supports scalar comparisons, interval arithmetic, and extraction of time components. + +* **Comparison and filtering**: All filtering and ordering operations are performed in UTC, ensuring consistent and predictable results across different time zones. + +* Query with timestamp filtering + * Use arithmetic operators like ==, !=, <, >, <=, >=. For a full list of arithmetic operators available in Milvus, refer to [Arithmetic Operators](https://milvus.io/docs/basic-operators.md#Arithmetic-Operators) + + * timestamp filtering + + ```python + expr = "tsz != ISO '2025-01-03T00:00:00+08:00'" + + results = client.query( + collection_name=collection_name, + filter=expr, + output_fields=["id", "tsz"], + limit=10 + ) + + print("Query result: ", results) + ``` + + * Interval operations + * You can perform arithmetic on TIMESTAMPTZ fields using INTERVAL values in the ISO 8601 duration format. This allows you to add or subtract durations, such as days, hours, or minutes, from a timestamp when filtering data. + + ```python + expr = "tsz + INTERVAL 'P0D' != ISO '2025-01-03T00:00:00+08:00'" + + results = client.query( + collection_name, + filter=expr, + output_fields=["id", "tsz"], + limit=10 + ) + + print("Query result: ", results) + ``` + + * **`INTERVAL`** values follow the **`ISO 8601` duration** syntax. 
For example: + * P1D → 1 day + * PT3H → 3 hours + * P2DT6H → 2 days and 6 hours + + * You can use **`INTERVAL`** arithmetic directly in filter expressions, such as: + * tsz + INTERVAL 'P3D' → Adds 3 days + * tsz - INTERVAL 'PT2H' → Subtracts 2 hours + + * Search with timestamp filtering + * You can combine **`TIMESTAMPTZ`** filtering with vector similarity search to narrow results by both time and similarity. + + + +-------- + +* Migration from Milvus v2.5.4 to v2.6.11 + * TIMESTAMPTZ is compatible with Milvus 2.6.6+ + + * Migration according to the release notes for Milvus Standalone: https://milvus.io/docs/upgrade_milvus_standalone-docker.md + * `You must upgrade to v2.5.16 or later before upgrading to v2.6.11.` + + * Steps for upgrading: https://milvus.io/docs/upgrade_milvus_standalone-docker.md#Upgrade-process + + * Issue: I've moved from Milvs 2.5.4 to 2.6.11 following https://milvus.io/docs/upgrade_milvus_standalone-docker.md. Previous collections created in 2.5.4 can't be loaded. It runs forever. + + * https://github.com/milvus-io/milvus/issues/43295 + + * https://www.perplexity.ai/search/i-ve-moved-from-milvs-2-5-4-to-CDHCle5hQl.qsUa_nw4WHQ + + + + +* Done successfully + +----- + +* Setting "datatype=DataType.TIMESTAMPTZ" datatype for the field created_at + +* Search + * search_params for search https://milvus.io/api-reference/pymilvus/v2.6.x/MilvusClient/Vector/search.md#Request-syntax + * param via AnnSearchRequest: https://milvus.io/api-reference/pymilvus/v2.6.x/MilvusClient/Vector/hybrid_search.md#Request-Syntax + + +----- + +* Finally i manage to make it work following the migration steps + +* Logical operators + * Logical operators are used to combine multiple conditions into a more complex filter expression. These include AND, OR, and NOT. + +* Range operators + * https://milvus.io/docs/basic-operators.md#Range-operators + * Supported Range Operators: + * IN: Used to match values within a specific set or range. 
+ * LIKE: Used to match a pattern (mostly for text fields). Milvus allows you to build an NGRAM index on VARCHAR or JSON fields to accelerate text queries. For details, refer to [NGRAM](https://milvus.io/docs/ngram.md). + + +## Time + +Time fields + +* datetime +* modified_at +* created_at +==> Added +* indexed_at + + +# Reorder diff --git a/extern/indexer-ui b/extern/indexer-ui index 3620b98b..92e8875e 160000 --- a/extern/indexer-ui +++ b/extern/indexer-ui @@ -1 +1 @@ -Subproject commit 3620b98b715d1b9bd4869fef251549e10f0edf1e +Subproject commit 92e8875ee1537f7156e46a8dcb2a6085d887ddc8 diff --git a/openrag/components/indexer/indexer.py b/openrag/components/indexer/indexer.py index c7cff312..a81f8ec8 100644 --- a/openrag/components/indexer/indexer.py +++ b/openrag/components/indexer/indexer.py @@ -224,12 +224,12 @@ async def asearch( self, query: str, top_k: int = 5, - similarity_threshold: float = 0.80, + similarity_threshold: float = 0.60, partition: str | list[str] | None = None, - filter: dict | None = None, + filter: str | None = None, + filter_params: dict | None = None, ) -> list[Document]: partition_list = self._check_partition_list(partition) - filter = filter or {} vectordb = ray.get_actor("Vectordb", namespace="openrag") return await vectordb.async_search.remote( query=query, @@ -237,6 +237,7 @@ async def asearch( top_k=top_k, similarity_threshold=similarity_threshold, filter=filter, + filter_params=filter_params, ) def _check_partition_str(self, partition: str | None) -> str: diff --git a/openrag/components/indexer/utils/files.py b/openrag/components/indexer/utils/files.py index f9194de4..25312a75 100644 --- a/openrag/components/indexer/utils/files.py +++ b/openrag/components/indexer/utils/files.py @@ -1,12 +1,13 @@ import re import secrets import time +from datetime import UTC, datetime from pathlib import Path import aiofiles import consts from components.utils import load_config -from fastapi import UploadFile +from fastapi import HTTPException, 
UploadFile, status config = load_config() SERIALIZE_TIMEOUT = config.ray.indexer.get("serialize_timeout", 3600) @@ -84,3 +85,27 @@ async def serialize_file(task_id: str, path: str, metadata: dict | None = None): timeout=SERIALIZE_TIMEOUT, task_description=f"Serialization task {task_id}", ) + + +def extract_temporal_fields(metadata: dict, temporal_fields: list) -> dict: + result = {} + + ## Use provided created_at if available, otherwise extract from file system + for field in temporal_fields: + if field not in metadata or metadata[field] is None: + continue + + datetime_str = metadata[field] + try: + # Try parsing the provided datetime to ensure it's valid + d = datetime.fromisoformat(datetime_str) + if d.tzinfo is None: + d = d.replace(tzinfo=UTC) + result[field] = d.isoformat() + except Exception: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Invalid ISO 8601 datetime field ({datetime_str}) for field '{field}'.", + ) + + return result diff --git a/openrag/components/indexer/utils/test_files.py b/openrag/components/indexer/utils/test_files.py index 0c84336a..ff46da5f 100644 --- a/openrag/components/indexer/utils/test_files.py +++ b/openrag/components/indexer/utils/test_files.py @@ -2,9 +2,9 @@ from pathlib import Path import pytest -from fastapi import UploadFile +from fastapi import HTTPException, UploadFile -from .files import sanitize_filename, save_file_to_disk +from .files import extract_temporal_fields, sanitize_filename, save_file_to_disk @pytest.mark.asyncio @@ -83,3 +83,30 @@ def fake_make_unique_filename(filename: str) -> str: ) def test_sanitize_filename(input_name, expected): assert sanitize_filename(input_name) == expected + + +# --- extract_temporal_fields --- + + +def test_extract_temporal_fields_field_not_in_metadata(): + assert extract_temporal_fields({}, ["created_at"]) == {} + + +def test_extract_temporal_fields_naive_datetime_defaults_to_utc(): + metadata = {"created_at": "2024-06-15T12:30:00"} + result = 
extract_temporal_fields(metadata, ["created_at"]) + assert result == {"created_at": "2024-06-15T12:30:00+00:00"} + + +def test_extract_temporal_fields_with_timezone(): + metadata = {"created_at": "2024-06-15T12:30:00+02:00"} + result = extract_temporal_fields(metadata, ["created_at"]) + assert result == {"created_at": "2024-06-15T12:30:00+02:00"} + + +def test_extract_temporal_fields_invalid_datetime_raises_400(): + with pytest.raises(HTTPException) as exc_info: + extract_temporal_fields({"created_at": "not-a-date"}, ["created_at"]) + assert exc_info.value.status_code == 400 + assert "not-a-date" in exc_info.value.detail + assert "created_at" in exc_info.value.detail diff --git a/openrag/components/indexer/vectordb/vectordb.py b/openrag/components/indexer/vectordb/vectordb.py index 4dbd0866..f0401c96 100644 --- a/openrag/components/indexer/vectordb/vectordb.py +++ b/openrag/components/indexer/vectordb/vectordb.py @@ -1,6 +1,7 @@ import asyncio import time from abc import ABC, abstractmethod +from datetime import UTC, datetime import numpy as np import ray @@ -102,12 +103,8 @@ async def get_file_chunks(self, file_id: str, partition: str, include_id: bool = async def get_chunk_by_id(self, chunk_id: str): pass - # @abstractmethod - # def sample_chunk_ids( - # self, partition: str, n_ids: int = 100, seed: int | None = None - # ): - # pass +SCHEMA_VERSION_PROPERTY_KEY = "openrag.schema_version" MAX_LENGTH = 65_535 @@ -140,6 +137,7 @@ def __init__(self): self.config = load_config() self.logger = get_logger() + self.time_fields = ["created_at"] # init milvus clients self.port = self.config.vectordb.get("port") @@ -189,6 +187,7 @@ def load_collection(self): try: if self._client.has_collection(self.collection_name): self.logger.warning(f"Collection `{self.collection_name}` already exists. 
Loading it.") + self._check_schema_version() else: self.logger.info("Creating empty collection") index_params = self._create_index() @@ -212,6 +211,7 @@ def load_collection(self): collection_name=self.collection_name, operation="create_collection", ) + self._store_schema_version() try: self._client.load_collection(self.collection_name) self.collection_loaded = True @@ -283,6 +283,9 @@ def _create_schema(self): dim=self.embedder.embedding_dimension, ) + for time_field in self.time_fields: + schema.add_field(field_name=time_field, datatype=DataType.TIMESTAMPTZ, nullable=True) + if self.hybrid_search: # Add sparse field for BM25 - this will be auto-generated schema.add_field( @@ -336,9 +339,54 @@ def _create_index(self): "bm25_b": 0.75, }, ) + # indexes for dates TIMESTAMPTZ field + for time_field in self.time_fields: + index_params.add_index( + field_name=time_field, + index_type="STL_SORT", # Index for TIMESTAMPTZ + index_name=f"{time_field}_idx", + ) return index_params + def _store_schema_version(self) -> None: + """Persist the configured schema_version as a collection property after collection creation.""" + schema_version = self.config.vectordb.get("schema_version") + self._client.alter_collection_properties( + collection_name=self.collection_name, + properties={SCHEMA_VERSION_PROPERTY_KEY: str(schema_version)}, + ) + self.logger.info(f"Schema version {schema_version} stored on collection `{self.collection_name}`.") + + def _check_schema_version(self) -> None: + """ + Read the stored schema version from collection properties and compare it + against the configured schema_version. Raises VDBSchemaMigrationRequiredError + if they diverge so the application fails fast instead of silently working on a + stale schema. 
+ """ + expected_version = self.config.vectordb.get("schema_version") + desc = self._client.describe_collection(self.collection_name) + props = desc.get("properties", {}) + raw = props.get(SCHEMA_VERSION_PROPERTY_KEY) + + try: + stored_version = int(raw) if raw is not None else 0 + except (ValueError, TypeError): + stored_version = 0 + + if stored_version != expected_version: + raise VDBSchemaMigrationRequiredError( + f"Collection `{self.collection_name}` is at schema version {stored_version} " + f"but the application requires version {expected_version}. " + "Please perform the migration script.", + collection_name=self.collection_name, + stored_version=stored_version, + expected_version=expected_version, + ) + + self.logger.info(f"Collection `{self.collection_name}` schema version {stored_version} — OK.") + async def list_collections(self) -> list[str]: return self._client.list_collections() @@ -379,11 +427,14 @@ async def async_add_documents(self, chunks: list[Document], user: dict) -> None: entities = [] vectors = await self.embedder.aembed_documents(chunks) order_metadata_l: list[dict] = _gen_chunk_order_metadata(n=len(chunks)) + indexed_at = datetime.now(UTC).isoformat() + for chunk, vector, order_metadata in zip(chunks, vectors, order_metadata_l): entities.append( { "text": chunk.page_content, "vector": vector, + "indexed_at": indexed_at, **order_metadata, **chunk.metadata, } @@ -395,6 +446,7 @@ async def async_add_documents(self, chunks: list[Document], user: dict) -> None: ) # insert file_id and partition into partition_file_manager + file_metadata.update({"indexed_at": indexed_at}) self.partition_file_manager.add_file_to_partition( file_id=file_id, partition=partition, @@ -452,9 +504,10 @@ async def async_search( self, query: str, top_k: int = 5, - similarity_threshold: int = 0.80, + similarity_threshold: float = 0.80, partition: list[str] = None, - filter: dict | None = None, + filter: str | None = None, + filter_params: dict | None = None, 
with_surrounding_chunks: bool = False, ) -> list[Document]: expr_parts = [] @@ -462,11 +515,11 @@ async def async_search( expr_parts.append(f"partition in {partition}") if filter: - for key, value in filter.items(): - expr_parts.append(f"{key} == '{value}'") + expr_parts.append(filter) # Join all parts with " and " only if there are multiple conditions expr = " and ".join(expr_parts) if expr_parts else "" + filter_params = filter_params or {} try: query_vector = await self.embedder.aembed_query(query) @@ -483,6 +536,7 @@ async def async_search( }, "limit": top_k, "expr": expr, + "expr_params": filter_params, } if self.hybrid_search: sparse_param = { @@ -494,6 +548,7 @@ async def async_search( }, "limit": top_k, "expr": expr, + "expr_params": filter_params, } reqs = [ AnnSearchRequest(**vector_param), @@ -507,10 +562,24 @@ async def async_search( limit=top_k, ) else: + vector_param = { + "data": [query_vector], + "anns_field": "vector", + "search_params": { + "metric_type": "COSINE", + "params": { + "ef": 64, + "radius": similarity_threshold, + "range_filter": 1.0, + }, + }, + "limit": top_k, + } response = await self._async_client.search( collection_name=self.collection_name, output_fields=["*"], - limit=top_k, + filter=expr, + filter_params=filter_params, **vector_param, ) @@ -617,33 +686,27 @@ async def delete_file(self, file_id: str, partition: str): file_id=file_id, ) - async def get_file_chunks(self, file_id: str, partition: str, include_id: bool = False, limit: int = 100): + async def get_file_chunks(self, file_id: str, partition: str, include_id: bool = False, limit: int = 2000): log = self.logger.bind(file_id=file_id, partition=partition) try: self._check_file_exists(file_id, partition) - # Adjust filter expression based on the type of value - filter_expression = "partition == {partition} and file_id == {file_id}" - filter_params = {"partition": partition, "file_id": file_id} - - # Pagination parameters - offset = 0 - results = [] + filter_expr = 
f'partition == "{partition}" and file_id == "{file_id}"' excluded_keys = ["text", "vector", "_id"] if not include_id else ["text", "vector"] + results = [] + iterator = self._client.query_iterator( + collection_name=self.collection_name, + filter=filter_expr, + limit=limit, + batch_size=1000, + output_fields=["*"], + ) while True: - response = await self._async_client.query( - collection_name=self.collection_name, - filter=filter_expression, - filter_params=filter_params, - limit=limit, - offset=offset, - ) - - if not response: - break # No more results - - results.extend(response) - offset += len(response) # Move offset forward + batch = iterator.next() + if not batch: + iterator.close() + break + results.extend(batch) docs = [ Document( diff --git a/openrag/components/pipeline.py b/openrag/components/pipeline.py index f349438f..d46a2384 100644 --- a/openrag/components/pipeline.py +++ b/openrag/components/pipeline.py @@ -37,8 +37,17 @@ def __init__(self) -> None: logger.debug("Reranker", enabled=self.reranker_enabled) self.reranker_top_k = config.reranker["top_k"] - async def retrieve_docs(self, partition: list[str], query: str, top_k: int | None = None) -> list[Document]: - docs = await self.retriever.retrieve(partition=partition, query=query) + async def retrieve_docs( + self, + partition: list[str], + query: str, + top_k: int | None = None, + filter: str | None = None, + filter_params: dict | None = None, + ) -> list[Document]: + docs = await self.retriever.retrieve( + partition=partition, query=query, filter=filter, filter_params=filter_params + ) logger.debug("Documents retreived", document_count=len(docs)) if docs: diff --git a/openrag/components/retriever.py b/openrag/components/retriever.py index 56bc3ddb..99223a71 100644 --- a/openrag/components/retriever.py +++ b/openrag/components/retriever.py @@ -75,12 +75,16 @@ async def retrieve( self, partition: list[str], query: str, + filter: str | None = None, + filter_params: dict | None = None, ) -> 
list[Document]: db = get_vectordb() chunks = await db.async_search.remote( query=query, partition=partition, top_k=self.top_k, + filter=filter, + filter_params=filter_params, similarity_threshold=self.similarity_threshold, with_surrounding_chunks=self.with_surrounding_chunks, ) @@ -137,7 +141,7 @@ def __init__( prompt: ChatPromptTemplate = ChatPromptTemplate.from_template(MULTI_QUERY_PROMPT) self.generate_queries = prompt | llm | StrOutputParser() | (lambda x: x.split("[SEP]")) - async def retrieve(self, partition: list[str], query: str) -> list[Document]: + async def retrieve(self, partition, query, filter=None, filter_params=None): db = get_vectordb() logger.debug("Generating multiple queries", k_queries=self.k_queries) generated_queries = await self.generate_queries.ainvoke( @@ -150,6 +154,8 @@ async def retrieve(self, partition: list[str], query: str) -> list[Document]: queries=generated_queries, partition=partition, top_k_per_query=self.top_k, + filter=filter, + filter_params=filter_params, similarity_threshold=self.similarity_threshold, with_surrounding_chunks=self.with_surrounding_chunks, ) @@ -196,7 +202,7 @@ async def get_hyde(self, query: str): hyde_document = await self.hyde_generator.ainvoke({"query": query}) return hyde_document - async def retrieve(self, partition: list[str], query: str) -> list[Document]: + async def retrieve(self, partition: list[str], query: str, filter=None, filter_params=None) -> list[Document]: db = get_vectordb() hyde = await self.get_hyde(query) queries = [hyde] @@ -209,6 +215,8 @@ async def retrieve(self, partition: list[str], query: str) -> list[Document]: top_k_per_query=self.top_k, similarity_threshold=self.similarity_threshold, with_surrounding_chunks=self.with_surrounding_chunks, + filter=filter, + filter_params=filter_params, ) diff --git a/openrag/routers/extract.py b/openrag/routers/extract.py index 5179a146..c47d3912 100644 --- a/openrag/routers/extract.py +++ b/openrag/routers/extract.py @@ -31,9 +31,6 @@ - 
`filename`: Original filename - `partition`: Partition name - `page`: Page number in source document - - `datetime`: Document date (if set) - - `modified_at`: File modification timestamp - - `created_at`: File creation timestamp - `indexed_at`: Chunk indexing timestamp - Additional custom metadata diff --git a/openrag/routers/indexer.py b/openrag/routers/indexer.py index 6b093beb..4cc48a74 100644 --- a/openrag/routers/indexer.py +++ b/openrag/routers/indexer.py @@ -1,10 +1,9 @@ import json -from datetime import datetime from pathlib import Path from typing import Any import ray -from components.indexer.utils.files import sanitize_filename, save_file_to_disk +from components.indexer.utils.files import extract_temporal_fields, sanitize_filename, save_file_to_disk from config import load_config from fastapi import ( APIRouter, @@ -49,6 +48,9 @@ # URL scheme configuration PREFERRED_URL_SCHEME = config.server.preferred_url_scheme +# DATETIME FIELDS: Fields provided by the client +TEMPORAL_FIELDS = ["created_at"] + def build_url(request: Request, route_name: str, **path_params) -> str: """Build a URL using the preferred scheme if configured.""" @@ -100,9 +102,14 @@ async def get_supported_types(): "mimetype": "text/plain", "author": "John Doe", ... + "created_at": "2025-01-03T00:00:00+08:00" // Optional temporal field (ISO 8601) } ``` +**Temporal Fields:** +- You can provide a temporal fields such as `created_at` in the metadata for time-based queries and filtering. +- Datetime values must be in ISO 8601 format (e.g., `2025-01-03T00:00:00+08:00`). 
+ **Common Mimetypes:** - `text/plain` - Plain text files - `text/markdown` - Markdown files @@ -147,9 +154,12 @@ async def add_file( # Append extra metadata metadata["file_size"] = human_readable_size(file_stat.st_size) - metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime).isoformat() metadata["file_id"] = file_id + ## Add temporal fields to metadata, using provided values if available, otherwise extracting from file system + temporal_fields = extract_temporal_fields(metadata, temporal_fields=TEMPORAL_FIELDS) + metadata.update(temporal_fields) + # Indexing the file task = indexer.add_file.remote(path=file_path, metadata=metadata, partition=partition, user=user) await task_state_manager.set_state.remote(task.task_id().hex(), "QUEUED") @@ -210,9 +220,14 @@ async def delete_file( "mimetype": "text/plain", "author": "John Doe", ... + "created_at": "2024-01-01T12:00:00+00:00" // Optional temporal field (ISO 8601) } ``` +**Temporal Fields:** +- You can provide the temporal fields `created_at` in the metadata for time-based queries and filtering. +- Datetime values must be in ISO 8601 format (e.g., `2024-01-01T12:00:00+00:00`). + **Response:** Returns 202 Accepted with a task status URL for tracking indexing progress. 
""", @@ -254,9 +269,12 @@ async def put_file( # Append extra metadata metadata["file_size"] = human_readable_size(file_stat.st_size) - metadata["created_at"] = datetime.fromtimestamp(file_stat.st_ctime).isoformat() metadata["file_id"] = file_id + ## Add temporal fields to metadata, using provided values if available, otherwise extracting from file system + temporal_fields = extract_temporal_fields(metadata, temporal_fields=TEMPORAL_FIELDS) + metadata.update(temporal_fields) + # Indexing the file task = indexer.add_file.remote(path=file_path, metadata=metadata, partition=partition, user=user) await task_state_manager.set_state.remote(task.task_id().hex(), "QUEUED") diff --git a/openrag/routers/partition.py b/openrag/routers/partition.py index 25198385..cb4fd3f9 100644 --- a/openrag/routers/partition.py +++ b/openrag/routers/partition.py @@ -129,6 +129,7 @@ def process_file(file_dict): **Parameters:** - `partition`: The partition name - `file_id`: The unique file identifier +- `limit`: Maximum number of chunks to return (default: 2000) **Response:** Returns file information including: @@ -143,6 +144,7 @@ async def get_file( request: Request, partition: str, file_id: str, + limit: int = 2000, vectordb=Depends(get_vectordb), partition_viewer=Depends(require_partition_viewer), ): @@ -151,7 +153,7 @@ async def get_file( status_code=status.HTTP_404_NOT_FOUND, detail=f"'{file_id}' not found in partition '{partition}'", ) - results = await vectordb.get_file_chunks.remote(partition=partition, file_id=file_id, include_id=True) + results = await vectordb.get_file_chunks.remote(partition=partition, file_id=file_id, include_id=True, limit=limit) documents = [{"link": str(request.url_for("get_extract", extract_id=doc.metadata["_id"]))} for doc in results] diff --git a/openrag/routers/search.py b/openrag/routers/search.py index 16bea658..28012199 100644 --- a/openrag/routers/search.py +++ b/openrag/routers/search.py @@ -1,5 +1,8 @@ +import json +from typing import Annotated + from 
components.retriever import _expand_with_related_chunks -from fastapi import APIRouter, Depends, Query, Request, status +from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from fastapi.responses import JSONResponse from utils.dependencies import get_indexer, get_vectordb from utils.logger import get_logger @@ -15,6 +18,72 @@ router = APIRouter() +class RelatedDocSearchParams: + def __init__( + self, + include_related: bool = Query(False, description="Include chunks from files with same relationship_id"), + include_ancestors: bool = Query(False, description="Include chunks from ancestor files in hierarchy"), + related_limit: int = Query( + 20, ge=0, description="Maximum number of related/ancestor chunks to fetch per result" + ), + max_ancestor_depth: int | None = Query( + None, ge=0, description="Maximum depth of ancestor files to include. None means unlimited." + ), + ): + self.include_related = include_related + self.include_ancestors = include_ancestors + self.related_limit = related_limit + self.max_ancestor_depth = max_ancestor_depth + + +class CommonSearchParams: + def __init__( + self, + text: str = Query(..., description="Text to search semantically"), + top_k: int = Query(5, ge=1, description="Number of top results to return"), + similarity_threshold: float = Query( + 0.75, ge=0, le=1, description="Minimum similarity score for results (0 to 1)" + ), + filter: str | None = Query( + default=None, + description="""Milvus filter expression string.""", + ), + filter_params: str | None = Query( + default=None, + description="""Dictionary of parameter values for templated filters. 
Use with placeholders in filter expression for better performance.""", + ), + ): + self.text = text + self.top_k = top_k + self.similarity_threshold = similarity_threshold + self.filter = filter + self._filter_params = self._parse_filter_params(filter_params) + + @staticmethod + def _parse_filter_params(filter_params: str | None) -> dict | None: + if not filter_params: + return None + try: + parsed = json.loads(filter_params) + if not isinstance(parsed, dict): + raise HTTPException( + status_code=400, + detail="Invalid 'filter_params' field: must be a JSON object (dict), not a string or array. " + 'Example: {"page": 20} (use double quotes, no outer string quotes).', + ) + return parsed + except json.JSONDecodeError: + raise HTTPException( + status_code=400, + detail="Invalid 'filter_params' field: must be valid JSON. " + 'Use double quotes for keys and string values. Example: {"page": 20}', + ) + + @property + def filter_params(self) -> dict | None: + return self._filter_params + + @router.get( "", description="""Perform semantic search across multiple partitions. @@ -23,10 +92,27 @@ - `partitions`: List of partition names (default: ["all"]) - `text`: Search query text (required) - `top_k`: Number of results to return (default: 5) +- similarity_threshold: Minimum similarity score for results (0 to 1, default: 0.75) - `include_related`: Include chunks from files with same relationship_id (default: false) - `include_ancestors`: Include chunks from ancestor files in hierarchy (default: false) - `related_limit`: Maximum number of related/ancestor chunks to fetch per result (default: 20). This is used when `include_related` or `include_ancestors` is true. - `max_ancestor_depth`: Maximum depth of ancestor files to include. None means unlimited. 
(default: None) +- `filter`: Milvus filter expression string for additional filtering (optional) + Milvus supports the following operators: + - Comparison: ==, !=, >, <, >=, <= + - Range: IN, LIKE + - Logical: AND, OR, NOT (see https://milvus.io/docs/boolean.md) + Examples: + - `file_id == "abc123"` + - `created_at > ISO "{start_date}"` + - `page >= 5 AND page <= 10` + - `file_id in ["id1", "id2", "id3"]` + +- `filter_params`: Dictionary of parameter values for templated filters (optional) + Use with placeholders in filter expression for better performance. + Example: + - filter: `created_at > ISO "{start_date}" AND created_at < ISO "{end_date}"` + - filter_params: {"start_date": "2024-01-01T00:00:00+00:00", "end_date": "2024-12-31T23:59:59+00:00"} **Behavior:** - `partitions=["all"]`: Search all accessible partitions @@ -55,15 +141,9 @@ ) async def search_multiple_partitions( request: Request, - partitions: list[str] = Query(default=["all"], description="List of partitions to search"), - text: str = Query(..., description="Text to search semantically"), - top_k: int = Query(5, description="Number of top results to return"), - include_related: bool = Query(False, description="Include chunks from files with same relationship_id"), - include_ancestors: bool = Query(False, description="Include chunks from ancestor files in hierarchy"), - related_limit: int = Query(20, description="Maximum number of related/ancestor chunks to fetch per result"), - max_ancestor_depth: int | None = Query( - None, description="Maximum depth of ancestor files to include. None means unlimited." 
- ), + search_params: Annotated[CommonSearchParams, Depends()], + related_params: Annotated[RelatedDocSearchParams, Depends()], + partitions: list[str] | None = Query(default=["all"], description="List of partitions to search"), indexer=Depends(get_indexer), vectordb=Depends(get_vectordb), partition_viewer=Depends(require_partitions_viewer), @@ -75,27 +155,34 @@ async def search_multiple_partitions( log = logger.bind( partitions=partitions, - query=text, - top_k=top_k, - include_related=include_related, - include_ancestors=include_ancestors, + query=search_params.text, + top_k=search_params.top_k, + include_related=related_params.include_related, + include_ancestors=related_params.include_ancestors, ) - results = await indexer.asearch.remote(query=text, top_k=top_k, partition=partitions) + results = await indexer.asearch.remote( + query=search_params.text, + top_k=search_params.top_k, + similarity_threshold=search_params.similarity_threshold, + partition=partitions, + filter=search_params.filter, + filter_params=search_params.filter_params, + ) log.info( "Semantic search on multiple partitions completed.", result_count=len(results), ) # Expand with related/ancestor chunks if requested - if include_related or include_ancestors: + if related_params.include_related or related_params.include_ancestors: results = await _expand_with_related_chunks( results=results, db=vectordb, - include_related=include_related, - include_ancestors=include_ancestors, - related_limit=related_limit, - max_ancestor_depth=max_ancestor_depth, + include_related=related_params.include_related, + include_ancestors=related_params.include_ancestors, + related_limit=related_params.related_limit, + max_ancestor_depth=related_params.max_ancestor_depth, ) log.info( "Expanded results with related/ancestor chunks.", @@ -110,7 +197,6 @@ async def search_multiple_partitions( } for doc in results ] - return JSONResponse(status_code=status.HTTP_200_OK, content={"documents": documents}) @@ -124,10 +210,27 @@ 
async def search_multiple_partitions( **Query Parameters:** - `text`: Search query text (required) - `top_k`: Number of results to return (default: 5) +- similarity_threshold: Minimum similarity score for results (0 to 1, default: 0.75) - `include_related`: Include chunks from files with same relationship_id (default: false) - `include_ancestors`: Include chunks from ancestor files in hierarchy (default: false) - `related_limit`: Maximum number of related/ancestor chunks to fetch per result (default: 20). This is used when `include_related` or `include_ancestors` is true. - `max_ancestor_depth`: Maximum depth of ancestor files to include. None means unlimited. (default: None) +- `filter`: Milvus filter expression string for additional filtering (optional) + Milvus supports the following operators: + - Comparison: ==, !=, >, <, >=, <= + - Range: IN, LIKE + - Logical: AND, OR, NOT (see https://milvus.io/docs/boolean.md) + Examples: + - `file_id == "abc123"` + - `created_at > ISO "{start_date}"` + - `page >= 5 AND page <= 10` + - `file_id in ["id1", "id2", "id3"]` + +- `filter_params`: Dictionary of parameter values for templated filters (optional) + Use with placeholders in filter expression for better performance. 
+ Example: + - filter: `created_at > ISO "{start_date}" AND created_at < ISO "{end_date}"` + - filter_params: {"start_date": "2024-01-01T00:00:00+00:00", "end_date": "2024-12-31T23:59:59+00:00"} **Permissions:** - Requires viewer role on the partition @@ -146,37 +249,40 @@ async def search_multiple_partitions( async def search_one_partition( request: Request, partition: str, - text: str = Query(..., description="Text to search semantically"), - top_k: int = Query(5, description="Number of top results to return"), - include_related: bool = Query(False, description="Include chunks from files with same relationship_id"), - include_ancestors: bool = Query(False, description="Include chunks from ancestor files in hierarchy"), - related_limit: int = Query(20, description="Maximum number of related/ancestor chunks to fetch per result"), - max_ancestor_depth: int | None = Query( - None, description="Maximum depth of ancestor files to include. None means unlimited." - ), + search_params: Annotated[CommonSearchParams, Depends()], + related_params: Annotated[RelatedDocSearchParams, Depends()], indexer=Depends(get_indexer), vectordb=Depends(get_vectordb), partition_viewer=Depends(require_partition_viewer), ): log = logger.bind( partition=partition, - query=text, - top_k=top_k, - include_related=include_related, - include_ancestors=include_ancestors, + query=search_params.text, + top_k=search_params.top_k, + include_related=related_params.include_related, + include_ancestors=related_params.include_ancestors, + ) + + results = await indexer.asearch.remote( + query=search_params.text, + top_k=search_params.top_k, + similarity_threshold=search_params.similarity_threshold, + partition=partition, + filter=search_params.filter, + filter_params=search_params.filter_params, ) - results = await indexer.asearch.remote(query=text, top_k=top_k, partition=partition) + log.info("Semantic search on single partition completed.", result_count=len(results)) # Expand with related/ancestor chunks 
if requested - if include_related or include_ancestors: + if related_params.include_related or related_params.include_ancestors: results = await _expand_with_related_chunks( results=results, db=vectordb, - include_related=include_related, - include_ancestors=include_ancestors, - related_limit=related_limit, - max_ancestor_depth=max_ancestor_depth, + include_related=related_params.include_related, + include_ancestors=related_params.include_ancestors, + related_limit=related_params.related_limit, + max_ancestor_depth=related_params.max_ancestor_depth, ) log.info( "Expanded results with related/ancestor chunks.", @@ -191,7 +297,6 @@ async def search_one_partition( } for doc in results ] - return JSONResponse(status_code=status.HTTP_200_OK, content={"documents": documents}) @@ -206,6 +311,23 @@ async def search_one_partition( **Query Parameters:** - `text`: Search query text (required) - `top_k`: Number of results to return (default: 5) +- similarity_threshold: Minimum similarity score for results (0 to 1, default: 0.75) +- `filter`: Milvus filter expression string for additional filtering (optional) + Milvus supports the following operators: + - Comparison: ==, !=, >, <, >=, <= + - Range: IN, LIKE + - Logical: AND, OR, NOT (see https://milvus.io/docs/boolean.md) + Examples: + - `file_id == "abc123"` + - `created_at > ISO "{start_date}"` + - `page >= 5 AND page <= 10` + - `file_id in ["id1", "id2", "id3"]` + +- `filter_params`: Dictionary of parameter values for templated filters (optional) + Use with placeholders in filter expression for better performance. 
+ Example: + - filter: `created_at > ISO "{start_date}" AND created_at < ISO "{end_date}"` + - filter_params: {"start_date": "2024-01-01T00:00:00+00:00", "end_date": "2024-12-31T23:59:59+00:00"} **Permissions:** - Requires viewer role on the partition @@ -224,21 +346,26 @@ async def search_file( request: Request, partition: str, file_id: str, - text: str = Query(..., description="Text to search semantically"), - top_k: int = Query(5, description="Number of top results to return"), + search_params: Annotated[CommonSearchParams, Depends()], indexer=Depends(get_indexer), vectordb=Depends(get_vectordb), partition_viewer=Depends(require_partition_viewer), ): - log = logger.bind( + log = logger.bind(partition=partition, file_id=file_id, query=search_params.text, top_k=search_params.top_k) + + filter = "file_id == {_file_id}" + (f" AND ({search_params.filter})" if search_params.filter else "") + params = dict(search_params.filter_params or {}) + params.pop("_file_id", None) # disallow user override of route-scoped file_id + params["_file_id"] = file_id + + results = await indexer.asearch.remote( + query=search_params.text, + top_k=search_params.top_k, + similarity_threshold=search_params.similarity_threshold, partition=partition, - file_id=file_id, - query=text, - top_k=top_k, - include_related=False, - include_ancestors=False, + filter=filter, + filter_params=params, ) - results = await indexer.asearch.remote(query=text, top_k=top_k, partition=partition, filter={"file_id": file_id}) log.info("Semantic search on specific file completed.", result_count=len(results)) documents = [ @@ -249,5 +376,4 @@ async def search_file( } for doc in results ] - return JSONResponse(status_code=status.HTTP_200_OK, content={"documents": documents}) diff --git a/openrag/scripts/migrations/milvus/1.add_temporal_fields.py b/openrag/scripts/migrations/milvus/1.add_temporal_fields.py new file mode 100644 index 00000000..86052d8f --- /dev/null +++ 
b/openrag/scripts/migrations/milvus/1.add_temporal_fields.py @@ -0,0 +1,227 @@ +""" +Milvus migration: add temporal fields (schema version 0 → 1) +============================================================== +Adds the following TIMESTAMPTZ field (nullable) to the existing collection: + - created_at + +The field also gets an STL_SORT index. After a successful upgrade the +collection's ``openrag.schema_version`` property is set to 1 so the application +no longer raises VDBSchemaMigrationRequiredError on startup. + +Existing documents will retain null for these fields; new documents will have +them populated at index time by the application code. + +Usage (from repo root, inside the container): + # Dry-run first (inspect only, no changes): + docker compose run --no-deps --rm --build --entrypoint "" openrag \\ + uv run python scripts/migrations/milvus/1.add_temporal_fields.py --dry-run + + # Apply: + docker compose run --no-deps --rm --build --entrypoint "" openrag \\ + uv run python scripts/migrations/milvus/1.add_temporal_fields.py + + # Roll back indexes and reset version (fields cannot be dropped in Milvus): + docker compose run --no-deps --rm --entrypoint "" openrag \\ + uv run python scripts/migrations/milvus/1.add_temporal_fields.py --downgrade +""" + +import argparse +import sys + +from components.indexer.vectordb.vectordb import SCHEMA_VERSION_PROPERTY_KEY +from config import load_config +from pymilvus import DataType, MilvusClient +from utils.logger import get_logger + +TARGET_VERSION = 1 # The schema version this migration brings the collection to. + +# --------------------------------------------------------------------------- +# Declarative migration spec — edit here to change what gets added/removed. 
+# --------------------------------------------------------------------------- + +FIELDS_2_ADD = [ + {"field_name": "created_at", "data_type": DataType.TIMESTAMPTZ, "nullable": True}, +] + +INDEXES_2_ADD = [ + {"field_name": "created_at", "index_type": "STL_SORT", "index_name": "created_at_idx"}, +] + +# --------------------------------------------------------------------------- + +logger = get_logger() + + +def _get_existing_field_names(client: MilvusClient, collection_name: str) -> set[str]: + desc = client.describe_collection(collection_name) + return {f["name"] for f in desc["fields"]} + + +def _get_existing_index_names(client: MilvusClient, collection_name: str) -> set[str]: + return set(client.list_indexes(collection_name)) + + +def _get_stored_version(client: MilvusClient, collection_name: str) -> int: + desc = client.describe_collection(collection_name) + raw = desc.get("properties", {}).get(SCHEMA_VERSION_PROPERTY_KEY) + if raw is None: + return 0 + try: + return int(raw) + except ValueError: + return 0 + + +def _print_state(client: MilvusClient, collection_name: str, required_version: int) -> None: + """Query and display the current state of fields, indexes and schema version.""" + desc = client.describe_collection(collection_name) + field_map = {f["name"]: f for f in desc["fields"]} + existing_indexes = _get_existing_index_names(client, collection_name) + stored_version = _get_stored_version(client, collection_name) + + field_to_index = {idx["field_name"]: idx["index_name"] for idx in INDEXES_2_ADD} + + logger.info(f"--- Collection '{collection_name}' state ---") + logger.info(f" Schema version : stored={stored_version} required={required_version}") + logger.info(" Fields:") + for field_spec in FIELDS_2_ADD: + field_name = field_spec["field_name"] + index_name = field_to_index.get(field_name) + field_present = field_name in field_map + index_present = index_name is not None and index_name in existing_indexes + + index_detail = "" + if index_present: + 
info = client.describe_index(collection_name=collection_name, index_name=index_name) + index_detail = f" (index_type={info.get('index_type')})" + + index_status = f"OK{index_detail}" if index_present else ("N/A" if not index_name else "MISSING") + logger.info(f" {field_name}: field={'OK' if field_present else 'MISSING'} | index={index_status}") + + +def upgrade(client: MilvusClient, collection_name: str, dry_run: bool = False) -> None: + stored_version = _get_stored_version(client, collection_name) + if stored_version >= TARGET_VERSION: + logger.info(f"Collection is already at version {stored_version} — nothing to do.") + _print_state(client, collection_name, TARGET_VERSION) + return + + existing_fields = _get_existing_field_names(client, collection_name) + existing_indexes = _get_existing_index_names(client, collection_name) + fields_added: list[str] = [] + + for field_spec in FIELDS_2_ADD: + field_name = field_spec["field_name"] + if field_name in existing_fields: + logger.info(f"Field '{field_name}' already exists — skipping.") + continue + + logger.info(f"{'[DRY-RUN] ' if dry_run else ''}Adding field '{field_name}'.") + if not dry_run: + client.add_collection_field(collection_name=collection_name, **field_spec) + fields_added.append(field_name) + + index_params = client.prepare_index_params() + needs_index = False + + for idx_spec in INDEXES_2_ADD: + field_name = idx_spec["field_name"] + index_name = idx_spec["index_name"] + + if index_name in existing_indexes: + logger.info(f"Index '{index_name}' already exists — skipping.") + continue + + if field_name not in existing_fields and field_name not in fields_added: + logger.warning(f"Field '{field_name}' could not be added — skipping index.") + continue + + logger.info(f"{'[DRY-RUN] ' if dry_run else ''}Scheduling index '{index_name}' for '{field_name}'.") + index_params.add_index(**idx_spec) + needs_index = True + + if not dry_run: + if needs_index: + logger.info("Creating indexes...") + 
client.create_index(collection_name=collection_name, index_params=index_params) + logger.info("Indexes created.") + + # Bump stored version so the application stops raising VDBSchemaMigrationRequiredError. + logger.info(f"Storing schema version {TARGET_VERSION} on collection '{collection_name}'.") + client.alter_collection_properties( + collection_name=collection_name, + properties={SCHEMA_VERSION_PROPERTY_KEY: str(TARGET_VERSION)}, + ) + logger.info("Migration complete.") + else: + logger.info("Dry-run complete. No changes were made.") + + _print_state(client, collection_name, TARGET_VERSION) + + +def downgrade(client: MilvusClient, collection_name: str, dry_run: bool = False) -> None: + """ + Milvus does NOT support dropping fields from an existing collection. + The only way to fully roll back is to drop and recreate the collection, + which would lose all data. + + This function only removes the indexes and resets the version property. + """ + existing_indexes = _get_existing_index_names(client, collection_name) + + for idx_spec in INDEXES_2_ADD: + index_name = idx_spec["index_name"] + if index_name not in existing_indexes: + logger.info(f"Index '{index_name}' does not exist — skipping.") + continue + + logger.info(f"{'[DRY-RUN] ' if dry_run else ''}Dropping index '{index_name}'.") + if not dry_run: + client.drop_index(collection_name=collection_name, index_name=index_name) + + if not dry_run: + client.alter_collection_properties( + collection_name=collection_name, + properties={SCHEMA_VERSION_PROPERTY_KEY: "0"}, + ) + logger.info("Schema version reset to 0.") + + logger.warning( + f"Fields {[f['field_name'] for f in FIELDS_2_ADD]} cannot be dropped from Milvus. " + "To fully remove them you would need to recreate the collection." 
+ ) + + _print_state(client, collection_name, required_version=0) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Milvus migration: add temporal fields (v0 → v1)") + parser.add_argument("--dry-run", action="store_true", help="Inspect only, make no changes") + parser.add_argument( + "--downgrade", + action="store_true", + help="Drop indexes and reset version (fields cannot be dropped in Milvus)", + ) + args = parser.parse_args() + + cfg = load_config() + host = cfg.vectordb.get("host") + port = cfg.vectordb.get("port") + collection_name = cfg.vectordb.get("collection_name", "vdb_test") + uri = f"http://{host}:{port}" + + logger.info(f"Connecting to Milvus at {uri}, collection='{collection_name}'") + client = MilvusClient(uri=uri) + + if not client.has_collection(collection_name): + logger.error(f"Collection '{collection_name}' does not exist. Aborting.") + sys.exit(1) + + if args.downgrade: + downgrade(client, collection_name, dry_run=args.dry_run) + else: + upgrade(client, collection_name, dry_run=args.dry_run) + + +if __name__ == "__main__": + main() diff --git a/openrag/utils/exceptions/vectordb.py b/openrag/utils/exceptions/vectordb.py index 964f9ed6..e9bf7cbc 100644 --- a/openrag/utils/exceptions/vectordb.py +++ b/openrag/utils/exceptions/vectordb.py @@ -106,6 +106,18 @@ def __init__(self, message: str, **kwargs): ) +class VDBSchemaMigrationRequiredError(VDBError): + """Raised when the collection schema version does not match the expected version.""" + + def __init__(self, message: str, **kwargs): + super().__init__( + message=message, + code="VDB_SCHEMA_MIGRATION_REQUIRED", + status_code=503, + **kwargs, + ) + + class UnexpectedVDBError(VDBError): """Raised for unexpected errors in vector database operations.""" diff --git a/pyproject.toml b/pyproject.toml index 7990c987..401b237a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,8 @@ dependencies = [ "fast-langdetect>=1.0.0", "ruff>=0.14.1", "cairosvg>=2.7.0", - 
"pymilvus>=2.5.12", + "pymilvus>=2.6.9", + "protobuf>=5.27,<6.0", "faster-whisper>=1.1.0", ] diff --git a/tests/api_tests/api_run/docker-compose.yaml b/tests/api_tests/api_run/docker-compose.yaml index 9bf54913..f3085533 100644 --- a/tests/api_tests/api_run/docker-compose.yaml +++ b/tests/api_tests/api_run/docker-compose.yaml @@ -28,12 +28,13 @@ services: retries: 10 etcd: - image: quay.io/coreos/etcd:v3.5.16 + image: quay.io/coreos/etcd:v3.5.25 environment: - ETCD_AUTO_COMPACTION_MODE=revision - ETCD_AUTO_COMPACTION_RETENTION=1000 - ETCD_QUOTA_BACKEND_BYTES=4294967296 - command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + - ETCD_SNAPSHOT_COUNT=50000 + command: etcd -advertise-client-urls=http://etcd:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd healthcheck: test: ["CMD", "etcdctl", "endpoint", "health"] interval: 5s @@ -41,7 +42,7 @@ services: retries: 5 minio: - image: minio/minio:RELEASE.2023-03-20T20-16-18Z + image: minio/minio:RELEASE.2024-12-18T13-15-44Z environment: MINIO_ACCESS_KEY: minioadmin MINIO_SECRET_KEY: minioadmin @@ -53,10 +54,10 @@ services: retries: 5 milvus: - image: milvusdb/milvus:v2.5.4 + image: milvusdb/milvus:v2.6.11 command: ["milvus", "run", "standalone"] security_opt: - - seccomp:unconfined + - seccomp:unconfined environment: ETCD_ENDPOINTS: etcd:2379 MINIO_ADDRESS: minio:9000 diff --git a/tests/api_tests/test_search.py b/tests/api_tests/test_search.py index 54f56671..3ddfd4f1 100644 --- a/tests/api_tests/test_search.py +++ b/tests/api_tests/test_search.py @@ -3,6 +3,7 @@ import json import time import uuid +from datetime import datetime import pytest @@ -480,3 +481,394 @@ def test_max_ancestor_depth_limits_results(self, api_client, indexed_email_threa assert actual_order_limited == expected_order_limited, ( f"Expected {expected_order_limited}, got {actual_order_limited}" ) + + +class TestSearchFiltering: + """Test search filtering functionality on 
search_one_partition endpoint.""" + + COMMON_CONTENT = """This is a test document for filter testing. +It contains information about machine learning and artificial intelligence. +The document is used to verify that search filtering works correctly. +Key topics include: neural networks, deep learning, and natural language processing. +This content is intentionally repeated across multiple files to test filtering. +""" + + @pytest.fixture + def filter_test_files(self, tmp_path): + """Create 6 files with the same content but different metadata.""" + files_config = [ + { + "file_id": "filter-file-1", + "origin": "source_A", + "file_number": 1, + "created_at": "2020-06-15T00:00:00+00:00", + }, + { + "file_id": "filter-file-2", + "origin": "source_A", + "file_number": 2, + "created_at": "2021-06-15T00:00:00+00:00", + }, + { + "file_id": "filter-file-3", + "origin": "source_B", + "file_number": 3, + "created_at": "2022-06-15T00:00:00+00:00", + }, + { + "file_id": "filter-file-4", + "origin": "source_B", + "file_number": 4, + "created_at": "2023-06-15T00:00:00+00:00", + }, + { + "file_id": "filter-file-5", + "origin": "source_C", + "file_number": 5, + "created_at": "2024-06-15T00:00:00+00:00", + }, + { + "file_id": "filter-file-6", + "origin": "source_C", + "file_number": 6, + "created_at": "2024-07-15T00:00:00+00:00", + }, + ] + + file_paths = {} + for config in files_config: + file_id = config.pop("file_id") + file_path = tmp_path / f"{file_id}.txt" + file_path.write_text(self.COMMON_CONTENT) + config["path"] = file_path + file_paths[file_id] = config + + return file_paths + + @pytest.fixture + def indexed_filter_partition(self, api_client, created_partition, filter_test_files): + """Create partition and index all 6 files with metadata.""" + for file_id, file_info in filter_test_files.items(): + file_path = file_info.pop("path") + with open(file_path, "rb") as f: + response = api_client.post( + f"/indexer/partition/{created_partition}/file/{file_id}", + files={"file": 
(f"{file_id}.txt", f, "text/plain")}, + data={"metadata": json.dumps(file_info)}, + ) + + data = response.json() + + # Wait for indexing to complete + if "task_status_url" in data: + task_url = data["task_status_url"] + task_path = "/" + "/".join(task_url.split("/")[3:]) + elif "task_id" in data: + task_path = f"/indexer/task/{data['task_id']}" + else: + time.sleep(3) + continue + + for _ in range(30): + task_response = api_client.get(task_path) + task_data = task_response.json() + state = task_data.get("task_state", "") + if state in ["SUCCESS", "COMPLETED", "success", "completed"]: + break + elif state in ["FAILED", "failed", "FAILURE", "failure"]: + pytest.skip(f"Indexing failed for {file_id}: {task_data}") + time.sleep(2) + + return created_partition + + # ========================================================================= + # Comparison filtering tests + # ========================================================================= + + def test_comparison_filter_with_str(self, api_client, indexed_filter_partition): + """Test filtering with origin == 'source_A' returns only files with that origin.""" + response = api_client.get( + f"/search/partition/{indexed_filter_partition}", + params={ + "text": self.COMMON_CONTENT, # to ensure results as embeddings are random but deterministic based on content + "top_k": 10, + "filter": "origin == 'source_A'", + }, + ) + assert response.status_code == 200 + data = response.json() + assert "documents" in data + + documents = data["documents"] + assert len(documents) > 0, "Should find at least one document with origin='source_A'" + + # Verify all results have origin='source_A' + assert all(doc["metadata"].get("origin") == "source_A" for doc in documents), ( + "All documents should have origin='source_A'" + ) + + # Verify we got the expected file_ids + file_ids = {doc["metadata"].get("file_id") for doc in documents} + assert file_ids.issubset({"filter-file-1", "filter-file-2"}), f"Expected file_ids from source_A, got 
{file_ids}" + + def test_comparison_filter_with_int(self, api_client, indexed_filter_partition): + """Test filtering with file_number >= 3 returns files 3, 4, 5, 6.""" + response = api_client.get( + f"/search/partition/{indexed_filter_partition}", + params={ + "text": self.COMMON_CONTENT, + "top_k": 10, + "filter": "file_number >= 3", + }, + ) + + assert response.status_code == 200 + data = response.json() + assert "documents" in data + + documents = data["documents"] + assert len(documents) > 0, "Should find at least one document with file_number >= 3" + + assert all(doc["metadata"].get("file_number") >= 3 for doc in documents), ( + "All documents should have file_number >= 3" + ) + + # ========================================================================= + # Range filtering tests (IN and LIKE) + # ========================================================================= + + def test_filter_with_IN_operator(self, api_client, indexed_filter_partition): + """Test filtering with origin IN ['source_A', 'source_B'].""" + response = api_client.get( + f"/search/partition/{indexed_filter_partition}", + params={ + "text": self.COMMON_CONTENT, + "top_k": 10, + "filter": 'origin IN ["source_A", "source_B"]', + }, + ) + assert response.status_code == 200 + data = response.json() + assert "documents" in data + + documents = data["documents"] + assert len(documents) > 0, "Should find documents from source_A or source_B" + + assert all(doc["metadata"].get("origin") in ["source_A", "source_B"] for doc in documents), ( + "All documents should have origin in ['source_A', 'source_B']" + ) + + # Verify we got expected file_ids + file_ids = {doc["metadata"].get("file_id") for doc in documents} + expected_ids = {"filter-file-1", "filter-file-2", "filter-file-3", "filter-file-4"} + assert file_ids.issubset(expected_ids), f"Expected file_ids from source_A/B, got {file_ids}" + + def test_filter_with_LIKE_operator(self, api_client, indexed_filter_partition): + """Test filtering with 
origin LIKE 'source_%' (matches all).""" + response = api_client.get( + f"/search/partition/{indexed_filter_partition}", + params={ + "text": self.COMMON_CONTENT, + "top_k": 10, + "filter": 'origin LIKE "source_%"', + }, + ) + assert response.status_code == 200 + data = response.json() + assert "documents" in data + + documents = data["documents"] + assert len(documents) > 0, "Should find documents with origin matching 'source_%'" + + # Verify all results have origin starting with 'source_' + for doc in documents: + origin = doc["metadata"].get("origin", "") + assert origin.startswith("source_"), f"Expected origin starting with 'source_', got {origin}" + + # ========================================================================= + # Logical operator tests (AND, OR) + # ========================================================================= + + def test_logical_operator_AND(self, api_client, indexed_filter_partition): + """Test filtering with origin == 'source_B' AND file_number >= 4.""" + response = api_client.get( + f"/search/partition/{indexed_filter_partition}", + params={ + "text": self.COMMON_CONTENT, + "top_k": 10, + "filter": 'origin == "source_B" AND file_number >= 4', + }, + ) + assert response.status_code == 200 + data = response.json() + assert "documents" in data + + documents = data["documents"] + assert len(documents) > 0, "Should find documents matching both conditions" + + # Verify all results match both conditions + assert all( + doc["metadata"].get("origin") == "source_B" and doc["metadata"].get("file_number") >= 4 for doc in documents + ), "All documents should have origin='source_B' and file_number >= 4" + + # Only filter-file-4 should match (source_B and file_number=4) + file_ids = {doc["metadata"].get("file_id") for doc in documents} + assert file_ids == {"filter-file-4"}, f"Expected only filter-file-4, got {file_ids}" + + def test_logical_operator_OR(self, api_client, indexed_filter_partition): + """Test filtering with origin == 
'source_A' OR file_number == 6.""" + response = api_client.get( + f"/search/partition/{indexed_filter_partition}", + params={ + "text": self.COMMON_CONTENT, + "top_k": 10, + "filter": 'origin == "source_A" OR file_number == 6', + }, + ) + assert response.status_code == 200 + data = response.json() + assert "documents" in data + + documents = data["documents"] + assert len(documents) > 0, "Should find documents matching either condition" + + assert all( + doc["metadata"].get("origin") == "source_A" or doc["metadata"].get("file_number") == 6 for doc in documents + ), "All documents should have origin='source_A' or file_number=6" + + # Should get filter-file-1, filter-file-2 (source_A) and filter-file-6 (file_number=6) + file_ids = {doc["metadata"].get("file_id") for doc in documents} + expected_ids = {"filter-file-1", "filter-file-2", "filter-file-6"} + assert file_ids == expected_ids, f"Expected {expected_ids}, got {file_ids}" + + # ========================================================================= + # Filter params tests (templated filters) + # ========================================================================= + + def test_filter_with_filter_params(self, api_client, indexed_filter_partition): + """Test that filtering with or without filter params yields the same results when values are the same.""" + # First, search with filter params + response_with_params = api_client.get( + f"/search/partition/{indexed_filter_partition}", + params={ + "text": self.COMMON_CONTENT, + "top_k": 10, + "filter": "origin == {origin_value} AND file_number >= {file_number_value}", + "filter_params": json.dumps({"origin_value": "source_B", "file_number_value": 3}), + }, + ) + assert response_with_params.status_code == 200 + data_with_params = response_with_params.json() + assert "documents" in data_with_params + + # Then, search with hardcoded values (no filter params) + response_hardcoded = api_client.get( + f"/search/partition/{indexed_filter_partition}", + params={ + 
"text": self.COMMON_CONTENT, + "top_k": 10, + "filter": 'origin == "source_B" AND file_number >= 3', + }, + ) + assert response_hardcoded.status_code == 200 + data_hardcoded = response_hardcoded.json() + assert "documents" in data_hardcoded + + # Results should be the same + docs_with_params = data_with_params["documents"] + docs_hardcoded = data_hardcoded["documents"] + + # Compare sets of file_ids to avoid ordering issues + file_ids_with_params = {doc["metadata"].get("file_id") for doc in docs_with_params} + file_ids_hardcoded = {doc["metadata"].get("file_id") for doc in docs_hardcoded} + assert file_ids_with_params == file_ids_hardcoded, ( + f"Expected same results with or without filter params, got {file_ids_with_params} vs {file_ids_hardcoded}" + ) + + # ========================================================================= + # Temporal filtering tests (datetime field, ISO 8601) + # ========================================================================= + + def test_temporal_fields_present_in_metadata(self, api_client, indexed_filter_partition): + """Test temporal fields are present in the metadata.""" + response = api_client.get( + f"/search/partition/{indexed_filter_partition}", + params={ + "text": self.COMMON_CONTENT, + "top_k": 10, + }, + ) + assert response.status_code == 200 + data = response.json() + assert "documents" in data + + documents = data["documents"] + assert len(documents) > 0, "Should find at least one document" + + # Verify that each document has temporal fields in metadata + for doc in documents: + metadata = doc.get("metadata", {}) + for temp_field in ["created_at"]: + k = metadata.get(temp_field) + assert k is not None, ( + f"Document {doc['metadata'].get('file_id')} is missing temporal field '{temp_field}'" + ) + # Verify it's a valid ISO 8601 datetime string + try: + datetime.fromisoformat(k) + except ValueError: + assert False, ( + f"Document {doc['metadata'].get('file_id')} has invalid datetime format in field '{temp_field}': 
{k}" + ) + + def test_temporal_filter_with_iso_format(self, api_client, indexed_filter_partition): + """Test that temporal filtering on the created_at field works with ISO 8601 strings.""" + partition = indexed_filter_partition + + # --- before 2022: should return files 1 and 2 --- + resp = api_client.get( + f"/search/partition/{partition}", + params={ + "text": self.COMMON_CONTENT, + "top_k": 10, + "filter": 'created_at < ISO "2022-01-01T00:00:00+00:00"', + }, + ) + assert resp.status_code == 200 + data = resp.json() + assert "documents" in data + file_ids = {doc["metadata"].get("file_id") for doc in data["documents"]} + + assert file_ids == {"filter-file-1", "filter-file-2"}, f"Expected files before 2022, got {file_ids}" + + # --- after 2024-01-01: should return files 5 and 6 --- + resp = api_client.get( + f"/search/partition/{partition}", + params={ + "text": self.COMMON_CONTENT, + "top_k": 10, + "filter": 'created_at > ISO "2024-01-01T00:00:00+00:00"', + }, + ) + assert resp.status_code == 200 + data = resp.json() + assert "documents" in data + file_ids = {doc["metadata"].get("file_id") for doc in data["documents"]} + assert file_ids == {"filter-file-5", "filter-file-6"}, f"Expected files after 2024, got {file_ids}" + + # --- range [2022, 2024]: should return files 3 and 4 --- + resp = api_client.get( + f"/search/partition/{partition}", + params={ + "text": self.COMMON_CONTENT, + "top_k": 10, + "filter": 'created_at >= ISO "2022-01-01T00:00:00+00:00" AND created_at <= ISO "2024-01-01T00:00:00+00:00"', + }, + ) + assert resp.status_code == 200 + data = resp.json() + assert "documents" in data + file_ids = {doc["metadata"].get("file_id") for doc in data["documents"]} + assert file_ids == {"filter-file-3", "filter-file-4"}, ( + f"Expected filter-file-3 and filter-file-4 in range [2022, 2024], got {file_ids}" + ) diff --git a/uv.lock b/uv.lock index 62e2d9eb..6f3a9729 100644 --- a/uv.lock +++ b/uv.lock @@ -2044,20 +2044,6 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, ] -[[package]] -name = "milvus-lite" -version = "2.5.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "tqdm" }, -] -wheels = [ - { url = "https://files.pythonhosted.org/packages/a9/b2/acc5024c8e8b6a0b034670b8e8af306ebd633ede777dcbf557eac4785937/milvus_lite-2.5.1-py3-none-macosx_10_9_x86_64.whl", hash = "sha256:6b014453200ba977be37ba660cb2d021030375fa6a35bc53c2e1d92980a0c512", size = 27934713, upload-time = "2025-06-30T04:23:37.028Z" }, - { url = "https://files.pythonhosted.org/packages/9b/2e/746f5bb1d6facd1e73eb4af6dd5efda11125b0f29d7908a097485ca6cad9/milvus_lite-2.5.1-py3-none-macosx_11_0_arm64.whl", hash = "sha256:a2e031088bf308afe5f8567850412d618cfb05a65238ed1a6117f60decccc95a", size = 24421451, upload-time = "2025-06-30T04:23:51.747Z" }, - { url = "https://files.pythonhosted.org/packages/2e/cf/3d1fee5c16c7661cf53977067a34820f7269ed8ba99fe9cf35efc1700866/milvus_lite-2.5.1-py3-none-manylinux2014_aarch64.whl", hash = "sha256:a13277e9bacc6933dea172e42231f7e6135bd3bdb073dd2688ee180418abd8d9", size = 45337093, upload-time = "2025-06-30T04:24:06.706Z" }, - { url = "https://files.pythonhosted.org/packages/d3/82/41d9b80f09b82e066894d9b508af07b7b0fa325ce0322980674de49106a0/milvus_lite-2.5.1-py3-none-manylinux2014_x86_64.whl", hash = "sha256:25ce13f4b8d46876dd2b7ac8563d7d8306da7ff3999bb0d14b116b30f71d706c", size = 55263911, upload-time = "2025-06-30T04:24:19.434Z" }, -] - [[package]] name = "monotonic" version = "1.6" @@ -2568,7 +2554,7 @@ wheels = [ [[package]] name = "openrag" -version = "1.1.6" +version = "1.1.7" source = { editable = "." 
} dependencies = [ { name = "aiopath" }, @@ -2600,6 +2586,7 @@ dependencies = [ { name = "numba" }, { name = "openai" }, { name = "pip" }, + { name = "protobuf" }, { name = "psutil" }, { name = "psycopg2" }, { name = "pydub" }, @@ -2657,10 +2644,11 @@ requires-dist = [ { name = "numba", specifier = ">=0.61.2" }, { name = "openai", specifier = ">=1.64.0" }, { name = "pip", specifier = ">=25.0.1" }, + { name = "protobuf", specifier = ">=5.27,<6.0" }, { name = "psutil", specifier = ">=7.0.0" }, { name = "psycopg2", specifier = ">=2.9.10" }, { name = "pydub", specifier = ">=0.25.1" }, - { name = "pymilvus", specifier = ">=2.5.12" }, + { name = "pymilvus", specifier = ">=2.6.9" }, { name = "pymupdf4llm", specifier = ">=0.0.17" }, { name = "pytest-env", specifier = ">=1.1.5" }, { name = "python-dotenv", specifier = ">=1.0.1" }, @@ -3613,16 +3601,16 @@ wheels = [ [[package]] name = "protobuf" -version = "6.31.1" +version = "5.29.6" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/52/f3/b9655a711b32c19720253f6f06326faf90580834e2e83f840472d752bc8b/protobuf-6.31.1.tar.gz", hash = "sha256:d8cac4c982f0b957a4dc73a80e2ea24fab08e679c0de9deb835f4a12d69aca9a", size = 441797, upload-time = "2025-05-28T19:25:54.947Z" } +sdist = { url = "https://files.pythonhosted.org/packages/7e/57/394a763c103e0edf87f0938dafcd918d53b4c011dfc5c8ae80f3b0452dbb/protobuf-5.29.6.tar.gz", hash = "sha256:da9ee6a5424b6b30fd5e45c5ea663aef540ca95f9ad99d1e887e819cdf9b8723", size = 425623, upload-time = "2026-02-04T22:54:40.584Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/f3/6f/6ab8e4bf962fd5570d3deaa2d5c38f0a363f57b4501047b5ebeb83ab1125/protobuf-6.31.1-cp310-abi3-win32.whl", hash = "sha256:7fa17d5a29c2e04b7d90e5e32388b8bfd0e7107cd8e616feef7ed3fa6bdab5c9", size = 423603, upload-time = "2025-05-28T19:25:41.198Z" }, - { url = 
"https://files.pythonhosted.org/packages/44/3a/b15c4347dd4bf3a1b0ee882f384623e2063bb5cf9fa9d57990a4f7df2fb6/protobuf-6.31.1-cp310-abi3-win_amd64.whl", hash = "sha256:426f59d2964864a1a366254fa703b8632dcec0790d8862d30034d8245e1cd447", size = 435283, upload-time = "2025-05-28T19:25:44.275Z" }, - { url = "https://files.pythonhosted.org/packages/6a/c9/b9689a2a250264a84e66c46d8862ba788ee7a641cdca39bccf64f59284b7/protobuf-6.31.1-cp39-abi3-macosx_10_9_universal2.whl", hash = "sha256:6f1227473dc43d44ed644425268eb7c2e488ae245d51c6866d19fe158e207402", size = 425604, upload-time = "2025-05-28T19:25:45.702Z" }, - { url = "https://files.pythonhosted.org/packages/76/a1/7a5a94032c83375e4fe7e7f56e3976ea6ac90c5e85fac8576409e25c39c3/protobuf-6.31.1-cp39-abi3-manylinux2014_aarch64.whl", hash = "sha256:a40fc12b84c154884d7d4c4ebd675d5b3b5283e155f324049ae396b95ddebc39", size = 322115, upload-time = "2025-05-28T19:25:47.128Z" }, - { url = "https://files.pythonhosted.org/packages/fa/b1/b59d405d64d31999244643d88c45c8241c58f17cc887e73bcb90602327f8/protobuf-6.31.1-cp39-abi3-manylinux2014_x86_64.whl", hash = "sha256:4ee898bf66f7a8b0bd21bce523814e6fbd8c6add948045ce958b73af7e8878c6", size = 321070, upload-time = "2025-05-28T19:25:50.036Z" }, - { url = "https://files.pythonhosted.org/packages/f7/af/ab3c51ab7507a7325e98ffe691d9495ee3d3aa5f589afad65ec920d39821/protobuf-6.31.1-py3-none-any.whl", hash = "sha256:720a6c7e6b77288b85063569baae8536671b39f15cc22037ec7045658d80489e", size = 168724, upload-time = "2025-05-28T19:25:53.926Z" }, + { url = "https://files.pythonhosted.org/packages/d4/88/9ee58ff7863c479d6f8346686d4636dd4c415b0cbeed7a6a7d0617639c2a/protobuf-5.29.6-cp310-abi3-win32.whl", hash = "sha256:62e8a3114992c7c647bce37dcc93647575fc52d50e48de30c6fcb28a6a291eb1", size = 423357, upload-time = "2026-02-04T22:54:25.805Z" }, + { url = "https://files.pythonhosted.org/packages/1c/66/2dc736a4d576847134fb6d80bd995c569b13cdc7b815d669050bf0ce2d2c/protobuf-5.29.6-cp310-abi3-win_amd64.whl", hash = 
"sha256:7e6ad413275be172f67fdee0f43484b6de5a904cc1c3ea9804cb6fe2ff366eda", size = 435175, upload-time = "2026-02-04T22:54:28.592Z" }, + { url = "https://files.pythonhosted.org/packages/06/db/49b05966fd208ae3f44dcd33837b6243b4915c57561d730a43f881f24dea/protobuf-5.29.6-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:b5a169e664b4057183a34bdc424540e86eea47560f3c123a0d64de4e137f9269", size = 418619, upload-time = "2026-02-04T22:54:30.266Z" }, + { url = "https://files.pythonhosted.org/packages/b7/d7/48cbf6b0c3c39761e47a99cb483405f0fde2be22cf00d71ef316ce52b458/protobuf-5.29.6-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:a8866b2cff111f0f863c1b3b9e7572dc7eaea23a7fae27f6fc613304046483e6", size = 320284, upload-time = "2026-02-04T22:54:31.782Z" }, + { url = "https://files.pythonhosted.org/packages/e3/dd/cadd6ec43069247d91f6345fa7a0d2858bef6af366dbd7ba8f05d2c77d3b/protobuf-5.29.6-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:e3387f44798ac1106af0233c04fb8abf543772ff241169946f698b3a9a3d3ab9", size = 320478, upload-time = "2026-02-04T22:54:32.909Z" }, + { url = "https://files.pythonhosted.org/packages/5a/cb/e3065b447186cb70aa65acc70c86baf482d82bf75625bf5a2c4f6919c6a3/protobuf-5.29.6-py3-none-any.whl", hash = "sha256:6b9edb641441b2da9fa8f428760fc136a49cf97a52076010cf22a2ff73438a86", size = 173126, upload-time = "2026-02-04T22:54:39.462Z" }, ] [[package]] @@ -3831,20 +3819,20 @@ sdist = { url = "https://files.pythonhosted.org/packages/5d/ab/34ec41718af73c001 [[package]] name = "pymilvus" -version = "2.5.12" +version = "2.6.9" source = { registry = "https://pypi.org/simple" } dependencies = [ + { name = "cachetools" }, { name = "grpcio" }, - { name = "milvus-lite", marker = "sys_platform != 'win32'" }, + { name = "orjson" }, { name = "pandas" }, { name = "protobuf" }, { name = "python-dotenv" }, { name = "setuptools" }, - { name = "ujson" }, ] -sdist = { url = 
"https://files.pythonhosted.org/packages/fa/53/4af820a37163225a76656222ee43a0eb8f1bd2ceec063315680a585435da/pymilvus-2.5.12.tar.gz", hash = "sha256:79ec7dc0616c2484f77abe98bca8deafb613645b5703c492b51961afd4f985d8", size = 1265893, upload-time = "2025-07-02T15:34:00.385Z" } +sdist = { url = "https://files.pythonhosted.org/packages/94/0c/92adff800a04cd3e9b3f17c06fa972c8d590846b1e0bac0ccf39e054b596/pymilvus-2.6.9.tar.gz", hash = "sha256:c53a3d84ff15814e251be13edda70a98a1c8a6090d7597a908387cbb94a9504a", size = 1493560, upload-time = "2026-02-10T11:01:27.415Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/68/4f/80a4940f2772d10272c3292444af767a5aa1a5bbb631874568713ca01d54/pymilvus-2.5.12-py3-none-any.whl", hash = "sha256:ef77a4a0076469a30b05f0bb23b5a058acfbdca83d82af9574ca651764017f44", size = 231425, upload-time = "2025-07-02T15:33:58.938Z" }, + { url = "https://files.pythonhosted.org/packages/6a/56/ab7f0a5aba6fc06dc210a059d6f6d2ee1f3371d40e2b4366a409576554b8/pymilvus-2.6.9-py3-none-any.whl", hash = "sha256:3e14e8072f6429dcd79d52a24dc021c594cb80841ddd76cb974bc539d1f4cdda", size = 301225, upload-time = "2026-02-10T11:01:25.796Z" }, ] [[package]] @@ -5218,34 +5206,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5c/23/c7abc0ca0a1526a0774eca151daeb8de62ec457e77262b66b359c3c7679e/tzdata-2025.2-py2.py3-none-any.whl", hash = "sha256:1a403fada01ff9221ca8044d701868fa132215d84beb92242d9acd2147f667a8", size = 347839, upload-time = "2025-03-23T13:54:41.845Z" }, ] -[[package]] -name = "ujson" -version = "5.10.0" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/f0/00/3110fd566786bfa542adb7932d62035e0c0ef662a8ff6544b6643b3d6fd7/ujson-5.10.0.tar.gz", hash = "sha256:b3cd8f3c5d8c7738257f1018880444f7b7d9b66232c64649f562d7ba86ad4bc1", size = 7154885, upload-time = "2024-05-14T02:02:34.233Z" } -wheels = [ - { url = 
"https://files.pythonhosted.org/packages/e8/a6/fd3f8bbd80842267e2d06c3583279555e8354c5986c952385199d57a5b6c/ujson-5.10.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:98ba15d8cbc481ce55695beee9f063189dce91a4b08bc1d03e7f0152cd4bbdd5", size = 55642, upload-time = "2024-05-14T02:01:04.055Z" }, - { url = "https://files.pythonhosted.org/packages/a8/47/dd03fd2b5ae727e16d5d18919b383959c6d269c7b948a380fdd879518640/ujson-5.10.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a9d2edbf1556e4f56e50fab7d8ff993dbad7f54bac68eacdd27a8f55f433578e", size = 51807, upload-time = "2024-05-14T02:01:05.25Z" }, - { url = "https://files.pythonhosted.org/packages/25/23/079a4cc6fd7e2655a473ed9e776ddbb7144e27f04e8fc484a0fb45fe6f71/ujson-5.10.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6627029ae4f52d0e1a2451768c2c37c0c814ffc04f796eb36244cf16b8e57043", size = 51972, upload-time = "2024-05-14T02:01:06.458Z" }, - { url = "https://files.pythonhosted.org/packages/04/81/668707e5f2177791869b624be4c06fb2473bf97ee33296b18d1cf3092af7/ujson-5.10.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f8ccb77b3e40b151e20519c6ae6d89bfe3f4c14e8e210d910287f778368bb3d1", size = 53686, upload-time = "2024-05-14T02:01:07.618Z" }, - { url = "https://files.pythonhosted.org/packages/bd/50/056d518a386d80aaf4505ccf3cee1c40d312a46901ed494d5711dd939bc3/ujson-5.10.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f3caf9cd64abfeb11a3b661329085c5e167abbe15256b3b68cb5d914ba7396f3", size = 58591, upload-time = "2024-05-14T02:01:08.901Z" }, - { url = "https://files.pythonhosted.org/packages/fc/d6/aeaf3e2d6fb1f4cfb6bf25f454d60490ed8146ddc0600fae44bfe7eb5a72/ujson-5.10.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:6e32abdce572e3a8c3d02c886c704a38a1b015a1fb858004e03d20ca7cecbb21", size = 997853, upload-time = "2024-05-14T02:01:10.772Z" }, - { url = 
"https://files.pythonhosted.org/packages/f8/d5/1f2a5d2699f447f7d990334ca96e90065ea7f99b142ce96e85f26d7e78e2/ujson-5.10.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:a65b6af4d903103ee7b6f4f5b85f1bfd0c90ba4eeac6421aae436c9988aa64a2", size = 1140689, upload-time = "2024-05-14T02:01:12.214Z" }, - { url = "https://files.pythonhosted.org/packages/f2/2c/6990f4ccb41ed93744aaaa3786394bca0875503f97690622f3cafc0adfde/ujson-5.10.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:604a046d966457b6cdcacc5aa2ec5314f0e8c42bae52842c1e6fa02ea4bda42e", size = 1043576, upload-time = "2024-05-14T02:01:14.39Z" }, - { url = "https://files.pythonhosted.org/packages/14/f5/a2368463dbb09fbdbf6a696062d0c0f62e4ae6fa65f38f829611da2e8fdd/ujson-5.10.0-cp312-cp312-win32.whl", hash = "sha256:6dea1c8b4fc921bf78a8ff00bbd2bfe166345f5536c510671bccececb187c80e", size = 38764, upload-time = "2024-05-14T02:01:15.83Z" }, - { url = "https://files.pythonhosted.org/packages/59/2d/691f741ffd72b6c84438a93749ac57bf1a3f217ac4b0ea4fd0e96119e118/ujson-5.10.0-cp312-cp312-win_amd64.whl", hash = "sha256:38665e7d8290188b1e0d57d584eb8110951a9591363316dd41cf8686ab1d0abc", size = 42211, upload-time = "2024-05-14T02:01:17.567Z" }, - { url = "https://files.pythonhosted.org/packages/0d/69/b3e3f924bb0e8820bb46671979770c5be6a7d51c77a66324cdb09f1acddb/ujson-5.10.0-cp313-cp313-macosx_10_9_x86_64.whl", hash = "sha256:618efd84dc1acbd6bff8eaa736bb6c074bfa8b8a98f55b61c38d4ca2c1f7f287", size = 55646, upload-time = "2024-05-14T02:01:19.26Z" }, - { url = "https://files.pythonhosted.org/packages/32/8a/9b748eb543c6cabc54ebeaa1f28035b1bd09c0800235b08e85990734c41e/ujson-5.10.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:38d5d36b4aedfe81dfe251f76c0467399d575d1395a1755de391e58985ab1c2e", size = 51806, upload-time = "2024-05-14T02:01:20.593Z" }, - { url = 
"https://files.pythonhosted.org/packages/39/50/4b53ea234413b710a18b305f465b328e306ba9592e13a791a6a6b378869b/ujson-5.10.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:67079b1f9fb29ed9a2914acf4ef6c02844b3153913eb735d4bf287ee1db6e557", size = 51975, upload-time = "2024-05-14T02:01:21.904Z" }, - { url = "https://files.pythonhosted.org/packages/b4/9d/8061934f960cdb6dd55f0b3ceeff207fcc48c64f58b43403777ad5623d9e/ujson-5.10.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d7d0e0ceeb8fe2468c70ec0c37b439dd554e2aa539a8a56365fd761edb418988", size = 53693, upload-time = "2024-05-14T02:01:23.742Z" }, - { url = "https://files.pythonhosted.org/packages/f5/be/7bfa84b28519ddbb67efc8410765ca7da55e6b93aba84d97764cd5794dbc/ujson-5.10.0-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:59e02cd37bc7c44d587a0ba45347cc815fb7a5fe48de16bf05caa5f7d0d2e816", size = 58594, upload-time = "2024-05-14T02:01:25.554Z" }, - { url = "https://files.pythonhosted.org/packages/48/eb/85d465abafb2c69d9699cfa5520e6e96561db787d36c677370e066c7e2e7/ujson-5.10.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:2a890b706b64e0065f02577bf6d8ca3b66c11a5e81fb75d757233a38c07a1f20", size = 997853, upload-time = "2024-05-14T02:01:27.151Z" }, - { url = "https://files.pythonhosted.org/packages/9f/76/2a63409fc05d34dd7d929357b7a45e3a2c96f22b4225cd74becd2ba6c4cb/ujson-5.10.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:621e34b4632c740ecb491efc7f1fcb4f74b48ddb55e65221995e74e2d00bbff0", size = 1140694, upload-time = "2024-05-14T02:01:29.113Z" }, - { url = "https://files.pythonhosted.org/packages/45/ed/582c4daba0f3e1688d923b5cb914ada1f9defa702df38a1916c899f7c4d1/ujson-5.10.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:b9500e61fce0cfc86168b248104e954fead61f9be213087153d272e817ec7b4f", size = 1043580, upload-time = "2024-05-14T02:01:31.447Z" }, - { url = 
"https://files.pythonhosted.org/packages/d7/0c/9837fece153051e19c7bade9f88f9b409e026b9525927824cdf16293b43b/ujson-5.10.0-cp313-cp313-win32.whl", hash = "sha256:4c4fc16f11ac1612f05b6f5781b384716719547e142cfd67b65d035bd85af165", size = 38766, upload-time = "2024-05-14T02:01:32.856Z" }, - { url = "https://files.pythonhosted.org/packages/d7/72/6cb6728e2738c05bbe9bd522d6fc79f86b9a28402f38663e85a28fddd4a0/ujson-5.10.0-cp313-cp313-win_amd64.whl", hash = "sha256:4573fd1695932d4f619928fd09d5d03d917274381649ade4328091ceca175539", size = 42212, upload-time = "2024-05-14T02:01:33.97Z" }, -] - [[package]] name = "umap-learn" version = "0.5.9.post2" diff --git a/vdb/milvus.yaml b/vdb/milvus.yaml index 6a9f37e0..8e34d7e9 100644 --- a/vdb/milvus.yaml +++ b/vdb/milvus.yaml @@ -1,6 +1,6 @@ services: etcd: - image: quay.io/coreos/etcd:v3.5.16 + image: quay.io/coreos/etcd:v3.5.25 environment: - ETCD_AUTO_COMPACTION_MODE=revision - ETCD_AUTO_COMPACTION_RETENTION=1000 @@ -8,7 +8,7 @@ services: - ETCD_SNAPSHOT_COUNT=50000 volumes: - ${MILVUS_VOLUME_DIRECTORY:-./volumes}/etcd:/etcd - command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + command: etcd -advertise-client-urls=http://etcd:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd healthcheck: test: ["CMD", "etcdctl", "endpoint", "health"] interval: 30s @@ -16,7 +16,7 @@ services: retries: 3 minio: - image: minio/minio:RELEASE.2023-03-20T20-16-18Z + image: minio/minio:RELEASE.2024-12-18T13-15-44Z environment: MINIO_ACCESS_KEY: minioadmin MINIO_SECRET_KEY: minioadmin @@ -30,7 +30,7 @@ services: retries: 3 milvus: - image: milvusdb/milvus:v2.5.4 + image: milvusdb/milvus:v2.6.11 command: ["milvus", "run", "standalone"] security_opt: - seccomp:unconfined