Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c8070d4
feat: RDB persistence, virtual keys, module wiring, and tests
AviAvni Apr 14, 2026
0ea80d1
fix: pass references to delete_relationships for improved performance
AviAvni Apr 14, 2026
3e292ff
fix: handle uninitialized fjall keyspace to prevent unnecessary keysp…
AviAvni Apr 14, 2026
6160c2d
feat: implement bulk loading of attributes from fjall for RDB save op…
AviAvni Apr 15, 2026
59b5db4
feat: build RDB attribute snapshots before fork to avoid fjall in child
AviAvni Apr 15, 2026
5814f9f
fix: disable Redis auto-save in tests and fix cargo fmt
AviAvni Apr 15, 2026
3922528
perf: skip RDB snapshot building for graphs without fjall data
AviAvni Apr 15, 2026
45b8f1d
perf: avoid unnecessary cache inserts for entities without properties
AviAvni Apr 15, 2026
4351800
perf: use static EMPTY_ATTRS and fix benchmark Popen --save arg
AviAvni Apr 15, 2026
1ef41be
fix: use shutdown-on-sigterm save to fix UDF persistence without auto…
AviAvni Apr 15, 2026
08f5baf
Merge remote-tracking branch 'origin/main' into rdb-persistence-wiring
Copilot Apr 15, 2026
7489673
Merge remote-tracking branch 'origin/main' into rdb-persistence-wiring
Copilot Apr 16, 2026
3eddc7e
fix: pass &explicit_rels to delete_relationships (by-ref, not by-value)
Copilot Apr 16, 2026
1475c31
test: increase BGSAVE wait timeout from 10s to 60s in test_persistenc…
Copilot Apr 16, 2026
7148faa
test: reduce graph_count 1000→100 and drain BGSAVE before returning i…
Copilot Apr 16, 2026
acb1231
perf: replace per-edge GxB_Iterator in Tensor::encode with single me …
Copilot Apr 16, 2026
39bc72e
Revert "test: reduce graph_count 1000→100 and drain BGSAVE before ret…
AviAvni Apr 16, 2026
ff1f525
Revert "test: increase BGSAVE wait timeout from 10s to 60s in test_pe…
AviAvni Apr 16, 2026
f826116
chore: remove obsolete Redis configuration settings from redis.conf
AviAvni Apr 16, 2026
6326a1a
feat: add initial flow tests for various features and functionalities
AviAvni Apr 16, 2026
8bdf13b
Merge branch 'main' into rdb-persistence-wiring
AviAvni Apr 16, 2026
f836ca9
feat: Add bulk operations for UINT64 entries and tuple extraction in …
AviAvni Apr 16, 2026
04fabe2
feat: Optimize UINT64 matrix encoding and reduce redundant computatio…
AviAvni Apr 16, 2026
a97d2f1
Merge branch 'main' into rdb-persistence-wiring
Copilot Apr 16, 2026
f2408e6
fmt
AviAvni Apr 16, 2026
b1efe01
Merge branch 'main' into rdb-persistence-wiring
AviAvni Apr 16, 2026
43dc78d
refactor: Improve pattern matching and variable handling across multi…
AviAvni Apr 16, 2026
3a8e0da
feat: Integrate rustc-hash for improved performance in attribute stor…
AviAvni Apr 16, 2026
7ec1d7b
refactor: Remove test_persistency from completed tests and add it to …
AviAvni Apr 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions flow_tests_done.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ tests/flow/test_concurrent_query.py
tests/flow/test_config.py
tests/flow/test_create_clause
tests/flow/test_distinct
tests/flow/test_effects.py
tests/flow/test_empty_query
tests/flow/test_encode_decode.py
tests/flow/test_entity_update
tests/flow/test_execution_plan_print.py
tests/flow/test_expand_into
Expand All @@ -24,6 +26,7 @@ tests/flow/test_function_calls
tests/flow/test_graph_create
tests/flow/test_graph_deletion
tests/flow/test_graph_merge
tests/flow/test_graph_versioning.py
tests/flow/test_hashjoin.py
tests/flow/test_imdb
tests/flow/test_index_create
Expand Down Expand Up @@ -52,8 +55,10 @@ tests/flow/test_path_algorithms.py
tests/flow/test_path_filter
tests/flow/test_path_projections.py
tests/flow/test_pending_queries_limit.py
tests/flow/test_persistency.py
tests/flow/test_point
tests/flow/test_query_validation
tests/flow/test_rdb_load.py
tests/flow/test_reduce.py
tests/flow/test_results.py
tests/flow/test_reversed_patterns
Expand Down
50 changes: 0 additions & 50 deletions flow_tests_todo.txt

