feat: add new falkordb integration#3158
feat: add new falkordb integration#3158ghassenzaara wants to merge 25 commits intodeepset-ai:mainfrom
Conversation
…e and native vector search
…ith neo4j-haystack
c580421 to
d380366
Compare
bogdankostic
left a comment
There was a problem hiding this comment.
Thanks so much for this PR, @ghassenzaara! Great work so far. You'll see quite a few comments below, but they are mostly just minor formatting improvements for the docstrings.
| - "Test / dspy" | ||
| - "Test / elasticsearch" | ||
| - "Test / faiss" | ||
| - "Test / falkor_db" |
There was a problem hiding this comment.
For consistency (for example with arcadedb) let's use falkordb throughout this integration instead of falkor_db, so changing for example also integrations/falkor_db -> integrations/falkordb.
| - any-glob-to-any-file: ".github/workflows/faiss.yml" | ||
|
|
||
|
|
||
| integration:falkor-db: |
There was a problem hiding this comment.
| integration:falkor-db: | |
| integration:falkordb: |
| documented_only: true | ||
| skip_empty_modules: true | ||
| renderer: | ||
| description: FalkorDB integration for Haystack — GraphRAG document store, embedding retriever, and Cypher retriever |
There was a problem hiding this comment.
| description: FalkorDB integration for Haystack — GraphRAG document store, embedding retriever, and Cypher retriever | |
| description: FalkorDB integration for Haystack |
| def to_dict(self) -> dict[str, Any]: | ||
| """ | ||
| Serialise this component to a dictionary. | ||
|
|
||
| :returns: Dictionary representation of this retriever's configuration. | ||
| """ | ||
| data = default_to_dict( | ||
| self, | ||
| custom_cypher_query=self._custom_cypher_query, | ||
| ) | ||
| data["init_parameters"]["document_store"] = self._document_store.to_dict() | ||
| return data | ||
|
|
||
| @classmethod | ||
| def from_dict(cls, data: dict[str, Any]) -> "FalkorDBCypherRetriever": | ||
| """ | ||
| Deserialise this component from a dictionary. | ||
|
|
||
| :param data: Dictionary previously produced by :meth:`to_dict`. | ||
| :returns: A new :class:`FalkorDBCypherRetriever` instance. | ||
| """ | ||
| init_params = data.get("init_parameters", {}) | ||
| if "document_store" in init_params: | ||
| init_params["document_store"] = FalkorDBDocumentStore.from_dict(init_params["document_store"]) | ||
| return default_from_dict(cls, data) |
There was a problem hiding this comment.
These methods shouldn't be needed, see our documentation on default serialization behavior.
| """ | ||
| Retrieve documents by executing an OpenCypher query. | ||
|
|
||
| If a ``query`` is provided here, it overrides the ``custom_cypher_query`` |
There was a problem hiding this comment.
We use single backticks inside docstrings for inline code.
| If a ``query`` is provided here, it overrides the ``custom_cypher_query`` | |
| If a `query` is provided here, it overrides the `custom_cypher_query` |
| Translate a Haystack filter dict into an OpenCypher ``WHERE`` sub-expression. | ||
|
|
||
| Supports the full Haystack filter DSL: | ||
|
|
||
| - Logical: ``AND``, ``OR``, ``NOT`` | ||
| - Comparison: ``==``, ``!=``, ``>``, ``>=``, ``<``, ``<=`` | ||
| - Membership: ``in``, ``not in`` | ||
|
|
||
| All values are passed as named query parameters to prevent injection. | ||
|
|
||
| :param filters: A Haystack filter dictionary. | ||
| :returns: Tuple of ``(where_clause_string, params_dict)``. | ||
| :raises ValueError: If an unsupported operator or malformed filter is provided. |
There was a problem hiding this comment.
| Translate a Haystack filter dict into an OpenCypher ``WHERE`` sub-expression. | |
| Supports the full Haystack filter DSL: | |
| - Logical: ``AND``, ``OR``, ``NOT`` | |
| - Comparison: ``==``, ``!=``, ``>``, ``>=``, ``<``, ``<=`` | |
| - Membership: ``in``, ``not in`` | |
| All values are passed as named query parameters to prevent injection. | |
| :param filters: A Haystack filter dictionary. | |
| :returns: Tuple of ``(where_clause_string, params_dict)``. | |
| :raises ValueError: If an unsupported operator or malformed filter is provided. | |
| Translate a Haystack filter dict into an OpenCypher `WHERE` sub-expression. | |
| Supports the full Haystack filter DSL: | |
| - Logical: `AND`, `OR`, `NOT` | |
| - Comparison: `==`, `!=`, `>`, `>=`, `<`, `<=` | |
| - Membership: `in`, `not in` | |
| All values are passed as named query parameters to prevent injection. | |
| :param filters: A Haystack filter dictionary. | |
| :returns: Tuple of `(where_clause_string, params_dict)`. | |
| :raises ValueError: If an unsupported operator or malformed filter is provided. |
| build-backend = "hatchling.build" | ||
|
|
||
| [project] | ||
| name = "falkor-db-haystack" |
There was a problem hiding this comment.
| name = "falkor-db-haystack" | |
| name = "falkordb-haystack" |
There was a problem hiding this comment.
These changes should be reverted.
There was a problem hiding this comment.
Let's use our DocumentStoreBaseTests for testing the document store as described in our docs.
There was a problem hiding this comment.
Let's add a sentence here saying that in order to run the integration tests, a docker container needs to be run, similar to how we do for example for opensearch
|
I re-requested a review by accident. |
No worries, let me know if there's anything you're unsure about. |
… or falkor-db to falkordb for consistency, remove useless implementation, fix other small issues
…ssenzaara/haystack-core-integrations into feature/falkordb-integration
…ix and compatibility failures
|
Hi @bogdankostic, I've addressed all the feedback from the previous review. The changes should now align with Haystack's integration conventions and requirements. Please let me know if anything else needs to be adjusted. Happy to iterate further! |
|
Review the following changes in direct dependencies. Learn more about Socket for GitHub.
|
|
I've created a follow up issue 3219 to add the extended operations to |
|
Hey @davidsbatista, |
bogdankostic
left a comment
There was a problem hiding this comment.
Thanks for addressing the comments @ghassenzaara! I think the PR is on a good way, we should just remove files that shouldn't be included in the PR and fix the sorting and scaling of scores.
Regarding #3219, I'd say let's wait for this PR to be merged so that we can be sure there won't be any major changes.
There was a problem hiding this comment.
This file is probably a residue from testing the integration and can be removed.
| def to_dict(self) -> dict[str, Any]: | ||
| """Serialize this retriever to a dictionary.""" | ||
| return default_to_dict( | ||
| self, | ||
| document_store=self.document_store.to_dict(), | ||
| custom_cypher_query=self.custom_cypher_query, | ||
| ) | ||
|
|
||
| @classmethod | ||
| def from_dict(cls, data: dict[str, Any]) -> "FalkorDBCypherRetriever": | ||
| """Deserialize a retriever from a dictionary.""" | ||
| return default_from_dict(cls, data) |
There was a problem hiding this comment.
You can remove these methods entirely and directly use default_to_dict / default_from_dict in the serialization tests to test if serialization works as expected.
| def to_dict(self) -> dict[str, Any]: | ||
| """Serialize this retriever to a dictionary.""" | ||
| return default_to_dict( | ||
| self, | ||
| document_store=self.document_store.to_dict(), | ||
| filters=self.filters, | ||
| top_k=self.top_k, | ||
| filter_policy=self.filter_policy.value, | ||
| ) | ||
|
|
||
| @classmethod | ||
| def from_dict(cls, data: dict[str, Any]) -> "FalkorDBEmbeddingRetriever": | ||
| """Deserialize a retriever from a dictionary.""" | ||
| return default_from_dict(cls, data) |
There was a problem hiding this comment.
You can remove these methods entirely and directly use default_to_dict / default_from_dict in the serialization tests to test if serialization works as expected.
|
|
||
| from haystack_integrations.document_stores.falkordb.document_store import ( | ||
| FalkorDBDocumentStore, | ||
| SimilarityFunction, |
There was a problem hiding this comment.
I don't think we need to expose SimilarityFunction here.
| def to_dict(self) -> dict[str, Any]: | ||
| """Serialize this document store to a dictionary.""" | ||
| return default_to_dict( | ||
| self, | ||
| host=self.host, | ||
| port=self.port, | ||
| graph_name=self.graph_name, | ||
| username=self.username, | ||
| password=self.password, | ||
| node_label=self.node_label, | ||
| embedding_dim=self.embedding_dim, | ||
| embedding_field=self.embedding_field, | ||
| similarity=self.similarity, | ||
| write_batch_size=self.write_batch_size, | ||
| recreate_graph=self.recreate_graph, | ||
| verify_connectivity=self.verify_connectivity, | ||
| ) | ||
|
|
||
| @classmethod | ||
| def from_dict(cls, data: dict[str, Any]) -> FalkorDBDocumentStore: | ||
| """Deserialize a document store from a dictionary.""" | ||
| return default_from_dict(cls, data) |
There was a problem hiding this comment.
You can remove these methods entirely and directly use default_to_dict / default_from_dict in the serialization tests to test if serialization works as expected.
| UNWIND $docs AS doc | ||
| MERGE (d:{self.node_label} {{id: doc.id}}) | ||
| ON CREATE SET d += doc | ||
| ON MATCH SET d += doc |
There was a problem hiding this comment.
Using += here keeps the properties from the overwritten document that are not present in the new document with the same ID, so we should use = here instead.
| ON MATCH SET d += doc | |
| ON MATCH SET d = doc |
| record = { | ||
| "id": doc.id, | ||
| "content": doc.content, | ||
| "embedding": doc.embedding, | ||
| } | ||
| if doc.meta: | ||
| record.update(doc.meta) |
There was a problem hiding this comment.
Meta fields can silently overwrite standard Document fields (id, content, embedding) due to the update order, let's make sure that these are not overwritten by meta fields.
| record = { | |
| "id": doc.id, | |
| "content": doc.content, | |
| "embedding": doc.embedding, | |
| } | |
| if doc.meta: | |
| record.update(doc.meta) | |
| record = {} | |
| if doc.meta: | |
| record.update(doc.meta) | |
| record["id"] = doc.id | |
| record["content"] = doc.content | |
| record["embedding"] = doc.embedding |
| YIELD node AS d, score | ||
| WHERE {where_clause} | ||
| RETURN d, score | ||
| ORDER BY score DESC |
There was a problem hiding this comment.
I just looked deeper into FalkorDB and it seems that the database is not returning similarity scores but embedding distances, so we should ORDER BY score ASC here - sorry for the wrong comment earlier.
| :returns: Scaled score in `[0, 1]`. | ||
| """ | ||
| if self.similarity == "cosine": | ||
| return (score + 1) / 2 |
There was a problem hiding this comment.
Thisformula assumes the raw score is cosine similarity in range [-1, 1], but the raw score is cosine distance (1 - cos_sim, range [0, 2]), we should therefore adapt the scaling:
| return (score + 1) / 2 | |
| return 1 - (score / 2) |
| """ | ||
| if self.similarity == "cosine": | ||
| return (score + 1) / 2 | ||
| return float(1 / (1 + math.exp(-score / 100))) |
There was a problem hiding this comment.
The raw score is euclidean distance in range [0, ∞). Plugging into this sigmoid:
- distance=0 (perfect match) → 0.5,
- distance→∞ (terrible) → ~1.0
Bad matches get higher scaled scores than good ones, the mapping is inverted.
Let's replace with a monotonically decreasing transform:
| return float(1 / (1 + math.exp(-score / 100))) | |
| return 1 / (1 + score) |
Related Issues
Proposed Changes:
FalkorDBDocumentStoreto connect Haystack with FalkorDB graph databases.FalkorDBEmbeddingRetrieverfor standard vector searches.FalkorDBCypherRetrieverfor running custom GraphRAG Cypher queries.vecf32()in Cypher queries.How did you test it?
hatch run test:unit).hatch run test:integration).hatch run test:types,hatch run fmt).Notes for the reviewer
vecf32()explicit cast in theUNWINDcypher queries. This is specifically required by FalkorDB to parse vector embeddings correctly.Checklist
fix:,feat:,build:,chore:,ci:,docs:,style:,refactor:,perf:,test:.