Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
65c0416
Implement RDB serialization and deserialization for graph data
AviAvni Apr 12, 2026
20171d8
feat: add GRAPH.DEBUG command for RDB load management and testing
AviAvni Apr 12, 2026
3ce7c6d
refactor: improve concurrency and error handling in graph serializati…
AviAvni Apr 12, 2026
788c6f2
style: format code for improved readability in multiple files
AviAvni Apr 12, 2026
f1be7a0
feat: add falkordb-bulk-loader to Python dependencies and update enco…
AviAvni Apr 12, 2026
b560f8c
feat: add schema version management to Graph and MVCCGraph for versio…
AviAvni Apr 12, 2026
4a81b3a
feat: add test for graph versioning and remove from todo list
AviAvni Apr 12, 2026
179e0be
feat: add EFFECTS_THRESHOLD as a runtime-configurable value
AviAvni Apr 12, 2026
6c29a53
Implement effects replication for graph mutations
AviAvni Apr 13, 2026
89c6855
feat: move test_effects.py from todo to done list
AviAvni Apr 13, 2026
6fb14c4
Merge branch 'main' into rdb
AviAvni Apr 13, 2026
f0a39b0
Implement reserved node and relationship counters, enhance index effe…
AviAvni Apr 13, 2026
844c53d
Merge branch 'main' into rdb
AviAvni Apr 13, 2026
3f4c27c
fix: remove unnecessary whitespace in apply_effects function
AviAvni Apr 13, 2026
727eadb
Merge branch 'main' into rdb
AviAvni Apr 13, 2026
47b6943
feat: add replication of changes after committing graph effects
AviAvni Apr 13, 2026
e87dc11
feat: include modified flag in query execution results for replicatio…
AviAvni Apr 13, 2026
ec86753
feat: ensure capacity for highest node and relationship IDs during ef…
AviAvni Apr 13, 2026
359c49c
feat: add UINT64 support for edge IDs in matrix and tensor implementa…
AviAvni Apr 13, 2026
791aca7
feat: add UINT64 support for matrices and tensors, update Redis key h…
AviAvni Apr 13, 2026
99fd3e1
refactor: reorganize import statements for clarity and consistency
AviAvni Apr 13, 2026
77315ca
Merge branch 'main' into rdb
AviAvni Apr 13, 2026
d74a121
feat: implement dynamic resizing for graph node and relationship matr…
AviAvni Apr 13, 2026
a2b54e7
refactor: optimize effects handling in commit operation and add clear…
AviAvni Apr 13, 2026
890a23a
feat: add support for non-deterministic function detection in query p…
AviAvni Apr 13, 2026
d1a2377
feat: enhance attribute handling in Pending for new and existing node…
AviAvni Apr 13, 2026
774635c
feat: add disk space cleanup step in CI workflows for rust-pr and rus…
AviAvni Apr 13, 2026
fc3707e
refactor: replace unwrap() with direct lock() calls for DECODE_STATE …
AviAvni Apr 13, 2026
76ef617
feat: improve disk space cleanup in CI workflows for rust-pr and rust…
AviAvni Apr 13, 2026
3b7736d
refactor: simplify match statement in eval_row and improve function s…
AviAvni Apr 13, 2026
057c9c4
fix: dynamically set parallelism based on available CPU cores
AviAvni Apr 13, 2026
f33e2bb
feat: set parallelism for flow tests in CI workflows
AviAvni Apr 14, 2026
a3e65ee
fix: update flow test parallelism syntax in CI workflows
AviAvni Apr 14, 2026
a7a4bb6
refactor: update matrix resizing logic in rebuild_derived_matrices fo…
AviAvni Apr 14, 2026
8db5fed
feat: add redis configuration support to flow tests and update RLTest…
AviAvni Apr 14, 2026
103d9aa
refactor: remove redundant disk space cleanup steps from CI workflows
AviAvni Apr 14, 2026
c320f66
fix: update flow test execution to include release flag and remove pa…
AviAvni Apr 14, 2026
6c38250
refactor: simplify database handling in AttributeStore and Graph init…
AviAvni Apr 14, 2026
6722aff
refactor: streamline virtual key management by consolidating key dele…
AviAvni Apr 14, 2026
d4528ed
refactor: optimize virtual key management and encoding context handling
AviAvni Apr 14, 2026
a0b7ea3
refactor: optimize attribute cache and store to use Arc for improved …
AviAvni Apr 14, 2026
0cd9ddc
Merge branch 'main' into rdb
AviAvni Apr 14, 2026
7a1be98
refactor: optimize memory calculations and improve index query handling
AviAvni Apr 14, 2026
39f2155
Merge branch 'main' into rdb
AviAvni Apr 14, 2026
f8d7c07
merge: resolve conflicts after merging main into rdb
AviAvni Apr 14, 2026
056f466
Merge branch 'main' into rdb
AviAvni Apr 14, 2026
7c14cc8
Merge branch 'main' into rdb
AviAvni Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions flow_tests_done.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ tests/flow/test_concurrent_query.py
tests/flow/test_config.py
tests/flow/test_create_clause
tests/flow/test_distinct
tests/flow/test_effects.py
tests/flow/test_empty_query
tests/flow/test_encode_decode.py
tests/flow/test_entity_update
tests/flow/test_execution_plan_print.py
tests/flow/test_expand_into
Expand All @@ -24,6 +26,7 @@ tests/flow/test_function_calls
tests/flow/test_graph_create
tests/flow/test_graph_deletion
tests/flow/test_graph_merge
tests/flow/test_graph_versioning.py
tests/flow/test_hashjoin.py
tests/flow/test_imdb
tests/flow/test_index_create
Expand Down Expand Up @@ -52,8 +55,10 @@ tests/flow/test_path_algorithms.py
tests/flow/test_path_filter
tests/flow/test_path_projections.py
tests/flow/test_pending_queries_limit.py
tests/flow/test_persistency.py
tests/flow/test_point
tests/flow/test_query_validation
tests/flow/test_rdb_load.py
tests/flow/test_reduce.py
tests/flow/test_results.py
tests/flow/test_reversed_patterns
Expand Down
7 changes: 0 additions & 7 deletions flow_tests_todo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,10 @@ tests/flow/test_profile.py
tests/flow/test_stress.py

