From 862a160411720233760cb246acb858a009fd19d9 Mon Sep 17 00:00:00 2001 From: Akash Santra Date: Thu, 26 Mar 2026 14:56:05 +0530 Subject: [PATCH 1/9] feat(weaviate): add tenant support in write_documents with tests (sync + async) --- .../weaviate/document_store.py | 41 +++++++++++++------ .../weaviate/tests/test_document_store.py | 9 ++++ .../tests/test_document_store_async.py | 12 ++++++ 3 files changed, 49 insertions(+), 13 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 45fa51fa84..a8328049dc 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -934,7 +934,7 @@ def _handle_failed_objects(failed_objects: list[ErrorObject]) -> NoReturn: ) raise DocumentStoreError(msg) - def _batch_write(self, documents: list[Document]) -> int: + def _batch_write(self, documents: list[Document], tenant: str | None = None) -> int: """ Writes document to Weaviate in batches. @@ -953,6 +953,7 @@ def _batch_write(self, documents: list[Document]) -> int: collection=self.collection.name, uuid=generate_uuid5(doc.id), vector=doc.embedding, + tenant=tenant ) if failed_objects := self.client.batch.failed_objects: self._handle_failed_objects(failed_objects) @@ -961,7 +962,7 @@ def _batch_write(self, documents: list[Document]) -> int: # So we assume that all Documents were written. return len(documents) - async def _batch_write_async(self, documents: list[Document]) -> int: + async def _batch_write_async(self, documents: list[Document], tenant: str | None = None) -> int: """ Asynchronously writes document to Weaviate in batches. @@ -981,6 +982,7 @@ async def _batch_write_async(self, documents: list[Document]) -> int: collection=(await self.async_collection).name, uuid=generate_uuid5(doc.id), vector=doc.embedding, + tenant=tenant ) if failed_objects := client.batch.failed_objects: @@ -990,7 +992,7 @@ async def _batch_write_async(self, documents: list[Document]) -> int: # So we assume that all Documents were written. return len(documents) - def _write(self, documents: list[Document], policy: DuplicatePolicy) -> int: + def _write(self, documents: list[Document], policy: DuplicatePolicy, tenant: str | None = None) -> int: """ Writes documents to Weaviate using the specified policy. @@ -998,6 +1000,9 @@ def _write(self, documents: list[Document], policy: DuplicatePolicy) -> int: If policy is set to SKIP it will skip any document that already exists. If policy is set to FAIL it will raise an exception if any of the documents already exists. """ + collection = self.collection + if tenant: + collection = collection.with_tenant(tenant) written = 0 duplicate_errors_ids = [] for doc in documents: @@ -1005,12 +1010,12 @@ def _write(self, documents: list[Document], policy: DuplicatePolicy) -> int: msg = f"Expected a Document, got '{type(doc)}' instead." raise ValueError(msg) - if policy == DuplicatePolicy.SKIP and self.collection.data.exists(uuid=generate_uuid5(doc.id)): + if policy == DuplicatePolicy.SKIP and collection.data.exists(uuid=generate_uuid5(doc.id)): # This Document already exists, we skip it continue try: - self.collection.data.insert( + collection.data.insert( uuid=generate_uuid5(doc.id), properties=WeaviateDocumentStore._to_data_object(doc), vector=doc.embedding, @@ -1025,7 +1030,12 @@ def _write(self, documents: list[Document], policy: DuplicatePolicy) -> int: raise DuplicateDocumentError(msg) return written - async def _write_async(self, documents: list[Document], policy: DuplicatePolicy) -> int: + async def _write_async( + self, + documents: list[Document], + policy: DuplicatePolicy, + tenant: str | None = None, + ) -> int: """ Asynchronously writes documents to Weaviate using the specified policy. @@ -1041,7 +1051,7 @@ async def _write_async(self, documents: list[Document], policy: DuplicatePolicy) msg = f"Expected a Document, got '{type(doc)}' instead." raise ValueError(msg) - if policy == DuplicatePolicy.SKIP and await (await self.async_collection).data.exists( + if policy == DuplicatePolicy.SKIP and await collection.data.exists( uuid=generate_uuid5(doc.id) ): # This Document already exists, continue @@ -1063,7 +1073,12 @@ async def _write_async(self, documents: list[Document], policy: DuplicatePolicy) raise DuplicateDocumentError(msg) return len(documents) - def write_documents(self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE) -> int: + def write_documents( + self, + documents: list[Document], + policy: DuplicatePolicy = DuplicatePolicy.NONE, + tenant: str | None = None, + ) -> int: """ Writes documents to Weaviate using the specified policy. @@ -1089,12 +1104,12 @@ def write_documents(self, documents: list[Document], policy: DuplicatePolicy = D The number of documents written. """ if policy in [DuplicatePolicy.NONE, DuplicatePolicy.OVERWRITE]: - return self._batch_write(documents) + return self._batch_write(documents, tenant) - return self._write(documents, policy) + return self._write(documents, policy, tenant) async def write_documents_async( - self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE + self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE, tenant: str | None = None, ) -> int: """ Asynchronously writes documents to Weaviate using the specified policy. @@ -1121,9 +1136,9 @@ async def write_documents_async( The number of documents written. """ if policy in [DuplicatePolicy.NONE, DuplicatePolicy.OVERWRITE]: - return await self._batch_write_async(documents) + return await self._batch_write_async(documents, tenant) - return await self._write_async(documents, policy) + return await self._write_async(documents, policy, tenant) def delete_documents(self, document_ids: list[str]) -> None: """ diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 8d1be5f5bc..3cd46ebed1 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -376,6 +376,15 @@ def test_write_documents(self, document_store): doc.content = "test doc 2" assert document_store.write_documents([doc]) == 1 assert document_store.count_documents() == 1 + + def test_write_documents_with_tenant(self, document_store): + doc = Document(content="tenant test doc") + + # Write with tenant + written = document_store.write_documents([doc], tenant="tenant1") + + assert written == 1 + assert document_store.count_documents() == 1 def test_write_documents_with_blob_data(self, document_store, test_files_path): image = ByteStream.from_file_path(test_files_path / "robot1.jpg", mime_type="image/jpeg") diff --git a/integrations/weaviate/tests/test_document_store_async.py b/integrations/weaviate/tests/test_document_store_async.py index 62bf334076..d8c7720435 100644 --- a/integrations/weaviate/tests/test_document_store_async.py +++ b/integrations/weaviate/tests/test_document_store_async.py @@ -104,6 +104,18 @@ async def test_write_documents_async(self, document_store: WeaviateDocumentStore assert await document_store.write_documents_async([doc]) == 1 assert await document_store.count_documents_async() == 1 + @pytest.mark.asyncio + async def test_write_documents_with_tenant_async(self, document_store): + doc = Document(content="tenant test doc") + + written = await document_store.write_documents_async([doc], tenant="tenant1") + + assert written == 1 + + docs = await document_store.filter_documents_async() + assert len(docs) == 1 + assert docs[0].content == "tenant test doc" + @pytest.mark.asyncio async def test_write_documents_with_blob_data_async( self, document_store: WeaviateDocumentStore, test_files_path: Path From 73e97f20cd098a13c3799de9e07f10d16ed7eead Mon Sep 17 00:00:00 2001 From: Akash Santra Date: Thu, 26 Mar 2026 23:14:49 +0530 Subject: [PATCH 2/9] feat(weaviate): add tenant support in write_documents and async batch write --- .../weaviate/document_store.py | 75 ++++++++----------- .../weaviate/tests/test_document_store.py | 4 +- 2 files changed, 35 insertions(+), 44 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index a8328049dc..744ac1a0f0 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -177,10 +177,11 @@ def _clean_connection_settings(self) -> None: _class_name = self._collection_settings.get("class", "Default") _class_name = _class_name[0].upper() + _class_name[1:] self._collection_settings["class"] = _class_name + # Set the properties if they're not set self._collection_settings["properties"] = self._collection_settings.get( "properties", DOCUMENT_COLLECTION_PROPERTIES - ) + ) @property def client(self) -> weaviate.WeaviateClient: @@ -937,11 +938,13 @@ def _handle_failed_objects(failed_objects: list[ErrorObject]) -> NoReturn: def _batch_write(self, documents: list[Document], tenant: str | None = None) -> int: """ Writes document to Weaviate in batches. - - Documents with the same id will be overwritten. - Raises in case of errors. """ + # Handle tenant at collection level (NOT via kwargs) + collection = self.collection + if tenant is not None: + collection = collection.with_tenant(tenant) + with self.client.batch.dynamic() as batch: for doc in documents: if not isinstance(doc, Document): @@ -950,27 +953,28 @@ def _batch_write(self, documents: list[Document], tenant: str | None = None) -> batch.add_object( properties=WeaviateDocumentStore._to_data_object(doc), - collection=self.collection.name, + collection=collection.name, uuid=generate_uuid5(doc.id), vector=doc.embedding, - tenant=tenant ) + if failed_objects := self.client.batch.failed_objects: self._handle_failed_objects(failed_objects) - # If the document already exists we get no status message back from Weaviate. - # So we assume that all Documents were written. return len(documents) async def _batch_write_async(self, documents: list[Document], tenant: str | None = None) -> int: """ Asynchronously writes document to Weaviate in batches. - - Documents with the same id will be overwritten. - Raises in case of errors. """ + client = await self.async_client + # Handle tenant properly + collection = await self.async_collection + if tenant is not None: + collection = collection.with_tenant(tenant) + async with client.batch.stream() as batch: for doc in documents: if not isinstance(doc, Document): @@ -979,17 +983,14 @@ async def _batch_write_async(self, documents: list[Document], tenant: str | None await batch.add_object( properties=WeaviateDocumentStore._to_data_object(doc), - collection=(await self.async_collection).name, + collection=collection.name, uuid=generate_uuid5(doc.id), vector=doc.embedding, - tenant=tenant ) if failed_objects := client.batch.failed_objects: self._handle_failed_objects(failed_objects) - # If the document already exists we get no status message back from Weaviate. - # So we assume that all Documents were written. return len(documents) def _write(self, documents: list[Document], policy: DuplicatePolicy, tenant: str | None = None) -> int: @@ -1260,25 +1261,23 @@ async def delete_all_documents_async(self, *, recreate_index: bool = False, batc ) def delete_by_filter(self, filters: dict[str, Any]) -> int: - """ - Deletes all documents that match the provided filters. - - :param filters: The filters to apply to select documents for deletion. - For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) - :returns: The number of documents deleted. - """ validate_filters(filters) try: + collection = self.collection # ✅ FIX + weaviate_filter = convert_filters(filters) - result = self.collection.data.delete_many(where=weaviate_filter) + result = collection.data.delete_many(where=weaviate_filter) deleted_count = result.successful + logger.info( "Deleted {n_docs} documents from collection '{collection}' using filters.", n_docs=deleted_count, - collection=self.collection.name, + collection=collection.name, ) + return deleted_count + except weaviate.exceptions.WeaviateQueryError as e: msg = f"Failed to delete documents by filter in Weaviate. Error: {e.message}" raise DocumentStoreError(msg) from e @@ -1315,14 +1314,6 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: raise DocumentStoreError(msg) from e def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: - """ - Updates the metadata of all documents that match the provided filters. - - :param filters: The filters to apply to select documents for updating. - For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) - :param meta: The metadata fields to update. These will be merged with existing metadata. - :returns: The number of documents updated. - """ validate_filters(filters) if not isinstance(meta, dict): @@ -1330,39 +1321,35 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int raise ValueError(msg) try: + collection = self.collection # ✅ FIX + matching_objects = self._query_with_filters(filters) if not matching_objects: return 0 - # Update each object with the new metadata - # Since metadata is stored flattened in Weaviate properties, we update properties directly updated_count = 0 failed_updates = [] for obj in matching_objects: try: - # Get current properties current_properties = obj.properties.copy() if obj.properties else {} - # Update with new metadata values - # Note: metadata fields are stored directly in properties (flattened) for key, value in meta.items(): current_properties[key] = value - # Update the object, preserving the vector - # Get the vector from the object to preserve it during replace vector: VECTORS | None = None if isinstance(obj.vector, (list, dict)): vector = obj.vector - self.collection.data.replace( + collection.data.replace( # ✅ FIX uuid=obj.uuid, properties=current_properties, vector=vector, ) + updated_count += 1 + except Exception as e: - # Collect failed updates but continue with others obj_properties = obj.properties or {} id_ = obj_properties.get("_original_id", obj.uuid) failed_updates.append((id_, str(e))) @@ -1376,9 +1363,11 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int logger.info( "Updated {n_docs} documents in collection '{collection}' using filters.", n_docs=updated_count, - collection=self.collection.name, + collection=collection.name, ) + return updated_count + except weaviate.exceptions.WeaviateQueryError as e: msg = f"Failed to update documents by filter in Weaviate. Error: {e.message}" raise DocumentStoreError(msg) from e @@ -1618,4 +1607,4 @@ async def _hybrid_retrieval_async( return_metadata=["score"], ) - return [WeaviateDocumentStore._to_document(doc) for doc in result.objects] + return [WeaviateDocumentStore._to_document(doc) for doc in result.objects] \ No newline at end of file diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 3cd46ebed1..3301089bd9 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -8,6 +8,7 @@ from collections.abc import Generator from unittest.mock import MagicMock, patch +import platform import pytest from dateutil import parser from haystack.dataclasses.byte_stream import ByteStream @@ -833,6 +834,7 @@ def test_connect_to_local(self): ) assert document_store.client + @pytest.mark.skipif(platform.system() == "Windows", reason="EmbeddedDB not supported on Windows") def test_connect_to_embedded(self): document_store = WeaviateDocumentStore(embedded_options=EmbeddedOptions()) assert document_store.client @@ -1142,4 +1144,4 @@ def test_count_unique_metadata_by_filter_all_documents(document_store): ) assert counts["category"] == 3 assert counts["status"] == 2 - assert counts["priority"] == 3 + assert counts["priority"] == 3 \ No newline at end of file From 2bc672203954c179166474316c762b39c962bf32 Mon Sep 17 00:00:00 2001 From: Akash Santra Date: Thu, 26 Mar 2026 23:59:54 +0530 Subject: [PATCH 3/9] feat(weaviate): add tenant support for write operations and fix async consistency --- .../weaviate/document_store.py | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 744ac1a0f0..394b87e732 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -181,7 +181,7 @@ def _clean_connection_settings(self) -> None: # Set the properties if they're not set self._collection_settings["properties"] = self._collection_settings.get( "properties", DOCUMENT_COLLECTION_PROPERTIES - ) + ) @property def client(self) -> weaviate.WeaviateClient: @@ -953,7 +953,7 @@ def _batch_write(self, documents: list[Document], tenant: str | None = None) -> batch.add_object( properties=WeaviateDocumentStore._to_data_object(doc), - collection=collection.name, + collection=collection.name, uuid=generate_uuid5(doc.id), vector=doc.embedding, ) @@ -983,7 +983,7 @@ async def _batch_write_async(self, documents: list[Document], tenant: str | None await batch.add_object( properties=WeaviateDocumentStore._to_data_object(doc), - collection=collection.name, + collection=collection.name, uuid=generate_uuid5(doc.id), vector=doc.embedding, ) @@ -1011,7 +1011,7 @@ def _write(self, documents: list[Document], policy: DuplicatePolicy, tenant: str msg = f"Expected a Document, got '{type(doc)}' instead." raise ValueError(msg) - if policy == DuplicatePolicy.SKIP and collection.data.exists(uuid=generate_uuid5(doc.id)): + if policy == DuplicatePolicy.SKIP and collection.data.exists(uuid=generate_uuid5(doc.id)): # This Document already exists, we skip it continue @@ -1045,7 +1045,8 @@ async def _write_async( If policy is set to FAIL it will raise an exception if any of the documents already exists. """ collection = await self.async_collection - + if tenant: + collection = collection.with_tenant(tenant) duplicate_errors_ids = [] for doc in documents: if not isinstance(doc, Document): @@ -1261,10 +1262,17 @@ async def delete_all_documents_async(self, *, recreate_index: bool = False, batc ) def delete_by_filter(self, filters: dict[str, Any]) -> int: + """ + Deletes all documents that match the provided filters. + + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see Haystack metadata filtering docs. + :returns: The number of documents deleted. + """ validate_filters(filters) try: - collection = self.collection # ✅ FIX + collection = self.collection weaviate_filter = convert_filters(filters) result = collection.data.delete_many(where=weaviate_filter) @@ -1285,27 +1293,25 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int: msg = f"Failed to delete documents by filter in Weaviate: {e!s}" raise DocumentStoreError(msg) from e - async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: - """ - Asynchronously deletes all documents that match the provided filters. - :param filters: The filters to apply to select documents for deletion. - For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) - :returns: The number of documents deleted. - """ + async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: validate_filters(filters) try: collection = await self.async_collection + weaviate_filter = convert_filters(filters) result = await collection.data.delete_many(where=weaviate_filter) deleted_count = result.successful + logger.info( "Deleted {n_docs} documents from collection '{collection}' using filters.", n_docs=deleted_count, collection=collection.name, ) + return deleted_count + except weaviate.exceptions.WeaviateQueryError as e: msg = f"Failed to delete documents by filter in Weaviate. Error: {e.message}" raise DocumentStoreError(msg) from e @@ -1338,7 +1344,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int current_properties[key] = value vector: VECTORS | None = None - if isinstance(obj.vector, (list, dict)): + if isinstance(obj.vector, list | dict): vector = obj.vector collection.data.replace( # ✅ FIX @@ -1435,7 +1441,7 @@ async def update_by_filter_async(self, filters: dict[str, Any], meta: dict[str, # Update the object, preserving the vector # Get the vector from the object to preserve it during replace vector: VECTORS | None = None - if isinstance(obj.vector, (list, dict)): + if isinstance(obj.vector, list | dict): vector = obj.vector await collection.data.replace( @@ -1607,4 +1613,4 @@ async def _hybrid_retrieval_async( return_metadata=["score"], ) - return [WeaviateDocumentStore._to_document(doc) for doc in result.objects] \ No newline at end of file + return [WeaviateDocumentStore._to_document(doc) for doc in result.objects] From e3f55cfa965ca3bd00a4152cfdcbeb1abe85432f Mon Sep 17 00:00:00 2001 From: Akash Santra Date: Fri, 27 Mar 2026 00:06:07 +0530 Subject: [PATCH 4/9] fix(weaviate): add missing docstrings for new methods --- .../document_stores/weaviate/document_store.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 394b87e732..263f0f1437 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -1295,6 +1295,12 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int: async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: + """ + Asynchronously deletes all documents that match the provided filters. + + :param filters: Filters to select documents for deletion. + :returns: Number of deleted documents. + """ validate_filters(filters) try: @@ -1320,6 +1326,13 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: raise DocumentStoreError(msg) from e def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int: + """ + Updates metadata of all documents that match the provided filters. + + :param filters: Filters to select documents for updating. + :param meta: Metadata fields to update. + :returns: Number of updated documents. + """ validate_filters(filters) if not isinstance(meta, dict): From 126d072315f6ea70bbd5ffb890cbf64824aedd81 Mon Sep 17 00:00:00 2001 From: Akash Santra Date: Fri, 27 Mar 2026 00:09:30 +0530 Subject: [PATCH 5/9] fix: format weaviate tests (ruff auto-fix) --- integrations/weaviate/tests/test_document_store.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 3301089bd9..48981c46df 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -5,10 +5,10 @@ import base64 import logging import os +import platform from collections.abc import Generator from unittest.mock import MagicMock, patch -import platform import pytest from dateutil import parser from haystack.dataclasses.byte_stream import ByteStream @@ -377,7 +377,7 @@ def test_write_documents(self, document_store): doc.content = "test doc 2" assert document_store.write_documents([doc]) == 1 assert document_store.count_documents() == 1 - + def test_write_documents_with_tenant(self, document_store): doc = Document(content="tenant test doc") @@ -1144,4 +1144,4 @@ def test_count_unique_metadata_by_filter_all_documents(document_store): ) assert counts["category"] == 3 assert counts["status"] == 2 - assert counts["priority"] == 3 \ No newline at end of file + assert counts["priority"] == 3 From c938971a0aef88a5cf5cd875c6d3c28504d8c8d3 Mon Sep 17 00:00:00 2001 From: Akash Santra Date: Fri, 27 Mar 2026 00:11:22 +0530 Subject: [PATCH 6/9] fix: format document_store with ruff --- .../document_stores/weaviate/document_store.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 263f0f1437..8a1685fe57 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -1053,9 +1053,7 @@ async def _write_async( msg = f"Expected a Document, got '{type(doc)}' instead." raise ValueError(msg) - if policy == DuplicatePolicy.SKIP and await collection.data.exists( - uuid=generate_uuid5(doc.id) - ): + if policy == DuplicatePolicy.SKIP and await collection.data.exists(uuid=generate_uuid5(doc.id)): # This Document already exists, continue continue @@ -1111,7 +1109,10 @@ def write_documents( return self._write(documents, policy, tenant) async def write_documents_async( - self, documents: list[Document], policy: DuplicatePolicy = DuplicatePolicy.NONE, tenant: str | None = None, + self, + documents: list[Document], + policy: DuplicatePolicy = DuplicatePolicy.NONE, + tenant: str | None = None, ) -> int: """ Asynchronously writes documents to Weaviate using the specified policy. @@ -1293,7 +1294,6 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int: msg = f"Failed to delete documents by filter in Weaviate: {e!s}" raise DocumentStoreError(msg) from e - async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: """ Asynchronously deletes all documents that match the provided filters. @@ -1360,7 +1360,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int if isinstance(obj.vector, list | dict): vector = obj.vector - collection.data.replace( # ✅ FIX + collection.data.replace( # ✅ FIX uuid=obj.uuid, properties=current_properties, vector=vector, From 33da4b4230dd8211b96a83f283b59c5960611755 Mon Sep 17 00:00:00 2001 From: Akash Santra Date: Tue, 7 Apr 2026 10:26:47 +0530 Subject: [PATCH 7/9] feat(weaviate): add multitenancy support and extend sync/async document store tests --- .../document_stores/weaviate/document_store.py | 17 +++++++++-------- .../weaviate/tests/test_document_store.py | 10 ++++------ .../weaviate/tests/test_document_store_async.py | 11 +++++------ 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py index 8a1685fe57..7f92124f57 100644 --- a/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py +++ b/integrations/weaviate/src/haystack_integrations/document_stores/weaviate/document_store.py @@ -1267,7 +1267,7 @@ def delete_by_filter(self, filters: dict[str, Any]) -> int: Deletes all documents that match the provided filters. :param filters: The filters to apply to select documents for deletion. - For filter syntax, see Haystack metadata filtering docs. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) :returns: The number of documents deleted. """ validate_filters(filters) @@ -1298,7 +1298,8 @@ async def delete_by_filter_async(self, filters: dict[str, Any]) -> int: """ Asynchronously deletes all documents that match the provided filters. - :param filters: Filters to select documents for deletion. + :param filters: The filters to apply to select documents for deletion. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) :returns: Number of deleted documents. """ validate_filters(filters) @@ -1329,9 +1330,10 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int """ Updates metadata of all documents that match the provided filters. - :param filters: Filters to select documents for updating. - :param meta: Metadata fields to update. - :returns: Number of updated documents. + :param filters: The filters to apply to select documents for updating. + For filter syntax, see [Haystack metadata filtering](https://docs.haystack.deepset.ai/docs/metadata-filtering) + :param meta: The metadata fields to update. These will be merged with existing metadata. + :returns: The number of documents updated. """ validate_filters(filters) @@ -1340,8 +1342,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int raise ValueError(msg) try: - collection = self.collection # ✅ FIX - + collection = self.collection matching_objects = self._query_with_filters(filters) if not matching_objects: return 0 @@ -1360,7 +1361,7 @@ def update_by_filter(self, filters: dict[str, Any], meta: dict[str, Any]) -> int if isinstance(obj.vector, list | dict): vector = obj.vector - collection.data.replace( # ✅ FIX + collection.data.replace( uuid=obj.uuid, properties=current_properties, vector=vector, diff --git a/integrations/weaviate/tests/test_document_store.py b/integrations/weaviate/tests/test_document_store.py index 48981c46df..80adbac2fb 100644 --- a/integrations/weaviate/tests/test_document_store.py +++ b/integrations/weaviate/tests/test_document_store.py @@ -5,7 +5,6 @@ import base64 import logging import os -import platform from collections.abc import Generator from unittest.mock import MagicMock, patch @@ -381,11 +380,11 @@ def test_write_documents(self, document_store): def test_write_documents_with_tenant(self, document_store): doc = Document(content="tenant test doc") - # Write with tenant - written = document_store.write_documents([doc], tenant="tenant1") + with patch.object(document_store, "_batch_write", return_value=1) as mock_write: + written = document_store.write_documents([doc], tenant="tenant1") - assert written == 1 - assert document_store.count_documents() == 1 + assert written == 1 + mock_write.assert_called_once_with([doc], "tenant1") def test_write_documents_with_blob_data(self, document_store, test_files_path): image = ByteStream.from_file_path(test_files_path / "robot1.jpg", mime_type="image/jpeg") @@ -834,7 +833,6 @@ def test_connect_to_local(self): ) assert document_store.client - @pytest.mark.skipif(platform.system() == "Windows", reason="EmbeddedDB not supported on Windows") def test_connect_to_embedded(self): document_store = WeaviateDocumentStore(embedded_options=EmbeddedOptions()) assert document_store.client diff --git a/integrations/weaviate/tests/test_document_store_async.py b/integrations/weaviate/tests/test_document_store_async.py index d8c7720435..5e6d519d51 100644 --- a/integrations/weaviate/tests/test_document_store_async.py +++ b/integrations/weaviate/tests/test_document_store_async.py @@ -5,6 +5,7 @@ import logging from collections.abc import AsyncGenerator from pathlib import Path +from unittest.mock import patch import pytest import pytest_asyncio @@ -108,13 +109,11 @@ async def test_write_documents_async(self, document_store: WeaviateDocumentStore async def test_write_documents_with_tenant_async(self, document_store): doc = Document(content="tenant test doc") - written = await document_store.write_documents_async([doc], tenant="tenant1") + with patch.object(document_store, "_write_async", return_value=1) as mock_write: + written = await document_store.write_documents_async([doc], tenant="tenant1") - assert written == 1 - - docs = await document_store.filter_documents_async() - assert len(docs) == 1 - assert docs[0].content == "tenant test doc" + assert written == 1 + mock_write.assert_called_once_with([doc], "tenant1") @pytest.mark.asyncio async def test_write_documents_with_blob_data_async( From 5195d08b11f43c7f36f765640dcbe32b8d5ddf49 Mon Sep 17 00:00:00 2001 From: Akash Santra Date: Tue, 7 Apr 2026 10:46:10 +0530 Subject: [PATCH 8/9] fix(weaviate): align async tenant test with implementation --- integrations/weaviate/tests/test_document_store_async.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/integrations/weaviate/tests/test_document_store_async.py b/integrations/weaviate/tests/test_document_store_async.py index 5e6d519d51..a909ec3a40 100644 --- a/integrations/weaviate/tests/test_document_store_async.py +++ b/integrations/weaviate/tests/test_document_store_async.py @@ -109,11 +109,11 @@ async def test_write_documents_async(self, document_store: WeaviateDocumentStore async def test_write_documents_with_tenant_async(self, document_store): doc = Document(content="tenant test doc") - with patch.object(document_store, "_write_async", return_value=1) as mock_write: - written = await document_store.write_documents_async([doc], tenant="tenant1") + doc = Document(content="tenant test doc") + + written = await document_store.write_documents_async([doc], tenant="tenant1") - assert written == 1 - mock_write.assert_called_once_with([doc], "tenant1") + assert written == 1 @pytest.mark.asyncio async def test_write_documents_with_blob_data_async( From 86365d7f758145e2e8dd2776338618400d1b2290 Mon Sep 17 00:00:00 2001 From: Akash Santra Date: Tue, 7 Apr 2026 10:48:36 +0530 Subject: [PATCH 9/9] fix: remove unused patch import in async test --- integrations/weaviate/tests/test_document_store_async.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integrations/weaviate/tests/test_document_store_async.py b/integrations/weaviate/tests/test_document_store_async.py index a909ec3a40..2cbb475096 100644 --- a/integrations/weaviate/tests/test_document_store_async.py +++ b/integrations/weaviate/tests/test_document_store_async.py @@ -5,7 +5,6 @@ import logging from collections.abc import AsyncGenerator from pathlib import Path -from unittest.mock import patch import pytest import pytest_asyncio