Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c8070d4
feat: RDB persistence, virtual keys, module wiring, and tests
AviAvni Apr 14, 2026
0ea80d1
fix: pass references to delete_relationships for improved performance
AviAvni Apr 14, 2026
3e292ff
fix: handle uninitialized fjall keyspace to prevent unnecessary keysp…
AviAvni Apr 14, 2026
6160c2d
feat: implement bulk loading of attributes from fjall for RDB save op…
AviAvni Apr 15, 2026
59b5db4
feat: build RDB attribute snapshots before fork to avoid fjall in child
AviAvni Apr 15, 2026
5814f9f
fix: disable Redis auto-save in tests and fix cargo fmt
AviAvni Apr 15, 2026
3922528
perf: skip RDB snapshot building for graphs without fjall data
AviAvni Apr 15, 2026
45b8f1d
perf: avoid unnecessary cache inserts for entities without properties
AviAvni Apr 15, 2026
4351800
perf: use static EMPTY_ATTRS and fix benchmark Popen --save arg
AviAvni Apr 15, 2026
1ef41be
fix: use shutdown-on-sigterm save to fix UDF persistence without auto…
AviAvni Apr 15, 2026
08f5baf
Merge remote-tracking branch 'origin/main' into rdb-persistence-wiring
Copilot Apr 15, 2026
7489673
Merge remote-tracking branch 'origin/main' into rdb-persistence-wiring
Copilot Apr 16, 2026
3eddc7e
fix: pass &explicit_rels to delete_relationships (by-ref, not by-value)
Copilot Apr 16, 2026
1475c31
test: increase BGSAVE wait timeout from 10s to 60s in test_persistenc…
Copilot Apr 16, 2026
7148faa
test: reduce graph_count 1000→100 and drain BGSAVE before returning i…
Copilot Apr 16, 2026
acb1231
perf: replace per-edge GxB_Iterator in Tensor::encode with single me …
Copilot Apr 16, 2026
39bc72e
Revert "test: reduce graph_count 1000→100 and drain BGSAVE before ret…
AviAvni Apr 16, 2026
ff1f525
Revert "test: increase BGSAVE wait timeout from 10s to 60s in test_pe…
AviAvni Apr 16, 2026
f826116
chore: remove obsolete Redis configuration settings from redis.conf
AviAvni Apr 16, 2026
6326a1a
feat: add initial flow tests for various features and functionalities
AviAvni Apr 16, 2026
8bdf13b
Merge branch 'main' into rdb-persistence-wiring
AviAvni Apr 16, 2026
f836ca9
feat: Add bulk operations for UINT64 entries and tuple extraction in …
AviAvni Apr 16, 2026
04fabe2
feat: Optimize UINT64 matrix encoding and reduce redundant computatio…
AviAvni Apr 16, 2026
a97d2f1
Merge branch 'main' into rdb-persistence-wiring
Copilot Apr 16, 2026
f2408e6
fmt
AviAvni Apr 16, 2026
b1efe01
Merge branch 'main' into rdb-persistence-wiring
AviAvni Apr 16, 2026
43dc78d
refactor: Improve pattern matching and variable handling across multi…
AviAvni Apr 16, 2026
3a8e0da
feat: Integrate rustc-hash for improved performance in attribute stor…
AviAvni Apr 16, 2026
7ec1d7b
refactor: Remove test_persistency from completed tests and add it to …
AviAvni Apr 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ lazy_static = "1.5.0"
parking_lot = "0.12.5"
redis-module = { git = "https://github.com/AviAvni/redismodule-rs", branch = "master" }
roaring = "0.11.3"
rustc-hash = "2"
ryu = "1.0.23"
thin-vec = "0.2.16"
orx-tree = "2.2.0"
Expand Down
4 changes: 4 additions & 0 deletions flow_tests_done.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ tests/flow/test_concurrent_query.py
tests/flow/test_config.py
tests/flow/test_create_clause
tests/flow/test_distinct
tests/flow/test_effects.py
tests/flow/test_empty_query
tests/flow/test_encode_decode.py
tests/flow/test_entity_update
tests/flow/test_execution_plan_print.py
tests/flow/test_expand_into
Expand All @@ -24,6 +26,7 @@ tests/flow/test_function_calls
tests/flow/test_graph_create
tests/flow/test_graph_deletion
tests/flow/test_graph_merge
tests/flow/test_graph_versioning.py
tests/flow/test_hashjoin.py
tests/flow/test_imdb
tests/flow/test_index_create
Expand Down Expand Up @@ -56,6 +59,7 @@ tests/flow/test_pending_queries_limit.py
tests/flow/test_point
tests/flow/test_profile.py
tests/flow/test_query_validation
tests/flow/test_rdb_load.py
tests/flow/test_reduce.py
tests/flow/test_results.py
tests/flow/test_reversed_patterns
Expand Down
7 changes: 1 addition & 6 deletions flow_tests_todo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@ tests/flow/test_stress.py