## Persistence & Replication
tests/flow/test_persistency.py
tests/flow/test_encode_decode.py
tests/flow/test_rdb_load.py
tests/flow/test_prev_rdb_decode.py
tests/flow/test_replication.py
tests/flow/test_replication_states.py
tests/flow/test_graph_versioning.py

## Redis Integration & Server Features
tests/flow/test_acl.py
tests/flow/test_bolt.py
tests/flow/test_bulk_insertion.py
tests/flow/test_graph_copy.py
Expand All @@ -46,5 +40,4 @@ tests/flow/test_undo_log.py

## Metadata & Internals
tests/flow/test_multi_label.py
tests/flow/test_effects.py
tests/flow/test_intern_string.py
41 changes: 41 additions & 0 deletions src/commands/debug.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use crate::redis_type::{create_virtual_keys, delete_stale_virtual_keys, finalize_pending_graphs};
use crate::serializers::DECODE_STATE;
use redis_module::{Context, NextArg, RedisError, RedisResult, RedisString, RedisValue};

pub fn graph_debug(
ctx: &Context,
args: Vec<RedisString>,
) -> RedisResult {
if args.len() < 3 {
return Err(RedisError::WrongArity);
}
let mut args_iter = args.into_iter().skip(1);
let subcmd = args_iter.next_str()?;

match subcmd.to_uppercase().as_str() {
"AUX" => debug_aux(ctx, args_iter),
_ => Err(RedisError::String(format!(
"Unknown DEBUG subcommand: {subcmd}"
))),
}
}

fn debug_aux(
ctx: &Context,
mut args: impl Iterator<Item = RedisString>,
) -> RedisResult {
let action = args.next_str()?;
match action.to_uppercase().as_str() {
"START" => {
DECODE_STATE.lock().clear();
unsafe { create_virtual_keys(ctx.ctx) };
Ok(RedisValue::Integer(1))
}
"END" => {
finalize_pending_graphs();
unsafe { delete_stale_virtual_keys(ctx.ctx) };
Ok(RedisValue::Integer(0))
}
_ => Err(RedisError::String(format!("Unknown AUX action: {action}"))),
}
}
2 changes: 2 additions & 0 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use redis_module::{RedisError, RedisResult};

