Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/elasticsearch/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = [
"haystack-ai>=2.26.1",
"haystack-ai>=2.28.0",
"elasticsearch>=8,<9",
"aiohttp>=3.9.0" # for async support https://elasticsearch-py.readthedocs.io/en/latest/async.html#valueerror-when-initializing-asyncelasticsearch
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,29 +632,33 @@ def write_documents(
}
)

documents_written, errors = helpers.bulk(
client=self.client,
actions=elasticsearch_actions,
refresh=refresh,
index=self._index,
raise_on_error=False,
stats_only=False,
)
try:
documents_written, errors = helpers.bulk(
client=self.client,
actions=elasticsearch_actions,
refresh=refresh,
index=self._index,
raise_on_error=False,
stats_only=False,
)
except Exception as e:
msg = f"Failed to write documents to Elasticsearch: {e!s}"
raise DocumentStoreError(msg) from e

if errors:
# with stats_only=False, errors is guaranteed to be a list of dicts
assert isinstance(errors, list)
duplicate_errors_ids = []
other_errors = []
for e in errors:
error_type = e["create"]["error"]["type"]
for error in errors:
error_type = error["create"]["error"]["type"]
if policy == DuplicatePolicy.FAIL and error_type == "version_conflict_engine_exception":
duplicate_errors_ids.append(e["create"]["_id"])
duplicate_errors_ids.append(error["create"]["_id"])
elif policy == DuplicatePolicy.SKIP and error_type == "version_conflict_engine_exception":
# when the policy is skip, duplication errors are OK and we should not raise an exception
continue
else:
other_errors.append(e)
other_errors.append(error)

if len(duplicate_errors_ids) > 0:
msg = f"IDs '{', '.join(duplicate_errors_ids)}' already exist in the document store."
Expand Down Expand Up @@ -704,36 +708,49 @@ async def write_documents_async(
self._handle_sparse_embedding(doc_dict, doc.id)

action = {
"_op_type": "create" if policy == DuplicatePolicy.FAIL else "index",
"_op_type": "index" if policy == DuplicatePolicy.OVERWRITE else "create",
"_id": doc.id,
"_source": doc_dict,
}
actions.append(action)

try:
success, failed = await helpers.async_bulk(
Comment thread
anakin87 marked this conversation as resolved.
documents_written, errors = await helpers.async_bulk(
client=self.async_client,
actions=actions,
index=self._index,
refresh=refresh,
raise_on_error=False,
stats_only=False,
)
if failed:
# with stats_only=False, failed is guaranteed to be a list of dicts
assert isinstance(failed, list)
if policy == DuplicatePolicy.FAIL:
for error in failed:
if "create" in error and error["create"]["status"] == DOC_ALREADY_EXISTS:
msg = f"ID '{error['create']['_id']}' already exists in the document store"
raise DuplicateDocumentError(msg)
msg = f"Failed to write documents to Elasticsearch. Errors:\n{failed}"
raise DocumentStoreError(msg)
return success
except Exception as e:
msg = f"Failed to write documents to Elasticsearch: {e!s}"
raise DocumentStoreError(msg) from e

if errors:
# with stats_only=False, errors is guaranteed to be a list of dicts
assert isinstance(errors, list)
duplicate_errors_ids = []
other_errors = []
for error in errors:
error_type = error["create"]["error"]["type"]
if policy == DuplicatePolicy.FAIL and error_type == "version_conflict_engine_exception":
duplicate_errors_ids.append(error["create"]["_id"])
elif policy == DuplicatePolicy.SKIP and error_type == "version_conflict_engine_exception":
continue
else:
other_errors.append(error)

if len(duplicate_errors_ids) > 0:
msg = f"IDs '{', '.join(duplicate_errors_ids)}' already exist in the document store."
raise DuplicateDocumentError(msg)

if len(other_errors) > 0:
msg = f"Failed to write documents to Elasticsearch. Errors:\n{other_errors}"
raise DocumentStoreError(msg)

return documents_written

def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for", True, False] = "wait_for") -> None:
"""
Deletes all documents with matching document_ids from the document store.
Expand All @@ -745,13 +762,17 @@ def delete_documents(self, document_ids: list[str], refresh: Literal["wait_for",
- `"wait_for"`: Wait for the next refresh cycle (default, ensures read-your-writes consistency).
For more details, see the [Elasticsearch refresh documentation](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/refresh-parameter).
"""
helpers.bulk(
client=self.client,
actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
refresh=refresh,
index=self._index,
raise_on_error=False,
)
try:
helpers.bulk(
client=self.client,
actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
refresh=refresh,
index=self._index,
raise_on_error=False,
)
except Exception as e:
msg = f"Failed to delete documents from Elasticsearch: {e!s}"
raise DocumentStoreError(msg) from e

def _prepare_delete_all_request(self, *, is_async: bool, refresh: bool) -> dict[str, Any]:
return {
Expand Down Expand Up @@ -782,6 +803,7 @@ async def delete_documents_async(
actions=({"_op_type": "delete", "_id": id_} for id_ in document_ids),
index=self._index,
refresh=refresh,
raise_on_error=False,
)
except Exception as e:
msg = f"Failed to delete documents from Elasticsearch: {e!s}"
Expand Down
Loading
Loading