This file was deleted.

4 changes: 2 additions & 2 deletions graph/src/graph/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,7 @@ impl Graph {

pub fn delete_relationships(
&mut self,
rels: HashMap<RelationshipId, (NodeId, NodeId)>,
rels: &HashMap<RelationshipId, (NodeId, NodeId)>,
) -> Result<(), String> {
self.deleted_relationships
.extend(rels.keys().map(|id| id.0));
Expand All @@ -1438,7 +1438,7 @@ impl Graph {
rels.values().map(|(src, dst)| (src.0, dst.0)).collect();

for (type_id, rels) in &rels
.into_iter()
.iter()
.map(|(id, (src, dst))| (id.0, src.0, dst.0))
.into_group_map_by(|(id, _, _)| self.get_relationship_type_id(RelationshipId(*id)))
{
Expand Down
4 changes: 2 additions & 2 deletions graph/src/runtime/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,8 @@ impl Pending {
}
if !self.deleted_relationships.is_empty() {
stats.borrow_mut().relationships_deleted += self.deleted_relationships.len();
let rels = std::mem::take(&mut self.deleted_relationships);
g.borrow_mut().delete_relationships(rels)?;
g.borrow_mut()
.delete_relationships(&self.deleted_relationships)?;
}
// Commit attribute changes and indexes after all deletions have been
// applied. This ensures relationship_attrs.remove() pending_deletes
Expand Down
41 changes: 41 additions & 0 deletions src/commands/debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use crate::redis_type::{create_virtual_keys, delete_stale_virtual_keys, finalize_pending_graphs};
use crate::serializers::DECODE_STATE;
use redis_module::{Context, NextArg, RedisError, RedisResult, RedisString, RedisValue};

pub fn graph_debug(
ctx: &Context,
args: Vec<RedisString>,
) -> RedisResult {
if args.len() < 3 {
return Err(RedisError::WrongArity);
}
let mut args_iter = args.into_iter().skip(1);
let subcmd = args_iter.next_str()?;

match subcmd.to_uppercase().as_str() {
"AUX" => debug_aux(ctx, args_iter),
_ => Err(RedisError::String(format!(
"Unknown DEBUG subcommand: {subcmd}"
))),
}
}

fn debug_aux(
ctx: &Context,
mut args: impl Iterator<Item = RedisString>,
) -> RedisResult {
let action = args.next_str()?;
match action.to_uppercase().as_str() {
"START" => {
DECODE_STATE.lock().clear();
unsafe { create_virtual_keys(ctx.ctx) };
Ok(RedisValue::Integer(1))
}
"END" => {
finalize_pending_graphs();
unsafe { delete_stale_virtual_keys(ctx.ctx) };
Ok(RedisValue::Integer(0))
}
_ => Err(RedisError::String(format!("Unknown AUX action: {action}"))),
}
}
2 changes: 1 addition & 1 deletion src/commands/effect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ fn apply_effects(
let rel = RelationshipId::from(rel_id);
let mut rels = HashMap::new();
rels.insert(rel, (NodeId::from(src_id), NodeId::from(dst_id)));
g.delete_relationships(rels)?;
g.delete_relationships(&rels)?;
}

EFFECT_ADD_SCHEMA => {
Expand Down
2 changes: 2 additions & 0 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use redis_module::{RedisError, RedisResult};

pub mod config_cmd;
pub mod debug;
pub mod delete;
pub mod effect;
pub mod explain;
Expand All @@ -31,6 +32,7 @@ pub mod ro_query;
pub mod udf;

pub use config_cmd::graph_config;
pub use debug::graph_debug;
pub use delete::graph_delete;
pub use effect::graph_effect;
pub use explain::graph_explain;
Expand Down
50 changes: 49 additions & 1 deletion src/commands/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use crate::{
redis_type::GRAPH_TYPE,
};
use parking_lot::RwLock;
use redis_module::{Context, NextArg, RedisResult, RedisString};
use redis_module::{Context, NextArg, RedisResult, RedisString, raw};
use std::ffi::CString;
use std::sync::Arc;
#[cfg(feature = "fuzz")]
use std::sync::atomic::{AtomicI32, Ordering};
Expand Down Expand Up @@ -52,11 +53,15 @@ pub fn graph_query(

let mut compact = false;
let mut track_memory = false;
let mut version_check: Option<u64> = None;
while let Ok(arg) = args.next_str() {
if arg == "--compact" {
compact = true;
} else if arg == "--track-memory" {
track_memory = true;
} else if arg == "version" {
let ver_str = args.next_str()?;
version_check = Some(ver_str.parse::<u64>()?);
}
}

Expand All @@ -66,6 +71,22 @@ pub fn graph_query(

if let Some(graph) = read_key.get_value::<Arc<RwLock<ThreadedGraph>>>(&GRAPH_TYPE)? {
let graph = graph.clone();

// Check version if provided
if let Some(provided_version) = version_check {
let current_schema_version = graph.read().graph.read().borrow().schema_version;
if provided_version != current_schema_version {
drop(read_key);
drop(graph);
// Return array with [error, version]
raw::reply_with_array(ctx.ctx, 2);
let err_msg = CString::new("ERR invalid graph version").unwrap();
raw::reply_with_error(ctx.ctx, err_msg.as_ptr());
raw::reply_with_long_long(ctx.ctx, current_schema_version as i64);
return Ok(redis_module::RedisValue::NoReply);
}
}
Comment on lines 72 to +88
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Move the schema-version check into the serialized execution path.

These comparisons run before query_mut(...), so another writer can change the schema between the check and the actual query execution. That leaves a TOCTOU window where stale clients still get through, or valid clients are checked against an older snapshot. Validate version_check after the request has entered the same queue/lock that executes the query.

Based on learnings, Redis GRAPH.QUERY command must accept both read and write Cypher queries and block client during execution.

Also applies to: 98-112


drop(read_key);
return query_mut(ctx, &graph, query, compact, true, track_memory, key_name);
}
Expand All @@ -76,13 +97,40 @@ pub fn graph_query(
// Re-check: another client may have created it between our read and write open.
if let Some(graph) = key.get_value::<Arc<RwLock<ThreadedGraph>>>(&GRAPH_TYPE)? {
let graph = graph.clone();

// Check version if provided
if let Some(provided_version) = version_check {
let current_schema_version = graph.read().graph.read().borrow().schema_version;
if provided_version != current_schema_version {
// Return array with [error, version]
raw::reply_with_array(ctx.ctx, 2);
let err_msg = CString::new("ERR invalid graph version").unwrap();
raw::reply_with_error(ctx.ctx, err_msg.as_ptr());
raw::reply_with_long_long(ctx.ctx, current_schema_version as i64);
return Ok(redis_module::RedisValue::NoReply);
}
}

return query_mut(ctx, &graph, query, compact, true, track_memory, key_name);
}

let graph = Arc::new(RwLock::new(ThreadedGraph::new(
*CONFIGURATION_CACHE_SIZE.lock(ctx) as usize,
&key_str.to_string(),
)));

// For a newly-created graph, the initial schema_version is 0
if let Some(provided_version) = version_check
&& provided_version != 0
{
// Return array with [error, version]
raw::reply_with_array(ctx.ctx, 2);
let err_msg = CString::new("ERR invalid graph version").unwrap();
raw::reply_with_error(ctx.ctx, err_msg.as_ptr());
raw::reply_with_long_long(ctx.ctx, 0i64);
return Ok(redis_module::RedisValue::NoReply);
}

let result = query_mut(ctx, &graph, query, compact, true, track_memory, key_name);
key.set_value(&GRAPH_TYPE, graph)?;
result
Expand Down
11 changes: 7 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ mod graph_core;
mod module_init;
mod redis_type;
mod reply;
mod serializers;

use allocator::ThreadCountingAllocator;
use commands::{
graph_config, graph_delete, graph_explain, graph_list, graph_memory, graph_query, graph_record,
graph_ro_query, graph_udf,
graph_config, graph_debug, graph_delete, graph_effect, graph_explain, graph_list, graph_memory,
graph_query, graph_record, graph_ro_query, graph_udf,
};
use config::{
CONFIGURATION_CACHE_SIZE, CONFIGURATION_CMD_INFO, CONFIGURATION_DELAY_INDEXING,
Expand All @@ -56,13 +57,13 @@ use config::{
};
use module_init::graph_init;
use redis_module::{configuration::ConfigurationFlags, redis_module};
use redis_type::GRAPH_TYPE;
use redis_type::{GRAPH_TYPE, GRAPHMETA_TYPE};

redis_module! {
name: "graph",
version: 1,
allocator: (ThreadCountingAllocator, ThreadCountingAllocator),
data_types: [GRAPH_TYPE],
data_types: [GRAPH_TYPE, GRAPHMETA_TYPE],
init: graph_init,
commands: [
["graph.DELETE", graph_delete, "write deny-script", 1, 1, 1, ""],
Expand All @@ -74,6 +75,8 @@ redis_module! {
["graph.MEMORY", graph_memory, "readonly deny-script", 2, 2, 1, ""],
["graph.CONFIG", graph_config, "readonly deny-script allow-busy", 0, 0, 0, ""],
["graph.UDF", graph_udf, "write deny-script", 0, 0, 0, ""],
["graph.DEBUG", graph_debug, "write deny-script", 0, 0, 0, ""],
["graph.EFFECT", graph_effect, "write deny-script", 1, 1, 1, ""],
],
configurations: [
i64: [
Expand Down
16 changes: 16 additions & 0 deletions src/module_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::config::{
CONFIGURATION_JS_HEAP_SIZE, CONFIGURATION_JS_STACK_SIZE, CONFIGURATION_TEMP_FOLDER,
OMP_THREAD_COUNT, get_thread_count,
};
use crate::redis_type::on_persistence;
use graph::{
graph::graphblas::matrix::init,
index::redisearch::{REDISEARCH_INIT_LIBRARY, RediSearch_Init},
Expand All @@ -44,6 +45,10 @@ use std::{os::raw::c_int, os::raw::c_void, panic};
#[allow(non_upper_case_globals)]
static RedisModuleEvent_FlushDB: RedisModuleEvent = RedisModuleEvent { id: 2, dataver: 1 };

/// Redis event ID for Persistence events (RDB save start/end).
#[allow(non_upper_case_globals)]
static RedisModuleEvent_Persistence: RedisModuleEvent = RedisModuleEvent { id: 1, dataver: 1 };

pub fn graph_init(
ctx: &Context,
_: &Vec<redis_module::RedisString>,
Expand Down Expand Up @@ -72,6 +77,17 @@ pub fn graph_init(
Some(on_flush),
);
debug_assert_eq!(res, REDISMODULE_OK as c_int);

// Subscribe to persistence events for virtual key management.
let res = RedisModule_SubscribeToServerEvent.unwrap()(
ctx.ctx,
RedisModuleEvent_Persistence,
Some(on_persistence),
);
if res != REDISMODULE_OK as c_int {
eprintln!("FalkorDB: failed to subscribe to persistence events: code {res}");
return Status::Err;
}
}
match init_functions() {
Ok(()) => {}
Expand Down
Loading
Loading