pub mod config_cmd;
pub mod debug;
pub mod delete;
pub mod effect;
pub mod explain;
Expand All @@ -31,6 +32,7 @@ pub mod ro_query;
pub mod udf;

pub use config_cmd::graph_config;
pub use debug::graph_debug;
pub use delete::graph_delete;
pub use effect::graph_effect;
pub use explain::graph_explain;
Expand Down
50 changes: 49 additions & 1 deletion src/commands/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use crate::{
redis_type::GRAPH_TYPE,
};
use parking_lot::RwLock;
use redis_module::{Context, NextArg, RedisResult, RedisString};
use redis_module::{Context, NextArg, RedisResult, RedisString, raw};
use std::ffi::CString;
use std::sync::Arc;
#[cfg(feature = "fuzz")]
use std::sync::atomic::{AtomicI32, Ordering};
Expand Down Expand Up @@ -52,11 +53,15 @@ pub fn graph_query(

let mut compact = false;
let mut track_memory = false;
let mut version_check: Option<u64> = None;
while let Ok(arg) = args.next_str() {
if arg == "--compact" {
compact = true;
} else if arg == "--track-memory" {
track_memory = true;
} else if arg == "version" {
let ver_str = args.next_str()?;
version_check = Some(ver_str.parse::<u64>()?);
}
}

Expand All @@ -66,6 +71,22 @@ pub fn graph_query(

if let Some(graph) = read_key.get_value::<Arc<RwLock<ThreadedGraph>>>(&GRAPH_TYPE)? {
let graph = graph.clone();

// Check version if provided
if let Some(provided_version) = version_check {
let current_schema_version = graph.read().graph.read().borrow().schema_version;
if provided_version != current_schema_version {
drop(read_key);
drop(graph);
// Return array with [error, version]
raw::reply_with_array(ctx.ctx, 2);
let err_msg = CString::new("ERR invalid graph version").unwrap();
raw::reply_with_error(ctx.ctx, err_msg.as_ptr());
raw::reply_with_long_long(ctx.ctx, current_schema_version as i64);
return Ok(redis_module::RedisValue::NoReply);
}
}

drop(read_key);
return query_mut(ctx, &graph, query, compact, true, track_memory, key_name);
}
Expand All @@ -76,13 +97,40 @@ pub fn graph_query(
// Re-check: another client may have created it between our read and write open.
if let Some(graph) = key.get_value::<Arc<RwLock<ThreadedGraph>>>(&GRAPH_TYPE)? {
let graph = graph.clone();

// Check version if provided
if let Some(provided_version) = version_check {
let current_schema_version = graph.read().graph.read().borrow().schema_version;
if provided_version != current_schema_version {
// Return array with [error, version]
raw::reply_with_array(ctx.ctx, 2);
let err_msg = CString::new("ERR invalid graph version").unwrap();
raw::reply_with_error(ctx.ctx, err_msg.as_ptr());
raw::reply_with_long_long(ctx.ctx, current_schema_version as i64);
return Ok(redis_module::RedisValue::NoReply);
}
}

return query_mut(ctx, &graph, query, compact, true, track_memory, key_name);
}

let graph = Arc::new(RwLock::new(ThreadedGraph::new(
*CONFIGURATION_CACHE_SIZE.lock(ctx) as usize,
&key_str.to_string(),
)));

// For a newly-created graph, the initial schema_version is 0
if let Some(provided_version) = version_check
&& provided_version != 0
{
// Return array with [error, version]
raw::reply_with_array(ctx.ctx, 2);
let err_msg = CString::new("ERR invalid graph version").unwrap();
raw::reply_with_error(ctx.ctx, err_msg.as_ptr());
raw::reply_with_long_long(ctx.ctx, 0i64);
return Ok(redis_module::RedisValue::NoReply);
}

let result = query_mut(ctx, &graph, query, compact, true, track_memory, key_name);
key.set_value(&GRAPH_TYPE, graph)?;
result
Expand Down
11 changes: 7 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ mod graph_core;
mod module_init;
mod redis_type;
mod reply;
mod serializers;

use allocator::ThreadCountingAllocator;
use commands::{
graph_config, graph_delete, graph_explain, graph_list, graph_memory, graph_query, graph_record,
graph_ro_query, graph_udf,
graph_config, graph_debug, graph_delete, graph_effect, graph_explain, graph_list, graph_memory,
graph_query, graph_record, graph_ro_query, graph_udf,
};
use config::{
CONFIGURATION_CACHE_SIZE, CONFIGURATION_CMD_INFO, CONFIGURATION_DELAY_INDEXING,
Expand All @@ -56,13 +57,13 @@ use config::{
};
use module_init::graph_init;
use redis_module::{configuration::ConfigurationFlags, redis_module};
use redis_type::GRAPH_TYPE;
use redis_type::{GRAPH_TYPE, GRAPHMETA_TYPE};

redis_module! {
name: "graph",
version: 1,
allocator: (ThreadCountingAllocator, ThreadCountingAllocator),
data_types: [GRAPH_TYPE],
data_types: [GRAPH_TYPE, GRAPHMETA_TYPE],
init: graph_init,
commands: [
["graph.DELETE", graph_delete, "write deny-script", 1, 1, 1, ""],
Expand All @@ -74,6 +75,8 @@ redis_module! {
["graph.MEMORY", graph_memory, "readonly deny-script", 2, 2, 1, ""],
["graph.CONFIG", graph_config, "readonly deny-script allow-busy", 0, 0, 0, ""],
["graph.UDF", graph_udf, "write deny-script", 0, 0, 0, ""],
["graph.DEBUG", graph_debug, "write deny-script", 0, 0, 0, ""],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Gate graph.DEBUG behind a test/admin-only switch.

This command can clear global decode state, create/delete virtual keys, and finalize pending RDB loads. Registering it unconditionally makes those internal persistence controls callable by any client on a production server.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib.rs` at line 78, The registration of the sensitive command
["graph.DEBUG", graph_debug, ...] must be gated so it cannot be registered
unconditionally; update the code that pushes this tuple in src/lib.rs to only
register it when an admin/test-only switch is enabled (e.g., a compile feature
like cfg(feature = "admin_commands") or a runtime check such as
is_admin_commands_enabled()), referencing the graph_debug handler and the
registration array so you only push/insert that entry when the switch returns
true; ensure normal builds/clients never see the "graph.DEBUG" command unless
the admin/test flag is explicitly enabled.

["graph.EFFECT", graph_effect, "write deny-script", 1, 1, 1, ""],
],
configurations: [
i64: [
Expand Down
16 changes: 16 additions & 0 deletions src/module_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::config::{
CONFIGURATION_JS_HEAP_SIZE, CONFIGURATION_JS_STACK_SIZE, CONFIGURATION_TEMP_FOLDER,
OMP_THREAD_COUNT, get_thread_count,
};
use crate::redis_type::on_persistence;
use graph::{
graph::graphblas::matrix::init,
index::redisearch::{REDISEARCH_INIT_LIBRARY, RediSearch_Init},
Expand All @@ -44,6 +45,10 @@ use std::{os::raw::c_int, os::raw::c_void, panic};
#[allow(non_upper_case_globals)]
static RedisModuleEvent_FlushDB: RedisModuleEvent = RedisModuleEvent { id: 2, dataver: 1 };

/// Redis event ID for Persistence events (RDB save start/end).
#[allow(non_upper_case_globals)]
static RedisModuleEvent_Persistence: RedisModuleEvent = RedisModuleEvent { id: 1, dataver: 1 };

pub fn graph_init(
ctx: &Context,
_: &Vec<redis_module::RedisString>,
Expand Down Expand Up @@ -72,6 +77,17 @@ pub fn graph_init(
Some(on_flush),
);
debug_assert_eq!(res, REDISMODULE_OK as c_int);

// Subscribe to persistence events for virtual key management.
let res = RedisModule_SubscribeToServerEvent.unwrap()(
ctx.ctx,
RedisModuleEvent_Persistence,
Some(on_persistence),
);
if res != REDISMODULE_OK as c_int {
eprintln!("FalkorDB: failed to subscribe to persistence events: code {res}");
return Status::Err;
}
}
match init_functions() {
Ok(()) => {}
Expand Down
Loading
Loading