## Persistence & Replication
tests/flow/test_persistency.py
tests/flow/test_encode_decode.py
tests/flow/test_rdb_load.py
tests/flow/test_prev_rdb_decode.py
tests/flow/test_replication.py
tests/flow/test_replication_states.py
tests/flow/test_graph_versioning.py

## Redis Integration & Server Features
tests/flow/test_acl.py
Expand All @@ -42,5 +38,4 @@ tests/flow/test_undo_log.py

## Metadata & Internals
tests/flow/test_multi_label.py
tests/flow/test_effects.py
tests/flow/test_intern_string.py
tests/flow/test_intern_string.py
1 change: 1 addition & 0 deletions graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ regex = "1.12.3"
rquickjs = { version = "0.11", features = ["bindgen", "classes", "macro", "parallel"] }
ureq = { version = "3.3.0", default-features = false, features = ["rustls"] }
roaring = "0.11.3"
rustc-hash = "2"
thin-vec = "0.2.16"
179 changes: 115 additions & 64 deletions graph/src/graph/attribute_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@
//! Each attribute is stored as a separate fjall entry:
//! `entity_id (8 bytes big-endian) + attr_idx (2 bytes big-endian)`

use std::{collections::HashMap, process, sync::Arc};
use std::{process, sync::Arc};

use std::cmp::Ordering;
use std::collections::HashMap as StdHashMap;

use rustc_hash::FxHashMap;

