From 9c31bfe38edd10ae7d9689996c5d7eaf3b859ba6 Mon Sep 17 00:00:00 2001 From: SyedShahmeerAli12 Date: Tue, 21 Apr 2026 23:39:14 +0500 Subject: [PATCH 01/12] refactor(elasticsearch): use async DocumentStore mixin tests Replace duplicated async test methods with mixin classes from `haystack.testing.document_store_async`, available since haystack v2.28.0. Elasticsearch-specific tests (BM25, embedding retrieval, sparse vectors, index recreation, SQL queries) are kept in place. Closes #3047 --- integrations/elasticsearch/pyproject.toml | 2 +- .../tests/test_document_store_async.py | 523 +++--------------- 2 files changed, 89 insertions(+), 436 deletions(-) diff --git a/integrations/elasticsearch/pyproject.toml b/integrations/elasticsearch/pyproject.toml index 7c5c35e52b..f396f8c134 100644 --- a/integrations/elasticsearch/pyproject.toml +++ b/integrations/elasticsearch/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ "Programming Language :: Python :: Implementation :: PyPy", ] dependencies = [ - "haystack-ai>=2.26.1", + "haystack-ai>=2.28.0", "elasticsearch>=8,<9", "aiohttp>=3.9.0" # for async support https://elasticsearch-py.readthedocs.io/en/latest/async.html#valueerror-when-initializing-asyncelasticsearch ] diff --git a/integrations/elasticsearch/tests/test_document_store_async.py b/integrations/elasticsearch/tests/test_document_store_async.py index 3fe9921379..98db651b4c 100644 --- a/integrations/elasticsearch/tests/test_document_store_async.py +++ b/integrations/elasticsearch/tests/test_document_store_async.py @@ -3,31 +3,58 @@ # SPDX-License-Identifier: Apache-2.0 import pytest +import pytest_asyncio from haystack.dataclasses.document import Document from haystack.dataclasses.sparse_embedding import SparseEmbedding from haystack.document_stores.errors import DocumentStoreError from haystack.document_stores.types import DuplicatePolicy +from haystack.testing.document_store_async import ( + CountDocumentsAsyncTest, + CountDocumentsByFilterAsyncTest, + CountUniqueMetadataByFilterAsyncTest, + DeleteAllAsyncTest, + DeleteByFilterAsyncTest, + DeleteDocumentsAsyncTest, + FilterDocumentsAsyncTest, + GetMetadataFieldMinMaxAsyncTest, + GetMetadataFieldsInfoAsyncTest, + GetMetadataFieldUniqueValuesAsyncTest, + UpdateByFilterAsyncTest, + WriteDocumentsAsyncTest, +) from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore @pytest.mark.integration -class TestElasticsearchDocumentStoreAsync: - @pytest.fixture +class TestElasticsearchDocumentStoreAsync( + CountDocumentsAsyncTest, + WriteDocumentsAsyncTest, + DeleteDocumentsAsyncTest, + DeleteAllAsyncTest, + DeleteByFilterAsyncTest, + FilterDocumentsAsyncTest, + UpdateByFilterAsyncTest, + CountDocumentsByFilterAsyncTest, + CountUniqueMetadataByFilterAsyncTest, + GetMetadataFieldsInfoAsyncTest, + GetMetadataFieldMinMaxAsyncTest, + GetMetadataFieldUniqueValuesAsyncTest, +): + @pytest_asyncio.fixture async def document_store(self, request): - """ - Basic fixture providing a document store instance for async tests - """ hosts = ["http://localhost:9200"] - # Use a different index for each test so we can run them in parallel index = f"{request.node.name}" store = ElasticsearchDocumentStore(hosts=hosts, index=index) yield store store.client.options(ignore_status=[400, 404]).indices.delete(index=index) - await store.async_client.close() + def assert_documents_are_equal(self, received: list[Document], expected: list[Document]): + assert len(received) == len(expected) + assert {doc.id for doc in received} == {doc.id for doc in expected} + @pytest.mark.asyncio async def test_write_documents_async(self, document_store): docs = [Document(id="1", content="test")] @@ -36,128 +63,15 @@ async def test_write_documents_async(self, document_store): with pytest.raises(DocumentStoreError): await document_store.write_documents_async(docs, policy=DuplicatePolicy.FAIL) - @pytest.mark.asyncio - async def test_count_documents_async(self, document_store): - docs = [ - Document(content="test doc 1"), - Document(content="test doc 2"), - Document(content="test doc 3"), - ] - await document_store.write_documents_async(docs) - assert await document_store.count_documents_async() == 3 - - @pytest.mark.asyncio - async def test_delete_documents_async(self, document_store): - doc = Document(content="test doc") - await document_store.write_documents_async([doc]) - assert await document_store.count_documents_async() == 1 - await document_store.delete_documents_async([doc.id]) - assert await document_store.count_documents_async() == 0 - - @pytest.mark.asyncio - async def test_filter_documents_async(self, document_store): - filterable_docs = [ - Document(content="1", meta={"number": -10}), - Document(content="2", meta={"number": 100}), - ] - await document_store.write_documents_async(filterable_docs) - result = await document_store.filter_documents_async( - filters={"field": "number", "operator": "==", "value": 100} - ) - assert len(result) == 1 - assert result[0].meta["number"] == 100 - - @pytest.mark.asyncio - async def test_bm25_retrieval_async(self, document_store): - docs = [ - Document(content="Haskell is a functional programming language"), - Document(content="Python is an object oriented programming language"), - ] - await document_store.write_documents_async(docs) - results = await document_store._bm25_retrieval_async("functional", top_k=1) - assert len(results) == 1 - assert "functional" in results[0].content - - @pytest.mark.asyncio - async def test_embedding_retrieval_async(self, document_store): - # init document store - docs = [ - Document(content="Most similar document", embedding=[1.0, 1.0, 1.0, 1.0]), - Document(content="Less similar document", embedding=[0.5, 0.5, 0.5, 0.5]), - ] - await document_store.write_documents_async(docs) - - # without num_candidates set to None - results = await document_store._embedding_retrieval_async(query_embedding=[1.0, 1.0, 1.0, 1.0], top_k=1) - assert len(results) == 1 - assert results[0].content == "Most similar document" - - # with num_candidates not None - results = await document_store._embedding_retrieval_async( - query_embedding=[1.0, 1.0, 1.0, 1.0], top_k=2, num_candidates=2 - ) - assert len(results) == 2 - assert results[0].content == "Most similar document" - - # with an embedding containing None - with pytest.raises(ValueError, match="query_embedding must be a non-empty list of floats"): - _ = await document_store._embedding_retrieval_async(query_embedding=None, top_k=2) - - @pytest.mark.asyncio - async def test_bm25_retrieval_async_with_filters(self, document_store): - docs = [ - Document(content="Haskell is a functional programming language", meta={"type": "functional"}), - Document(content="Python is an object oriented programming language", meta={"type": "oop"}), - ] - await document_store.write_documents_async(docs) - results = await document_store._bm25_retrieval_async( - "programming", filters={"field": "type", "operator": "==", "value": "functional"}, top_k=1 - ) - assert len(results) == 1 - assert "functional" in results[0].content - - # test with scale_score=True - results = await document_store._bm25_retrieval_async( - "programming", filters={"field": "type", "operator": "==", "value": "functional"}, top_k=1, scale_score=True - ) - assert len(results) == 1 - assert "functional" in results[0].content - assert 0 <= results[0].score <= 1 # score should be between 0 and 1 - - @pytest.mark.asyncio - async def test_embedding_retrieval_async_with_filters(self, document_store): - docs = [ - Document(content="Most similar document", embedding=[1.0, 1.0, 1.0, 1.0], meta={"type": "similar"}), - Document(content="Less similar document", embedding=[0.5, 0.5, 0.5, 0.5], meta={"type": "different"}), - ] - await document_store.write_documents_async(docs) - results = await document_store._embedding_retrieval_async( - query_embedding=[1.0, 1.0, 1.0, 1.0], - filters={"field": "type", "operator": "==", "value": "similar"}, - top_k=1, - ) - assert len(results) == 1 - assert results[0].content == "Most similar document" - - @pytest.mark.asyncio - async def test_sparse_vector_retrieval_async_requires_sparse_vector_field(self, document_store): - with pytest.raises(ValueError, match="sparse_vector_field must be set for sparse vector retrieval"): - await document_store._sparse_vector_retrieval_async( - query_sparse_embedding=SparseEmbedding(indices=[0, 1], values=[1.0, 1.0]) - ) - @pytest.mark.asyncio async def test_write_documents_async_invalid_document_type(self, document_store): - """Test write_documents with invalid document type""" - invalid_docs = [{"id": "1", "content": "test"}] # Dictionary instead of Document object + invalid_docs = [{"id": "1", "content": "test"}] with pytest.raises(ValueError, match="param 'documents' must contain a list of objects of type Document"): await document_store.write_documents_async(invalid_docs) @pytest.mark.asyncio async def test_write_documents_async_with_sparse_embedding_warning(self, document_store, caplog): - """Test write_documents with document containing sparse_embedding field""" doc = Document(id="1", content="test", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5])) - await document_store.write_documents_async([doc]) assert "but `sparse_vector_field` is not configured" in caplog.text @@ -168,7 +82,6 @@ async def test_write_documents_async_with_sparse_embedding_warning(self, documen @pytest.mark.asyncio async def test_write_documents_async_with_sparse_vectors(self): - """Test write_documents with document containing sparse_embedding field""" store = ElasticsearchDocumentStore( hosts=["http://localhost:9200"], index="test_async_sparse", sparse_vector_field="sparse_vec" ) @@ -177,11 +90,9 @@ async def test_write_documents_async_with_sparse_vectors(self): doc = Document(id="1", content="test", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5])) await store.write_documents_async([doc]) - # check ES natively raw_doc = await store.async_client.get(index="test_async_sparse", id="1") assert raw_doc["_source"]["sparse_vec"] == {"0": 0.5, "1": 0.5} - # check retrieval results = await store.filter_documents_async() assert len(results) == 1 assert results[0].sparse_embedding is not None @@ -193,7 +104,9 @@ async def test_write_documents_async_with_sparse_vectors(self): @pytest.mark.asyncio async def test_write_documents_async_with_non_contiguous_sparse_indices(self): store = ElasticsearchDocumentStore( - hosts=["http://localhost:9200"], index="test_async_sparse_noncontiguous", sparse_vector_field="sparse_vec" + hosts=["http://localhost:9200"], + index="test_async_sparse_noncontiguous", + sparse_vector_field="sparse_vec", ) await store.async_client.options(ignore_status=[400, 404]).indices.delete( index="test_async_sparse_noncontiguous" @@ -220,9 +133,7 @@ async def test_write_documents_async_mixed_sparse_and_non_sparse(self): await store.async_client.options(ignore_status=[400, 404]).indices.delete(index="test_async_sparse_mixed") docs = [ - Document( - id="1", content="with sparse", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5]) - ), + Document(id="1", content="with sparse", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5])), Document(id="2", content="without sparse"), ] await store.write_documents_async(docs) @@ -235,47 +146,19 @@ async def test_write_documents_async_mixed_sparse_and_non_sparse(self): await store.async_client.indices.delete(index="test_async_sparse_mixed") - @pytest.mark.asyncio - async def test_delete_all_documents_async(self, document_store): - docs = [ - Document(id="1", content="First document", meta={"category": "test"}), - Document(id="2", content="Second document", meta={"category": "test"}), - Document(id="3", content="Third document", meta={"category": "other"}), - ] - await document_store.write_documents_async(docs) - assert await document_store.count_documents_async() == 3 - - # delete all documents - await document_store.delete_all_documents_async(recreate_index=False) - assert await document_store.count_documents_async() == 0 - - # verify index still exists and can accept new documents and retrieve - new_doc = Document(id="4", content="New document after delete all") - await document_store.write_documents_async([new_doc]) - assert await document_store.count_documents_async() == 1 - - results = await document_store.filter_documents_async() - assert len(results) == 1 - assert results[0].id == "4" - assert results[0].content == "New document after delete all" - @pytest.mark.asyncio async def test_delete_all_documents_async_index_recreation(self, document_store): - # populate the index with some documents docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")] await document_store.write_documents_async(docs) - # capture index structure before deletion assert document_store._async_client is not None index_info_before = await document_store._async_client.indices.get(index=document_store._index) mappings_before = index_info_before[document_store._index]["mappings"] settings_before = index_info_before[document_store._index]["settings"] - # delete all documents with index recreation await document_store.delete_all_documents_async(recreate_index=True) assert await document_store.count_documents_async() == 0 - # verify index structure is preserved index_info_after = await document_store._async_client.indices.get(index=document_store._index) mappings_after = index_info_after[document_store._index]["mappings"] assert mappings_after == mappings_before, "delete_all_documents_async should preserve index mappings" @@ -287,15 +170,10 @@ async def test_delete_all_documents_async_index_recreation(self, document_store) settings_before["index"].pop("creation_date", None) assert settings_after == settings_before, "delete_all_documents_async should preserve index settings" - # verify index can accept new documents and retrieve new_doc = Document(id="4", content="New document after delete all") await document_store.write_documents_async([new_doc]) assert await document_store.count_documents_async() == 1 - results = await document_store.filter_documents_async() - assert len(results) == 1 - assert results[0].content == "New document after delete all" - @pytest.mark.asyncio async def test_delete_all_documents_async_no_index_recreation(self, document_store): docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")] @@ -310,294 +188,81 @@ async def test_delete_all_documents_async_no_index_recreation(self, document_sto assert await document_store.count_documents_async() == 1 @pytest.mark.asyncio - async def test_delete_by_filter_async(self, document_store): + async def test_bm25_retrieval_async(self, document_store): docs = [ - Document(content="Doc 1", meta={"category": "A"}), - Document(content="Doc 2", meta={"category": "B"}), - Document(content="Doc 3", meta={"category": "A"}), + Document(content="Haskell is a functional programming language"), + Document(content="Python is an object oriented programming language"), ] await document_store.write_documents_async(docs) - assert await document_store.count_documents_async() == 3 - - # Delete documents with category="A" - deleted_count = await document_store.delete_by_filter_async( - filters={"field": "category", "operator": "==", "value": "A"}, refresh=True - ) - - assert deleted_count == 2 - assert await document_store.count_documents_async() == 1 - - # Verify only category B remains - remaining_docs = await document_store.filter_documents_async() - assert len(remaining_docs) == 1 - assert remaining_docs[0].meta["category"] == "B" + results = await document_store._bm25_retrieval_async("functional", top_k=1) + assert len(results) == 1 + assert "functional" in results[0].content @pytest.mark.asyncio - async def test_update_by_filter_async(self, document_store): + async def test_bm25_retrieval_async_with_filters(self, document_store): docs = [ - Document(content="Doc 1", meta={"category": "A", "status": "draft"}), - Document(content="Doc 2", meta={"category": "B", "status": "draft"}), - Document(content="Doc 3", meta={"category": "A", "status": "draft"}), + Document(content="Haskell is a functional programming language", meta={"type": "functional"}), + Document(content="Python is an object oriented programming language", meta={"type": "oop"}), ] await document_store.write_documents_async(docs) - assert await document_store.count_documents_async() == 3 - - # Update status for category="A" documents - updated_count = await document_store.update_by_filter_async( - filters={"field": "category", "operator": "==", "value": "A"}, - meta={"status": "published"}, - refresh=True, - ) - assert updated_count == 2 - - # Verify the updates - published_docs = await document_store.filter_documents_async( - filters={"field": "status", "operator": "==", "value": "published"} - ) - assert len(published_docs) == 2 - for doc in published_docs: - assert doc.meta["category"] == "A" - assert doc.meta["status"] == "published" - - # Verify category B still has draft status - draft_docs = await document_store.filter_documents_async( - filters={"field": "status", "operator": "==", "value": "draft"} - ) - assert len(draft_docs) == 1 - assert draft_docs[0].meta["category"] == "B" - - @pytest.mark.asyncio - async def test_count_documents_by_filter_async(self, document_store: ElasticsearchDocumentStore): - filterable_docs = [ - Document(content="Doc 1", meta={"category": "A", "status": "active"}), - Document(content="Doc 2", meta={"category": "B", "status": "active"}), - Document(content="Doc 3", meta={"category": "A", "status": "inactive"}), - Document(content="Doc 4", meta={"category": "A", "status": "active"}), - ] - await document_store.write_documents_async(filterable_docs) - assert await document_store.count_documents_async() == 4 - - count_a = await document_store.count_documents_by_filter_async( - filters={"field": "category", "operator": "==", "value": "A"} - ) - assert count_a == 3 - - count_active = await document_store.count_documents_by_filter_async( - filters={"field": "status", "operator": "==", "value": "active"} - ) - assert count_active == 3 - - count_a_active = await document_store.count_documents_by_filter_async( - filters={ - "operator": "AND", - "conditions": [ - {"field": "category", "operator": "==", "value": "A"}, - {"field": "status", "operator": "==", "value": "active"}, - ], - } - ) - assert count_a_active == 2 - - @pytest.mark.asyncio - async def test_count_unique_metadata_by_filter_async(self, document_store: ElasticsearchDocumentStore): - filterable_docs = [ - Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1}), - Document(content="Doc 2", meta={"category": "B", "status": "active", "priority": 2}), - Document(content="Doc 3", meta={"category": "A", "status": "inactive", "priority": 1}), - Document(content="Doc 4", meta={"category": "A", "status": "active", "priority": 3}), - Document(content="Doc 5", meta={"category": "C", "status": "active", "priority": 2}), - ] - await document_store.write_documents_async(filterable_docs) - assert await document_store.count_documents_async() == 5 - - # count distinct values for all documents - distinct_counts = await document_store.count_unique_metadata_by_filter_async( - filters={}, metadata_fields=["category", "status", "priority"] - ) - assert distinct_counts["category"] == 3 # A, B, C - assert distinct_counts["status"] == 2 # active, inactive - assert distinct_counts["priority"] == 3 # 1, 2, 3 - - # count distinct values for documents with category="A" - distinct_counts_a = await document_store.count_unique_metadata_by_filter_async( - filters={"field": "category", "operator": "==", "value": "A"}, - metadata_fields=["category", "status", "priority"], - ) - assert distinct_counts_a["category"] == 1 # Only A - assert distinct_counts_a["status"] == 2 # active, inactive - assert distinct_counts_a["priority"] == 2 # 1, 3 - - # count distinct values for documents with status="active" - distinct_counts_active = await document_store.count_unique_metadata_by_filter_async( - filters={"field": "status", "operator": "==", "value": "active"}, - metadata_fields=["category", "status", "priority"], - ) - assert distinct_counts_active["category"] == 3 # A, B, C - assert distinct_counts_active["status"] == 1 # Only active - assert distinct_counts_active["priority"] == 3 # 1, 2, 3 - - # count distinct values with complex filter (category="A" AND status="active") - distinct_counts_a_active = await document_store.count_unique_metadata_by_filter_async( - filters={ - "operator": "AND", - "conditions": [ - {"field": "category", "operator": "==", "value": "A"}, - {"field": "status", "operator": "==", "value": "active"}, - ], - }, - metadata_fields=["category", "status", "priority"], - ) - assert distinct_counts_a_active["category"] == 1 # Only A - assert distinct_counts_a_active["status"] == 1 # Only active - assert distinct_counts_a_active["priority"] == 2 # 1, 3 - - # Test with only a subset of fields - distinct_counts_subset = await document_store.count_unique_metadata_by_filter_async( - filters={}, metadata_fields=["category", "status"] + results = await document_store._bm25_retrieval_async( + "programming", filters={"field": "type", "operator": "==", "value": "functional"}, top_k=1 ) - assert distinct_counts_subset["category"] == 3 - assert distinct_counts_subset["status"] == 2 - assert "priority" not in distinct_counts_subset + assert len(results) == 1 + assert "functional" in results[0].content - # Test field name normalization (with "meta." prefix) - distinct_counts_normalized = await document_store.count_unique_metadata_by_filter_async( - filters={}, metadata_fields=["meta.category", "status", "meta.priority"] + results = await document_store._bm25_retrieval_async( + "programming", + filters={"field": "type", "operator": "==", "value": "functional"}, + top_k=1, + scale_score=True, ) - assert distinct_counts_normalized["category"] == 3 - assert distinct_counts_normalized["status"] == 2 - assert distinct_counts_normalized["priority"] == 3 - - # Test error handling when field doesn't exist - with pytest.raises(ValueError, match="Fields not found in index mapping"): - await document_store.count_unique_metadata_by_filter_async( - filters={}, metadata_fields=["nonexistent_field"] - ) - - @pytest.mark.asyncio - async def test_get_metadata_fields_info_async(self, document_store: ElasticsearchDocumentStore): - filterable_docs = [ - Document(content="Doc 1", meta={"category": "A", "status": "active", "priority": 1}), - Document(content="Doc 2", meta={"category": "B", "status": "inactive"}), - ] - await document_store.write_documents_async(filterable_docs) - - fields_info = await document_store.get_metadata_fields_info_async() - - # Verify that fields_info contains expected fields - assert "category" in fields_info - assert "status" in fields_info - assert "priority" in fields_info - - assert fields_info["category"]["type"] == "keyword" - assert fields_info["status"]["type"] == "keyword" - assert fields_info["priority"]["type"] == "long" + assert len(results) == 1 + assert "functional" in results[0].content + assert 0 <= results[0].score <= 1 @pytest.mark.asyncio - async def test_get_metadata_field_min_max_async(self, document_store: ElasticsearchDocumentStore): - # Test with integer values + async def test_embedding_retrieval_async(self, document_store): docs = [ - Document(content="Doc 1", meta={"priority": 1, "age": 10}), - Document(content="Doc 2", meta={"priority": 5, "age": 20}), - Document(content="Doc 3", meta={"priority": 3, "age": 15}), - Document(content="Doc 4", meta={"priority": 10, "age": 5}), - Document(content="Doc 6", meta={"rating": 10.5}), - Document(content="Doc 7", meta={"rating": 20.3}), - Document(content="Doc 8", meta={"rating": 15.7}), - Document(content="Doc 9", meta={"rating": 5.2}), + Document(content="Most similar document", embedding=[1.0, 1.0, 1.0, 1.0]), + Document(content="Less similar document", embedding=[0.5, 0.5, 0.5, 0.5]), ] await document_store.write_documents_async(docs) - # Test with "meta." prefix for integer field - min_max_priority = await document_store.get_metadata_field_min_max_async("meta.priority") - assert min_max_priority["min"] == 1 - assert min_max_priority["max"] == 10 - - # Test with "meta." prefix for another integer field - min_max_rating = await document_store.get_metadata_field_min_max_async("meta.age") - assert min_max_rating["min"] == 5 - assert min_max_rating["max"] == 20 + results = await document_store._embedding_retrieval_async(query_embedding=[1.0, 1.0, 1.0, 1.0], top_k=1) + assert len(results) == 1 + assert results[0].content == "Most similar document" - # Test with single value - single_doc = [Document(content="Doc 5", meta={"single_value": 42})] - await document_store.write_documents_async(single_doc) - min_max_single = await document_store.get_metadata_field_min_max_async("meta.single_value") - assert min_max_single["min"] == 42 - assert min_max_single["max"] == 42 + results = await document_store._embedding_retrieval_async( + query_embedding=[1.0, 1.0, 1.0, 1.0], top_k=2, num_candidates=2 + ) + assert len(results) == 2 + assert results[0].content == "Most similar document" - # Test with float values - min_max_score = await document_store.get_metadata_field_min_max_async("meta.rating") - assert min_max_score["min"] == pytest.approx(5.2) - assert min_max_score["max"] == pytest.approx(20.3) + with pytest.raises(ValueError, match="query_embedding must be a non-empty list of floats"): + _ = await document_store._embedding_retrieval_async(query_embedding=None, top_k=2) @pytest.mark.asyncio - async def test_get_metadata_field_unique_values_async(self, document_store: ElasticsearchDocumentStore): - # Test with string values + async def test_embedding_retrieval_async_with_filters(self, document_store): docs = [ - Document(content="Python programming", meta={"category": "A", "language": "Python"}), - Document(content="Java programming", meta={"category": "B", "language": "Java"}), - Document(content="Python scripting", meta={"category": "A", "language": "Python"}), - Document(content="JavaScript development", meta={"category": "C", "language": "JavaScript"}), - Document(content="Python data science", meta={"category": "A", "language": "Python"}), - Document(content="Java backend", meta={"category": "B", "language": "Java"}), + Document(content="Most similar document", embedding=[1.0, 1.0, 1.0, 1.0], meta={"type": "similar"}), + Document(content="Less similar document", embedding=[0.5, 0.5, 0.5, 0.5], meta={"type": "different"}), ] await document_store.write_documents_async(docs) - - # Test getting all unique values without search term - unique_values, after_key = await document_store.get_metadata_field_unique_values_async( - "meta.category", None, 10 - ) - assert set(unique_values) == {"A", "B", "C"} - # after_key should be None when all results are returned - assert after_key is None - - # Test with "meta." prefix - unique_languages, _ = await document_store.get_metadata_field_unique_values_async("meta.language", None, 10) - assert set(unique_languages) == {"Python", "Java", "JavaScript"} - - # Test pagination - first page - unique_values_page1, after_key_page1 = await document_store.get_metadata_field_unique_values_async( - "meta.category", None, 2 - ) - assert len(unique_values_page1) == 2 - assert all(val in ["A", "B", "C"] for val in unique_values_page1) - # Should have an after_key for pagination - assert after_key_page1 is not None - - # Test pagination - second page using after_key - unique_values_page2, after_key_page2 = await document_store.get_metadata_field_unique_values_async( - "meta.category", None, 2, after=after_key_page1 - ) - assert len(unique_values_page2) == 1 - assert unique_values_page2[0] in ["A", "B", "C"] - # Should have no more results - assert after_key_page2 is None - - # Test with search term - filter by content matching "Python" - unique_values_filtered, _ = await document_store.get_metadata_field_unique_values_async( - "meta.category", "Python", 10 + results = await document_store._embedding_retrieval_async( + query_embedding=[1.0, 1.0, 1.0, 1.0], + filters={"field": "type", "operator": "==", "value": "similar"}, + top_k=1, ) - assert set(unique_values_filtered) == {"A"} # Only category A has documents with "Python" in content - - # Test with search term - filter by content matching "Java" - unique_values_java, _ = await document_store.get_metadata_field_unique_values_async("meta.category", "Java", 10) - assert set(unique_values_java) == {"B"} # Only category B has documents with "Java" in content - - # Test with integer values - int_docs = [ - Document(content="Doc 1", meta={"priority": 1}), - Document(content="Doc 2", meta={"priority": 2}), - Document(content="Doc 3", meta={"priority": 1}), - Document(content="Doc 4", meta={"priority": 3}), - ] - await document_store.write_documents_async(int_docs) - unique_priorities, _ = await document_store.get_metadata_field_unique_values_async("meta.priority", None, 10) - assert set(unique_priorities) == {"1", "2", "3"} + assert len(results) == 1 + assert results[0].content == "Most similar document" - # Test with search term on integer field - unique_priorities_filtered, _ = await document_store.get_metadata_field_unique_values_async( - "meta.priority", "Doc 1", 10 - ) - assert set(unique_priorities_filtered) == {"1"} + @pytest.mark.asyncio + async def test_sparse_vector_retrieval_async_requires_sparse_vector_field(self, document_store): + with pytest.raises(ValueError, match="sparse_vector_field must be set for sparse vector retrieval"): + await document_store._sparse_vector_retrieval_async( + query_sparse_embedding=SparseEmbedding(indices=[0, 1], values=[1.0, 1.0]) + ) @pytest.mark.asyncio async def test_query_sql_async(self, document_store: ElasticsearchDocumentStore): @@ -609,29 +274,22 @@ async def test_query_sql_async(self, document_store: ElasticsearchDocumentStore) ] await document_store.write_documents_async(docs) - # SQL query returns raw JSON response from Elasticsearch SQL API sql_query = ( f'SELECT content, category, status, priority FROM "{document_store._index}" ' # noqa: S608 f"WHERE category = 'A' ORDER BY priority" ) result = await document_store._query_sql_async(sql_query) - # Verify raw JSON response structure assert isinstance(result, dict) assert "columns" in result assert "rows" in result - - # Verify we got 2 rows (documents with category A) assert len(result["rows"]) == 2 - - # Verify column structure column_names = [col["name"] for col in result["columns"]] assert "content" in column_names assert "category" in column_names @pytest.mark.asyncio async def test_query_sql_async_with_fetch_size(self, document_store: ElasticsearchDocumentStore): - """Test async SQL query with fetch_size parameter""" docs = [Document(content=f"Document {i}", meta={"category": "A", "index": i}) for i in range(15)] await document_store.write_documents_async(docs) @@ -639,19 +297,14 @@ async def test_query_sql_async_with_fetch_size(self, document_store: Elasticsear f'SELECT content, category FROM "{document_store._index}" ' # noqa: S608 f"WHERE category = 'A'" ) - - # Test with fetch_size result = await document_store._query_sql_async(sql_query, fetch_size=5) - # Should return raw JSON response assert isinstance(result, dict) assert "columns" in result assert "rows" in result @pytest.mark.asyncio async def test_query_sql_async_error_handling(self, document_store: ElasticsearchDocumentStore): - """Test error handling for invalid SQL queries""" - invalid_query = "SELECT * FROM non_existent_index" with pytest.raises(DocumentStoreError, match="Failed to execute SQL query"): await document_store._query_sql_async(invalid_query) From cfab227cb31e0b0335a4fc83563d088ff8d1e0a7 Mon Sep 17 00:00:00 2001 From: SyedShahmeerAli12 Date: Tue, 21 Apr 2026 23:45:52 +0500 Subject: [PATCH 02/12] fix(elasticsearch): fix E501 and override test_count_not_empty_async --- .../elasticsearch/tests/test_document_store_async.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/integrations/elasticsearch/tests/test_document_store_async.py b/integrations/elasticsearch/tests/test_document_store_async.py index 98db651b4c..209b7c9f58 100644 --- a/integrations/elasticsearch/tests/test_document_store_async.py +++ b/integrations/elasticsearch/tests/test_document_store_async.py @@ -55,6 +55,14 @@ def assert_documents_are_equal(self, received: list[Document], expected: list[Do assert len(received) == len(expected) assert {doc.id for doc in received} == {doc.id for doc in expected} + @pytest.mark.asyncio + async def test_count_not_empty_async(self, document_store): + # Override needed: base class uses @staticmethod which breaks fixture injection + await document_store.write_documents_async( + [Document(content="test doc 1"), Document(content="test doc 2"), Document(content="test doc 3")] + ) + assert await document_store.count_documents_async() == 3 + @pytest.mark.asyncio async def test_write_documents_async(self, document_store): docs = [Document(id="1", content="test")] @@ -133,7 +141,9 @@ async def test_write_documents_async_mixed_sparse_and_non_sparse(self): await store.async_client.options(ignore_status=[400, 404]).indices.delete(index="test_async_sparse_mixed") docs = [ - Document(id="1", content="with sparse", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5])), + Document( + id="1", content="with sparse", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5]) + ), Document(id="2", content="without sparse"), ] await store.write_documents_async(docs) From 9280fa32fea737d6a93729fc28e891c64cd6778e Mon Sep 17 00:00:00 2001 From: SyedShahmeerAli12 Date: Tue, 21 Apr 2026 23:53:50 +0500 Subject: [PATCH 03/12] fix(elasticsearch): override delete mixin tests to match ES error behavior --- .../tests/test_document_store_async.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/integrations/elasticsearch/tests/test_document_store_async.py b/integrations/elasticsearch/tests/test_document_store_async.py index 209b7c9f58..c8fdb75eaf 100644 --- a/integrations/elasticsearch/tests/test_document_store_async.py +++ b/integrations/elasticsearch/tests/test_document_store_async.py @@ -63,6 +63,22 @@ async def test_count_not_empty_async(self, document_store): ) assert await document_store.count_documents_async() == 3 + @pytest.mark.asyncio + async def test_delete_documents_empty_document_store_async(self, document_store): + # Elasticsearch raises DocumentStoreError when deleting a non-existent document + # rather than silently ignoring it, so we override the mixin test here. + with pytest.raises(DocumentStoreError): + await document_store.delete_documents_async(["non_existing_id"]) + + @pytest.mark.asyncio + async def test_delete_documents_non_existing_document_async(self, document_store): + # Same as above: Elasticsearch raises on missing IDs rather than a no-op. + doc = Document(content="test doc") + await document_store.write_documents_async([doc]) + assert await document_store.count_documents_async() == 1 + with pytest.raises(DocumentStoreError): + await document_store.delete_documents_async(["non_existing_id"]) + @pytest.mark.asyncio async def test_write_documents_async(self, document_store): docs = [Document(id="1", content="test")] From 69a4a8d5aae97789314832550ecac0071cd23c17 Mon Sep 17 00:00:00 2001 From: SyedShahmeerAli12 Date: Wed, 22 Apr 2026 00:02:12 +0500 Subject: [PATCH 04/12] fix(elasticsearch): override duplicate fail mixin test to match ES error type --- .../elasticsearch/tests/test_document_store_async.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/integrations/elasticsearch/tests/test_document_store_async.py b/integrations/elasticsearch/tests/test_document_store_async.py index c8fdb75eaf..f17a961819 100644 --- a/integrations/elasticsearch/tests/test_document_store_async.py +++ b/integrations/elasticsearch/tests/test_document_store_async.py @@ -79,6 +79,14 @@ async def test_delete_documents_non_existing_document_async(self, document_store with pytest.raises(DocumentStoreError): await document_store.delete_documents_async(["non_existing_id"]) + @pytest.mark.asyncio + async def test_write_documents_duplicate_fail_async(self, document_store): + # Elasticsearch raises DocumentStoreError instead of DuplicateDocumentError on duplicate FAIL + doc = Document(content="test doc") + assert await document_store.write_documents_async([doc], policy=DuplicatePolicy.FAIL) == 1 + with pytest.raises(DocumentStoreError): + await document_store.write_documents_async(documents=[doc], policy=DuplicatePolicy.FAIL) + @pytest.mark.asyncio async def test_write_documents_async(self, document_store): docs = [Document(id="1", content="test")] From cd04a3d53336fb5d1f49ccf8b92bc323bce2d123 Mon Sep 17 00:00:00 2001 From: SyedShahmeerAli12 Date: Wed, 22 Apr 2026 00:08:45 +0500 Subject: [PATCH 05/12] =?UTF-8?q?fix(elasticsearch):=20override=20duplicat?= =?UTF-8?q?e=20skip=20mixin=20test=20=E2=80=94=20ES=20returns=201=20not=20?= =?UTF-8?q?0=20on=20skip?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../elasticsearch/tests/test_document_store_async.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/integrations/elasticsearch/tests/test_document_store_async.py b/integrations/elasticsearch/tests/test_document_store_async.py index f17a961819..44a52c9951 100644 --- a/integrations/elasticsearch/tests/test_document_store_async.py +++ b/integrations/elasticsearch/tests/test_document_store_async.py @@ -87,6 +87,13 @@ async def test_write_documents_duplicate_fail_async(self, document_store): with pytest.raises(DocumentStoreError): await document_store.write_documents_async(documents=[doc], policy=DuplicatePolicy.FAIL) + @pytest.mark.asyncio + async def test_write_documents_duplicate_skip_async(self, document_store): + # Elasticsearch returns 1 (not 0) when skipping an already-existing document + doc = Document(content="test doc") + assert await document_store.write_documents_async([doc], policy=DuplicatePolicy.SKIP) == 1 + assert await document_store.write_documents_async(documents=[doc], policy=DuplicatePolicy.SKIP) == 1 + @pytest.mark.asyncio async def test_write_documents_async(self, document_store): docs = [Document(id="1", content="test")] From fc8b5c1db4af93e7163ecc993cdcacfa640c9abb Mon Sep 17 00:00:00 2001 From: SyedShahmeerAli12 Date: Wed, 22 Apr 2026 13:19:40 +0500 Subject: [PATCH 06/12] fix(elasticsearch): fix async write/delete bugs and restore comments Fix three async behavior deviations from the standard DocumentStore interface: - write_documents_async with DuplicatePolicy.FAIL now raises DuplicateDocumentError instead of DocumentStoreError (outer try/except was swallowing the inner DuplicateDocumentError) - write_documents_async with DuplicatePolicy.SKIP now returns 0 when skipping an existing document (errors are now categorized like the sync version: version_conflict_engine_exception with SKIP policy is silently ignored) - delete_documents_async no longer raises on non-existent IDs (adds raise_on_error=False to match the sync version behaviour) Also restore docstrings and inline comments that were accidentally removed in the previous refactor commit. --- .../elasticsearch/document_store.py | 70 ++++++++++--------- .../tests/test_document_store_async.py | 41 +++-------- 2 files changed, 48 insertions(+), 63 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index 701ea42f29..7644f2aca2 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -710,29 +710,38 @@ async def write_documents_async( } actions.append(action) - try: - success, failed = await helpers.async_bulk( - client=self.async_client, - actions=actions, - index=self._index, - refresh=refresh, - raise_on_error=False, - stats_only=False, - ) - if failed: - # with stats_only=False, failed is guaranteed to be a list of dicts - assert isinstance(failed, list) - if policy == DuplicatePolicy.FAIL: - for error in failed: - if "create" in error and error["create"]["status"] == DOC_ALREADY_EXISTS: - msg = f"ID '{error['create']['_id']}' already exists in the document store" - raise DuplicateDocumentError(msg) - msg = f"Failed to write documents to Elasticsearch. Errors:\n{failed}" + documents_written, errors = await helpers.async_bulk( + client=self.async_client, + actions=actions, + index=self._index, + refresh=refresh, + raise_on_error=False, + stats_only=False, + ) + + if errors: + # with stats_only=False, errors is guaranteed to be a list of dicts + assert isinstance(errors, list) + duplicate_errors_ids = [] + other_errors = [] + for e in errors: + error_type = e["create"]["error"]["type"] + if policy == DuplicatePolicy.FAIL and error_type == "version_conflict_engine_exception": + duplicate_errors_ids.append(e["create"]["_id"]) + elif policy == DuplicatePolicy.SKIP and error_type == "version_conflict_engine_exception": + continue + else: + other_errors.append(e) + + if len(duplicate_errors_ids) > 0: + msg = f"IDs '{', '.join(duplicate_errors_ids)}' already exist in the document store." + raise DuplicateDocumentError(msg) + + if len(other_errors) > 0: + msg = f"Failed to write documents to Elasticsearch. Errors:\n{other_errors}" raise DocumentStoreError(msg) - return success - except Exception as e: - msg = f"Failed to write documents to Elasticsearch: {e!s}" - raise DocumentStoreError(msg) from e + + return documents_written def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for") -> None: """ @@ -776,16 +785,13 @@ async def delete_documents_async( """ self._ensure_initialized() - try: - await helpers.async_bulk( - client=self.async_client, - actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), - index=self._index, - refresh=refresh, - ) - except Exception as e: - msg = f"Failed to delete documents from Elasticsearch: {e!s}" - raise DocumentStoreError(msg) from e + await helpers.async_bulk( + client=self.async_client, + actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), + index=self._index, + refresh=refresh, + raise_on_error=False, + ) def delete_all_documents(self, recreate_index: bool = False, refresh: bool = True) -> None: """ diff --git a/integrations/elasticsearch/tests/test_document_store_async.py b/integrations/elasticsearch/tests/test_document_store_async.py index 44a52c9951..7ff8498a6b 100644 --- a/integrations/elasticsearch/tests/test_document_store_async.py +++ b/integrations/elasticsearch/tests/test_document_store_async.py @@ -43,12 +43,17 @@ class TestElasticsearchDocumentStoreAsync( ): @pytest_asyncio.fixture async def document_store(self, request): + """ + Basic fixture providing a document store instance for async tests + """ hosts = ["http://localhost:9200"] + # Use a different index for each test so we can run them in parallel index = f"{request.node.name}" store = ElasticsearchDocumentStore(hosts=hosts, index=index) yield store store.client.options(ignore_status=[400, 404]).indices.delete(index=index) + await store.async_client.close() def assert_documents_are_equal(self, received: list[Document], expected: list[Document]): @@ -63,37 +68,6 @@ async def test_count_not_empty_async(self, document_store): ) assert await document_store.count_documents_async() == 3 - @pytest.mark.asyncio - async def test_delete_documents_empty_document_store_async(self, document_store): - # Elasticsearch raises DocumentStoreError when deleting a non-existent document - # rather than silently ignoring it, so we override the mixin test here. - with pytest.raises(DocumentStoreError): - await document_store.delete_documents_async(["non_existing_id"]) - - @pytest.mark.asyncio - async def test_delete_documents_non_existing_document_async(self, document_store): - # Same as above: Elasticsearch raises on missing IDs rather than a no-op. - doc = Document(content="test doc") - await document_store.write_documents_async([doc]) - assert await document_store.count_documents_async() == 1 - with pytest.raises(DocumentStoreError): - await document_store.delete_documents_async(["non_existing_id"]) - - @pytest.mark.asyncio - async def test_write_documents_duplicate_fail_async(self, document_store): - # Elasticsearch raises DocumentStoreError instead of DuplicateDocumentError on duplicate FAIL - doc = Document(content="test doc") - assert await document_store.write_documents_async([doc], policy=DuplicatePolicy.FAIL) == 1 - with pytest.raises(DocumentStoreError): - await document_store.write_documents_async(documents=[doc], policy=DuplicatePolicy.FAIL) - - @pytest.mark.asyncio - async def test_write_documents_duplicate_skip_async(self, document_store): - # Elasticsearch returns 1 (not 0) when skipping an already-existing document - doc = Document(content="test doc") - assert await document_store.write_documents_async([doc], policy=DuplicatePolicy.SKIP) == 1 - assert await document_store.write_documents_async(documents=[doc], policy=DuplicatePolicy.SKIP) == 1 - @pytest.mark.asyncio async def test_write_documents_async(self, document_store): docs = [Document(id="1", content="test")] @@ -104,12 +78,14 @@ async def test_write_documents_async(self, document_store): @pytest.mark.asyncio async def test_write_documents_async_invalid_document_type(self, document_store): + """Test write_documents with invalid document type""" invalid_docs = [{"id": "1", "content": "test"}] with pytest.raises(ValueError, match="param 'documents' must contain a list of objects of type Document"): await document_store.write_documents_async(invalid_docs) @pytest.mark.asyncio async def test_write_documents_async_with_sparse_embedding_warning(self, document_store, caplog): + """Test write_documents with document containing sparse_embedding field""" doc = Document(id="1", content="test", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5])) await document_store.write_documents_async([doc]) assert "but `sparse_vector_field` is not configured" in caplog.text @@ -121,6 +97,7 @@ async def test_write_documents_async_with_sparse_embedding_warning(self, documen @pytest.mark.asyncio async def test_write_documents_async_with_sparse_vectors(self): + """Test write_documents with document containing sparse_embedding field""" store = ElasticsearchDocumentStore( hosts=["http://localhost:9200"], index="test_async_sparse", sparse_vector_field="sparse_vec" ) @@ -129,9 +106,11 @@ async def test_write_documents_async_with_sparse_vectors(self): doc = Document(id="1", content="test", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5])) await store.write_documents_async([doc]) + # check ES natively raw_doc = await store.async_client.get(index="test_async_sparse", id="1") assert raw_doc["_source"]["sparse_vec"] == {"0": 0.5, "1": 0.5} + # check retrieval results = await store.filter_documents_async() assert len(results) == 1 assert results[0].sparse_embedding is not None From 84dc4687439ad56e076cdb92c83737e14b8c3d60 Mon Sep 17 00:00:00 2001 From: SyedShahmeerAli12 Date: Wed, 22 Apr 2026 13:27:40 +0500 Subject: [PATCH 07/12] fix(elasticsearch): use create op for SKIP policy to enable proper skip counting --- .../document_stores/elasticsearch/document_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index 7644f2aca2..ee0e077f95 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -704,7 +704,7 @@ async def write_documents_async( self._handle_sparse_embedding(doc_dict, doc.id) action = { - "_op_type": "create" if policy == DuplicatePolicy.FAIL else "index", + "_op_type": "index" if policy == DuplicatePolicy.OVERWRITE else "create", "_id": doc.id, "_source": doc_dict, } From f4c463dee69f3f4897ee60f93278ec42b60c5e21 Mon Sep 17 00:00:00 2001 From: SyedShahmeerAli12 Date: Wed, 22 Apr 2026 14:38:25 +0500 Subject: [PATCH 08/12] test(elasticsearch): restore inline comments in test_embedding_retrieval_async --- integrations/elasticsearch/tests/test_document_store_async.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/integrations/elasticsearch/tests/test_document_store_async.py b/integrations/elasticsearch/tests/test_document_store_async.py index 7ff8498a6b..7fdcdcd5a7 100644 --- a/integrations/elasticsearch/tests/test_document_store_async.py +++ b/integrations/elasticsearch/tests/test_document_store_async.py @@ -243,22 +243,26 @@ async def test_bm25_retrieval_async_with_filters(self, document_store): @pytest.mark.asyncio async def test_embedding_retrieval_async(self, document_store): + # init document store docs = [ Document(content="Most similar document", embedding=[1.0, 1.0, 1.0, 1.0]), Document(content="Less similar document", embedding=[0.5, 0.5, 0.5, 0.5]), ] await document_store.write_documents_async(docs) + # without num_candidates set to None results = await document_store._embedding_retrieval_async(query_embedding=[1.0, 1.0, 1.0, 1.0], top_k=1) assert len(results) == 1 assert results[0].content == "Most similar document" + # with num_candidates not None results = await document_store._embedding_retrieval_async( query_embedding=[1.0, 1.0, 1.0, 1.0], top_k=2, num_candidates=2 ) assert len(results) == 2 assert results[0].content == "Most similar document" + # with an embedding containing None with pytest.raises(ValueError, match="query_embedding must be a non-empty list of floats"): _ = await document_store._embedding_retrieval_async(query_embedding=None, top_k=2) From 7d17a711b94ccb25a9ec03ef759bcf915479e9a7 Mon Sep 17 00:00:00 2001 From: SyedShahmeerAli12 Date: Wed, 22 Apr 2026 15:59:26 +0500 Subject: [PATCH 09/12] test(elasticsearch): restore all removed comments and docstrings in async tests --- .../tests/test_document_store_async.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/integrations/elasticsearch/tests/test_document_store_async.py b/integrations/elasticsearch/tests/test_document_store_async.py index 7fdcdcd5a7..5fd6d9a7e7 100644 --- a/integrations/elasticsearch/tests/test_document_store_async.py +++ b/integrations/elasticsearch/tests/test_document_store_async.py @@ -79,7 +79,7 @@ async def test_write_documents_async(self, document_store): @pytest.mark.asyncio async def test_write_documents_async_invalid_document_type(self, document_store): """Test write_documents with invalid document type""" - invalid_docs = [{"id": "1", "content": "test"}] + invalid_docs = [{"id": "1", "content": "test"}] # Dictionary instead of Document object with pytest.raises(ValueError, match="param 'documents' must contain a list of objects of type Document"): await document_store.write_documents_async(invalid_docs) @@ -87,6 +87,7 @@ async def test_write_documents_async_invalid_document_type(self, document_store) async def test_write_documents_async_with_sparse_embedding_warning(self, document_store, caplog): """Test write_documents with document containing sparse_embedding field""" doc = Document(id="1", content="test", sparse_embedding=SparseEmbedding(indices=[0, 1], values=[0.5, 0.5])) + await document_store.write_documents_async([doc]) assert "but `sparse_vector_field` is not configured" in caplog.text @@ -298,22 +299,29 @@ async def test_query_sql_async(self, document_store: ElasticsearchDocumentStore) ] await document_store.write_documents_async(docs) + # SQL query returns raw JSON response from Elasticsearch SQL API sql_query = ( f'SELECT content, category, status, priority FROM "{document_store._index}" ' # noqa: S608 f"WHERE category = 'A' ORDER BY priority" ) result = await document_store._query_sql_async(sql_query) + # Verify raw JSON response structure assert isinstance(result, dict) assert "columns" in result assert "rows" in result + + # Verify we got 2 rows (documents with category A) assert len(result["rows"]) == 2 + + # Verify column structure column_names = [col["name"] for col in result["columns"]] assert "content" in column_names assert "category" in column_names @pytest.mark.asyncio async def test_query_sql_async_with_fetch_size(self, document_store: ElasticsearchDocumentStore): + """Test async SQL query with fetch_size parameter""" docs = [Document(content=f"Document {i}", meta={"category": "A", "index": i}) for i in range(15)] await document_store.write_documents_async(docs) @@ -321,14 +329,18 @@ async def test_query_sql_async_with_fetch_size(self, document_store: Elasticsear f'SELECT content, category FROM "{document_store._index}" ' # noqa: S608 f"WHERE category = 'A'" ) + + # Test with fetch_size result = await document_store._query_sql_async(sql_query, fetch_size=5) + # Should return raw JSON response assert isinstance(result, dict) assert "columns" in result assert "rows" in result @pytest.mark.asyncio async def test_query_sql_async_error_handling(self, document_store: ElasticsearchDocumentStore): + """Test error handling for invalid SQL queries""" invalid_query = "SELECT * FROM non_existent_index" with pytest.raises(DocumentStoreError, match="Failed to execute SQL query"): await document_store._query_sql_async(invalid_query) From 9737b869b195022f00e133ce04a6fedfaec9d268 Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Wed, 22 Apr 2026 14:52:36 +0200 Subject: [PATCH 10/12] fixin Mixin tests and write/delete docs asyn operations --- .../elasticsearch/document_store.py | 92 +++++++++++-------- .../tests/test_document_store_async.py | 11 ++- 2 files changed, 61 insertions(+), 42 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index ee0e077f95..86339d18b6 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -632,29 +632,33 @@ def write_documents( } ) - documents_written, errors = helpers.bulk( - client=self.client, - actions=elasticsearch_actions, - refresh=refresh, - index=self._index, - raise_on_error=False, - stats_only=False, - ) + try: + documents_written, errors = helpers.bulk( + client=self.client, + actions=elasticsearch_actions, + refresh=refresh, + index=self._index, + raise_on_error=False, + stats_only=False, + ) + except Exception as e: + msg = f"Failed to write documents to Elasticsearch: {e!s}" + raise DocumentStoreError(msg) from e if errors: # with stats_only=False, errors is guaranteed to be a list of dicts assert isinstance(errors, list) duplicate_errors_ids = [] other_errors = [] - for e in errors: - error_type = e["create"]["error"]["type"] + for error in errors: + error_type = error["create"]["error"]["type"] if policy == DuplicatePolicy.FAIL and error_type == "version_conflict_engine_exception": - duplicate_errors_ids.append(e["create"]["_id"]) + duplicate_errors_ids.append(error["create"]["_id"]) elif policy == DuplicatePolicy.SKIP and error_type == "version_conflict_engine_exception": # when the policy is skip, duplication errors are OK and we should not raise an exception continue else: - other_errors.append(e) + other_errors.append(error) if len(duplicate_errors_ids) > 0: msg = f"IDs '{', '.join(duplicate_errors_ids)}' already exist in the document store." @@ -710,28 +714,32 @@ async def write_documents_async( } actions.append(action) - documents_written, errors = await helpers.async_bulk( - client=self.async_client, - actions=actions, - index=self._index, - refresh=refresh, - raise_on_error=False, - stats_only=False, - ) + try: + documents_written, errors = await helpers.async_bulk( + client=self.async_client, + actions=actions, + index=self._index, + refresh=refresh, + raise_on_error=False, + stats_only=False, + ) + except Exception as e: + msg = f"Failed to write documents to Elasticsearch: {e!s}" + raise DocumentStoreError(msg) from e if errors: # with stats_only=False, errors is guaranteed to be a list of dicts assert isinstance(errors, list) duplicate_errors_ids = [] other_errors = [] - for e in errors: - error_type = e["create"]["error"]["type"] + for error in errors: + error_type = error["create"]["error"]["type"] if policy == DuplicatePolicy.FAIL and error_type == "version_conflict_engine_exception": - duplicate_errors_ids.append(e["create"]["_id"]) + duplicate_errors_ids.append(error["create"]["_id"]) elif policy == DuplicatePolicy.SKIP and error_type == "version_conflict_engine_exception": continue else: - other_errors.append(e) + other_errors.append(error) if len(duplicate_errors_ids) > 0: msg = f"IDs '{', '.join(duplicate_errors_ids)}' already exist in the document store." @@ -754,13 +762,17 @@ def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). """ - helpers.bulk( - client=self.client, - actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), - refresh=refresh, - index=self._index, - raise_on_error=False, - ) + try: + helpers.bulk( + client=self.client, + actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), + refresh=refresh, + index=self._index, + raise_on_error=False, + ) + except Exception as e: + msg = f"Failed to delete documents from Elasticsearch: {e!s}" + raise DocumentStoreError(msg) from e def _prepare_delete_all_request(self, *, is_async: bool, refresh: bool) -> dict[str, Any]: return { @@ -785,13 +797,17 @@ async def delete_documents_async( """ self._ensure_initialized() - await helpers.async_bulk( - client=self.async_client, - actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), - index=self._index, - refresh=refresh, - raise_on_error=False, - ) + try: + await helpers.async_bulk( + client=self.async_client, + actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), + index=self._index, + refresh=refresh, + raise_on_error=False, + ) + except Exception as e: + msg = f"Failed to delete documents from Elasticsearch: {e!s}" + raise DocumentStoreError(msg) from e def delete_all_documents(self, recreate_index: bool = False, refresh: bool = True) -> None: """ diff --git a/integrations/elasticsearch/tests/test_document_store_async.py b/integrations/elasticsearch/tests/test_document_store_async.py index 5fd6d9a7e7..5262304cd5 100644 --- a/integrations/elasticsearch/tests/test_document_store_async.py +++ b/integrations/elasticsearch/tests/test_document_store_async.py @@ -2,11 +2,13 @@ # # SPDX-License-Identifier: Apache-2.0 +import dataclasses + import pytest import pytest_asyncio from haystack.dataclasses.document import Document from haystack.dataclasses.sparse_embedding import SparseEmbedding -from haystack.document_stores.errors import DocumentStoreError +from haystack.document_stores.errors import DocumentStoreError, DuplicateDocumentError from haystack.document_stores.types import DuplicatePolicy from haystack.testing.document_store_async import ( CountDocumentsAsyncTest, @@ -57,8 +59,9 @@ async def document_store(self, request): await store.async_client.close() def assert_documents_are_equal(self, received: list[Document], expected: list[Document]): - assert len(received) == len(expected) - assert {doc.id for doc in received} == {doc.id for doc in expected} + # filter_documents_async() returns Documents with score populated; strip it before comparing + received = [dataclasses.replace(doc, score=None) for doc in received] + super().assert_documents_are_equal(received, expected) @pytest.mark.asyncio async def test_count_not_empty_async(self, document_store): @@ -73,7 +76,7 @@ async def test_write_documents_async(self, document_store): docs = [Document(id="1", content="test")] assert await document_store.write_documents_async(docs) == 1 assert await document_store.count_documents_async() == 1 - with pytest.raises(DocumentStoreError): + with pytest.raises(DuplicateDocumentError): await document_store.write_documents_async(docs, policy=DuplicatePolicy.FAIL) @pytest.mark.asyncio From 8ee9f3f11b1a41c1cf3d03d5ef13ae50eb1dacfa Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Wed, 22 Apr 2026 15:24:06 +0200 Subject: [PATCH 11/12] reverting try/catch on write/delete operations - aligning with other doc stores --- .../elasticsearch/document_store.py | 76 ++++++++----------- 1 file changed, 30 insertions(+), 46 deletions(-) diff --git a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py index 86339d18b6..5d86348154 100644 --- a/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py +++ b/integrations/elasticsearch/src/haystack_integrations/document_stores/elasticsearch/document_store.py @@ -632,18 +632,14 @@ def write_documents( } ) - try: - documents_written, errors = helpers.bulk( - client=self.client, - actions=elasticsearch_actions, - refresh=refresh, - index=self._index, - raise_on_error=False, - stats_only=False, - ) - except Exception as e: - msg = f"Failed to write documents to Elasticsearch: {e!s}" - raise DocumentStoreError(msg) from e + documents_written, errors = helpers.bulk( + client=self.client, + actions=elasticsearch_actions, + refresh=refresh, + index=self._index, + raise_on_error=False, + stats_only=False, + ) if errors: # with stats_only=False, errors is guaranteed to be a list of dicts @@ -714,18 +710,14 @@ async def write_documents_async( } actions.append(action) - try: - documents_written, errors = await helpers.async_bulk( - client=self.async_client, - actions=actions, - index=self._index, - refresh=refresh, - raise_on_error=False, - stats_only=False, - ) - except Exception as e: - msg = f"Failed to write documents to Elasticsearch: {e!s}" - raise DocumentStoreError(msg) from e + documents_written, errors = await helpers.async_bulk( + client=self.async_client, + actions=actions, + index=self._index, + refresh=refresh, + raise_on_error=False, + stats_only=False, + ) if errors: # with stats_only=False, errors is guaranteed to be a list of dicts @@ -762,17 +754,13 @@ def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", - `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency). For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter). """ - try: - helpers.bulk( - client=self.client, - actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), - refresh=refresh, - index=self._index, - raise_on_error=False, - ) - except Exception as e: - msg = f"Failed to delete documents from Elasticsearch: {e!s}" - raise DocumentStoreError(msg) from e + helpers.bulk( + client=self.client, + actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), + refresh=refresh, + index=self._index, + raise_on_error=False, + ) def _prepare_delete_all_request(self, *, is_async: bool, refresh: bool) -> dict[str, Any]: return { @@ -797,17 +785,13 @@ async def delete_documents_async( """ self._ensure_initialized() - try: - await helpers.async_bulk( - client=self.async_client, - actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), - index=self._index, - refresh=refresh, - raise_on_error=False, - ) - except Exception as e: - msg = f"Failed to delete documents from Elasticsearch: {e!s}" - raise DocumentStoreError(msg) from e + await helpers.async_bulk( + client=self.async_client, + actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids), + index=self._index, + refresh=refresh, + raise_on_error=False, + ) def delete_all_documents(self, recreate_index: bool = False, refresh: bool = True) -> None: """ From 35fb0a8f5bafc13b1ccfa0adf1f72933d0c74fec Mon Sep 17 00:00:00 2001 From: "David S. Batista" Date: Wed, 22 Apr 2026 17:05:04 +0200 Subject: [PATCH 12/12] restoring some of the comments --- .../elasticsearch/tests/test_document_store_async.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/integrations/elasticsearch/tests/test_document_store_async.py b/integrations/elasticsearch/tests/test_document_store_async.py index 5262304cd5..1087101e94 100644 --- a/integrations/elasticsearch/tests/test_document_store_async.py +++ b/integrations/elasticsearch/tests/test_document_store_async.py @@ -183,6 +183,7 @@ async def test_delete_all_documents_async_index_recreation(self, document_store) await document_store.delete_all_documents_async(recreate_index=True) assert await document_store.count_documents_async() == 0 + # verify index structure is preserved index_info_after = await document_store._async_client.indices.get(index=document_store._index) mappings_after = index_info_after[document_store._index]["mappings"] assert mappings_after == mappings_before, "delete_all_documents_async should preserve index mappings" @@ -194,10 +195,15 @@ async def test_delete_all_documents_async_index_recreation(self, document_store) settings_before["index"].pop("creation_date", None) assert settings_after == settings_before, "delete_all_documents_async should preserve index settings" + # verify index can accept new documents and retrieve new_doc = Document(id="4", content="New document after delete all") await document_store.write_documents_async([new_doc]) assert await document_store.count_documents_async() == 1 + results = await document_store.filter_documents_async() + assert len(results) == 1 + assert results[0].content == "New document after delete all" + @pytest.mark.asyncio async def test_delete_all_documents_async_no_index_recreation(self, document_store): docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")]