use fjall::{
Database, Keyspace, KeyspaceCreateOptions, Readable, Snapshot, config::HashRatioPolicy,
Expand All @@ -98,6 +99,11 @@ use super::attribute_cache::AttributeCache;
use super::graphblas::serialization::{Decode, Encode, Reader, Writer};
use crate::runtime::{ordermap::OrderMap, orderset::OrderSet, value::Value};

/// Shared empty attribute vector to avoid per-entity allocations when an
/// entity has no properties.
static EMPTY_ATTRS: once_cell::sync::Lazy<Arc<Vec<(u16, Value)>>> =
once_cell::sync::Lazy::new(|| Arc::new(Vec::new()));

/// Create a composite key from entity ID and attribute index.
fn make_key(
entity_id: u64,
Expand Down Expand Up @@ -268,14 +274,13 @@ impl AttributeStore {
) -> Arc<Vec<(u16, Value)>> {
// If this entity is pending full deletion, return empty regardless of fjall state.
if self.pending_deletes.contains(entity_id) {
return Arc::new(Vec::new());
return EMPTY_ATTRS.clone();
}
// Fast path: if the fjall keyspace was never initialized, no data has
// ever been flushed from cache to fjall — all entity data is either in
// the cache (which already returned None to the caller) or doesn't
// exist. Skip the expensive keyspace creation + prefix scan.
// If the fjall keyspace was never initialized, no data was ever flushed
// to persistent storage. All live data is in the cache. Return empty
// without triggering expensive keyspace creation or cache writes.
if self.keyspace.get().is_none() {
return Arc::new(Vec::new());
return EMPTY_ATTRS.clone();
}
let prefix = entity_id.to_be_bytes();
let attrs: Vec<(u16, Value)> = self
Expand All @@ -296,6 +301,77 @@ impl AttributeStore {
Arc::new(attrs)
}

/// Reports whether a fjall keyspace was ever initialized for this store.
///
/// A `true` result means cold attribute data may live in fjall beyond what
/// the cache holds; `false` guarantees every live attribute is
/// cache-resident, so a fork child never needs to touch fjall.
pub fn has_fjall_data(&self) -> bool {
    matches!(self.keyspace.get(), Some(_))
}

/// Assemble a complete entity-ID → attribute-list map by merging the hot
/// cache over the cold fjall store.
///
/// Called **before** Redis forks for BGSAVE so the fork child can encode
/// entities without touching fjall (which is unsafe after fork).
/// The returned map is passed to [`encode_with_range`] as the data source.
pub fn build_rdb_snapshot(
    &self,
    deleted: &RoaringTreemap,
    max_id: u64,
) -> FxHashMap<u64, Arc<Vec<(u16, Value)>>> {
    let mut out: FxHashMap<u64, Arc<Vec<(u16, Value)>>> = FxHashMap::default();

    // Phase 1: single sequential scan over fjall (cold store). Composite
    // keys are big-endian entity-id-first, so fjall yields each entity's
    // attributes as one contiguous run; we group runs as we go.
    if self.keyspace.get().is_some() {
        let mut run_id: Option<u64> = None;
        let mut run_attrs: Vec<(u16, Value)> = Vec::new();
        for entry in self.snapshot().iter(self.keyspace()) {
            // Skip unreadable entries, malformed keys, and undecodable values.
            let Ok((key, raw)) = entry.into_inner() else {
                continue;
            };
            let Some(attr_idx) = extract_attr_idx(&key) else {
                continue;
            };
            let eid = u64::from_be_bytes(key[..8].try_into().unwrap());
            let Some((value, _)) = Value::from_bytes(&raw) else {
                continue;
            };
            // A new entity id closes out the previous run.
            if run_id != Some(eid) {
                if let Some(prev) = run_id {
                    if deleted.contains(prev) || self.pending_deletes.contains(prev) {
                        run_attrs.clear();
                    } else {
                        run_attrs.sort_by_key(|&(idx, _)| idx);
                        out.insert(prev, Arc::new(std::mem::take(&mut run_attrs)));
                    }
                }
                run_id = Some(eid);
            }
            run_attrs.push((attr_idx, value));
        }
        // Close out the final run, if any survives the delete filters.
        if let Some(prev) = run_id
            && !deleted.contains(prev)
            && !self.pending_deletes.contains(prev)
        {
            run_attrs.sort_by_key(|&(idx, _)| idx);
            out.insert(prev, Arc::new(run_attrs));
        }
    }

    // Phase 2: overlay cache entries (hot store) — the cache wins over fjall
    // because it may hold newer dirty writes not yet flushed.
    for id in 0..=max_id {
        if deleted.contains(id) || self.pending_deletes.contains(id) {
            continue;
        }
        if let Some(cached) = self.cache.get_entity(id, self.version) {
            out.insert(id, cached);
        }
    }

    out
}

// ---- read path (cache → fjall) --------------------------------------

pub fn remove(
Expand Down Expand Up @@ -470,24 +546,23 @@ impl AttributeStore {
/// the number of attributes *replaced* and the number of non-null attributes *set*.
pub fn insert_attrs(
&mut self,
attrs: &HashMap<u64, OrderMap<Arc<String>, Value>>,
attrs: &FxHashMap<u64, OrderMap<Arc<String>, Value>>,
) -> Result<(usize, usize), String> {
let mut nremoved = 0;
let mut nset = 0;

// Pre-resolve all unique attribute names → indices ONCE.
// Uses Arc pointer identity as key to avoid rehashing strings.
let mut name_to_idx: StdHashMap<*const String, u16> =
StdHashMap::with_capacity(attrs.values().next().map_or(0, |v| v.len()));
let mut name_to_idx: FxHashMap<*const String, u16> = FxHashMap::default();
for entity_attrs in attrs.values() {
for (attr, _) in entity_attrs.iter() {
let ptr = Arc::as_ptr(attr);
if !name_to_idx.contains_key(&ptr) {
if let std::collections::hash_map::Entry::Vacant(e) = name_to_idx.entry(ptr) {
let idx = self.attrs_name.get_index_of(attr).unwrap_or_else(|| {
self.attrs_name.insert(attr.clone());
self.attrs_name.len() - 1
}) as u16;
name_to_idx.insert(ptr, idx);
e.insert(idx);
}
}
}
Expand Down Expand Up @@ -528,7 +603,7 @@ impl AttributeStore {
merged.clear();
merged.reserve(current.len() + new_entries.len());
merged.extend_from_slice(&current);
merged.extend(new_entries.drain(..));
merged.append(&mut new_entries);
} else {
null_indices.sort_unstable();

Expand Down Expand Up @@ -600,20 +675,19 @@ impl AttributeStore {
/// Returns the number of non-null attributes imported.
pub fn import_attrs(
&mut self,
attrs: &HashMap<u64, OrderMap<Arc<String>, Value>>,
attrs: &FxHashMap<u64, OrderMap<Arc<String>, Value>>,
) -> usize {
// Pre-resolve all unique attribute names → indices ONCE.
let mut name_to_idx: StdHashMap<*const String, u16> =
StdHashMap::with_capacity(attrs.values().next().map_or(0, |v| v.len()));
let mut name_to_idx: FxHashMap<*const String, u16> = FxHashMap::default();
for entity_attrs in attrs.values() {
for (attr, _) in entity_attrs.iter() {
let ptr = Arc::as_ptr(attr);
if !name_to_idx.contains_key(&ptr) {
if let std::collections::hash_map::Entry::Vacant(e) = name_to_idx.entry(ptr) {
let idx = self.attrs_name.get_index_of(attr).unwrap_or_else(|| {
self.attrs_name.insert(attr.clone());
self.attrs_name.len() - 1
}) as u16;
name_to_idx.insert(ptr, idx);
e.insert(idx);
}
}
}
Expand Down Expand Up @@ -745,39 +819,6 @@ impl AttributeStore {
Ok(())
}

/// Persist an entity's pending dirty cached attributes to fjall, then drop
/// it from the cache.
///
/// Guarantees that unflushed cache writes reach fjall before the cache
/// entry disappears, so nothing is lost when the entity is about to be
/// deleted from fjall.
///
/// Entities touched by the *current* transaction (`dirty_entities`) are
/// deliberately not flushed: those writes are uncommitted and must only
/// reach the durable store via `commit()`. Skipping them keeps rollback
/// from leaking current-tx inserts into fjall.
fn flush_and_invalidate(
    &self,
    entity_id: u64,
) -> Result<(), String> {
    let in_active_tx = self.dirty_entities.contains(entity_id);
    if !in_active_tx
        && let Some((attrs, is_dirty)) =
            self.cache.get_entity_with_dirty(entity_id, self.version)
        && is_dirty
        && !attrs.is_empty()
    {
        // These are pre-existing dirty entries from prior transactions
        // (not the active one), so persisting them now is safe.
        let mut batch = get_database().batch();
        for &(idx, ref val) in attrs.iter() {
            batch.insert(self.keyspace(), make_key(entity_id, idx), val.to_bytes());
        }
        batch.durability(None).commit().map_err(|e| e.to_string())?;
    }
    // Always drop the cache entry, whether or not anything was flushed.
    self.cache.invalidate(entity_id);
    Ok(())
}

/// Access the shared cache (for background flush scheduling).
#[must_use]
pub const fn cache(&self) -> &Arc<AttributeCache> {
Expand Down Expand Up @@ -820,31 +861,33 @@ impl AttributeStore {
if current_id != Some(eid) {
// Flush previous entity.
if let Some(prev_id) = current_id {
if !self.pending_deletes.contains(prev_id) {
if self.pending_deletes.contains(prev_id) {
current_attrs.clear();
} else {
let _ = self.cache.insert_entity_if_older(
prev_id,
std::mem::take(&mut current_attrs),
self.version,
);
} else {
current_attrs.clear();
}
}
current_id = Some(eid);
}
current_attrs.push((idx, value));
}
// Flush the last entity.
if let Some(prev_id) = current_id {
if !self.pending_deletes.contains(prev_id) {
let _ = self
.cache
.insert_entity_if_older(prev_id, current_attrs, self.version);
}
if let Some(prev_id) = current_id
&& !self.pending_deletes.contains(prev_id)
{
let _ = self
.cache
.insert_entity_if_older(prev_id, current_attrs, self.version);
}
}

/// Encode a range of entities, borrowing the deleted bitmap directly.
/// Encode a range of entities, using the pre-built RDB snapshot when
/// provided, falling back to cache → fjall otherwise.
#[allow(clippy::too_many_arguments)]
pub fn encode_with_range(
&self,
w: &mut dyn Writer,
Expand All @@ -853,9 +896,10 @@ impl AttributeStore {
global_attrs: &[Arc<String>],
count: u64,
offset: u64,
rdb_snapshot: Option<&FxHashMap<u64, Arc<Vec<(u16, Value)>>>>,
) {
// Build attr remap inline.
let global_index: std::collections::HashMap<&Arc<String>, usize> = global_attrs
let global_index: FxHashMap<&Arc<String>, usize> = global_attrs
.iter()
.enumerate()
.map(|(i, n)| (n, i))
Expand Down Expand Up @@ -888,7 +932,14 @@ impl AttributeStore {

w.write_unsigned(id);

let props = self.get_all_attrs_by_id(id);
let props = rdb_snapshot.map_or_else(
|| self.get_all_attrs_by_id(id),
|snap| {
snap.get(&id)
.cloned()
.unwrap_or_else(|| EMPTY_ATTRS.clone())
},
);
w.write_unsigned(props.len() as u64);

for &(local_attr_id, ref value) in props.iter() {
Expand Down
Loading
Loading