Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions graph/src/planner/optimizer/utilize_node_by_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ fn get_id_filter(
&& inner_func.name == "id"
&& let ExprIR::Variable(var) = filter.child(0).child(0).data()
&& var == node_alias
&& !references_var(&filter.child(1), node_alias)
{
Some((
Arc::new(filter.child(1).clone_as_tree()),
Expand All @@ -71,6 +72,7 @@ fn get_id_filter(
&& inner_func.name == "id"
&& let ExprIR::Variable(var) = filter.child(1).child(0).data()
&& var == node_alias
&& !references_var(&filter.child(0), node_alias)
{
let op = match filter.data() {
ExprIR::Eq => ExprIR::Eq,
Expand All @@ -86,6 +88,21 @@ fn get_id_filter(
}
}

/// Reports whether `var` occurs anywhere in the expression subtree rooted at `expr`.
///
/// The subtree is visited with a `Bfs` walk. The search stops at the first
/// `ExprIR::Variable` node that compares equal to `var` and yields `true`;
/// if the walk finishes without such a node the result is `false`.
fn references_var(
    expr: &DynNode<ExprIR<Variable>>,
    var: &Variable,
) -> bool {
    expr.walk::<Bfs>()
        .any(|node| matches!(node, ExprIR::Variable(candidate) if candidate == var))
}

/// Replaces label scan + ID filter with direct node ID lookup.
pub(super) fn utilize_node_by_id(optimized_plan: &mut DynTree<IR>) {
loop {
Expand Down
11 changes: 7 additions & 4 deletions graph/src/runtime/functions/procedures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,17 @@ pub fn register(funcs: &mut Functions) {
}| {
let mut map = OrderMap::default();
map.insert(Arc::new(String::from("label")), Value::String(label));
let mut sorted_keys: Vec<_> = fields.keys().cloned().collect();
sorted_keys.sort();
map.insert(
Arc::new(String::from("properties")),
Value::List(Arc::new(fields.keys().map(|f| Value::String(f.clone())).collect())),
Value::List(Arc::new(sorted_keys.iter().map(|f| Value::String(f.clone())).collect())),
);
let mut types_map = OrderMap::default();
for (attr, fields) in fields {
for attr in &sorted_keys {
let field_list = &fields[attr];
let mut types = thin_vec![];
for field in fields {
for field in field_list {
match field.ty {
IndexType::Range => {
types.push(Value::String(Arc::new(String::from("RANGE"))));
Expand All @@ -147,7 +150,7 @@ pub fn register(funcs: &mut Functions) {
}
}
}
types_map.insert(attr, Value::List(Arc::new(types)));
types_map.insert(attr.clone(), Value::List(Arc::new(types)));
}
map.insert(Arc::new(String::from("types")), Value::Map(Arc::new(types_map)));
map.insert(Arc::new(String::from("options")), Value::Null);
Expand Down
93 changes: 43 additions & 50 deletions graph/src/runtime/ops/unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,18 @@
//! └────────────────┘
//! ```
//!
//! Large lists are expanded lazily: the operator stores a cursor into the
//! current list and only materializes `Env` rows in `BATCH_SIZE` chunks,
//! preventing memory blow-up for queries like `UNWIND range(1, 20000000)`.
//! Large lists are expanded lazily: the operator uses `ValueIter` (which can
//! be a lazy range iterator) and only materializes `Env` rows in `BATCH_SIZE`
//! chunks, preventing memory blow-up for queries like
//! `UNWIND range(1, 20000000)`.
//! Non-list values are treated as single-element results; NULL values
//! produce no output rows.

use std::collections::VecDeque;
use std::sync::Arc;
use thin_vec::ThinVec;

use crate::parser::ast::{QueryExpr, Variable};
use crate::planner::IR;
use crate::runtime::eval::ExprEval;
use crate::runtime::eval::{ExprEval, ValueIter};
use crate::runtime::{
batch::{BATCH_SIZE, Batch, BatchOp},
env::Env,
Expand All @@ -38,17 +37,15 @@ use crate::runtime::{
};
use orx_tree::{Dyn, NodeIdx, NodeRef};

/// State for lazily expanding a single list across multiple `next()` calls.
struct ListExpansion<'a> {
/// The list being expanded.
items: Arc<ThinVec<Value>>,
/// State for lazily expanding a value iterator across multiple `next()` calls.
struct IterExpansion<'a> {
/// The lazy iterator being expanded.
iter: ValueIter,
/// The base env for each output row (cloned per element).
base_env: Env<'a>,
/// Next index into `items` to emit.
cursor: usize,
}

impl<'a> ListExpansion<'a> {
impl<'a> IterExpansion<'a> {
/// Drain up to `budget` elements into `out`.
/// Returns `true` if the expansion is fully drained.
fn drain(
Expand All @@ -58,19 +55,22 @@ impl<'a> ListExpansion<'a> {
name: &Variable,
pool: &'a Pool<Value>,
) -> bool {
let end = (self.cursor + budget).min(self.items.len());
for i in self.cursor..end {
let mut row = self.base_env.clone_pooled(pool);
row.insert(name, self.items[i].clone());
out.push_back(row);
for _ in 0..budget {
match self.iter.next() {
Some(val) => {
let mut row = self.base_env.clone_pooled(pool);
row.insert(name, val);
out.push_back(row);
}
None => return true,
}
}
self.cursor = end;
self.cursor >= self.items.len()
false
}
}

/// Evaluate the list expression for a given row. Returns either:
/// - A `ListExpansion` if the result is a non-empty list
/// - An `IterExpansion` if the result is a non-empty list or lazy range
/// - A single `Env` pushed onto `pending` for scalar values
/// - Nothing for `Null`
fn eval_row<'a>(
Expand All @@ -79,28 +79,23 @@ fn eval_row<'a>(
name: &Variable,
env: &Env<'a>,
pending: &mut VecDeque<Env<'a>>,
) -> Result<Option<ListExpansion<'a>>, String> {
) -> Result<Option<IterExpansion<'a>>, String> {
let pool = runtime.env_pool;
let value = ExprEval::from_runtime(runtime).eval(list, list.root().idx(), Some(env), None)?;
let eval = ExprEval::from_runtime(runtime);
let iter = eval.eval_iter_expr(list, list.root().idx(), Some(env))?;

match value {
Value::Null => Ok(None),
Value::List(list) => {
if list.is_empty() {
return Ok(None);
}
Ok(Some(ListExpansion {
items: list,
base_env: env.clone_pooled(pool),
cursor: 0,
}))
}
other => {
match iter {
ValueIter::Empty | ValueIter::Once(None | Some(Value::Null)) => Ok(None),
ValueIter::Once(Some(val)) => {
let mut out_row = env.clone_pooled(pool);
out_row.insert(name, other);
out_row.insert(name, val);
pending.push_back(out_row);
Ok(None)
}
_ => Ok(Some(IterExpansion {
iter,
base_env: env.clone_pooled(pool),
})),
}
}

Expand All @@ -113,7 +108,7 @@ pub struct UnwindOp<'a> {
current_batch: Option<Batch<'a>>,
current_pos: usize,
/// Lazy expansion state for a large list.
list_expansion: Option<ListExpansion<'a>>,
iter_expansion: Option<IterExpansion<'a>>,
pub(crate) idx: NodeIdx<Dyn<IR>>,
}

Expand All @@ -133,7 +128,7 @@ impl<'a> UnwindOp<'a> {
pending: VecDeque::new(),
current_batch: None,
current_pos: 0,
list_expansion: None,
iter_expansion: None,
idx,
}
}
Expand All @@ -153,15 +148,15 @@ impl<'a> Iterator for UnwindOp<'a> {
break;
}

// Continue draining a partially-expanded list.
if let Some(ref mut exp) = self.list_expansion {
// Continue draining a partially-expanded iterator.
if let Some(ref mut exp) = self.iter_expansion {
let budget = BATCH_SIZE - envs.len();
let done = exp.drain(&mut self.pending, budget, self.name, self.runtime.env_pool);
if done {
self.list_expansion = None;
self.iter_expansion = None;
}
super::drain_pending(&mut self.pending, &mut envs);
if envs.len() >= BATCH_SIZE || self.list_expansion.is_some() {
if envs.len() >= BATCH_SIZE || self.iter_expansion.is_some() {
break;
}
continue;
Expand All @@ -186,11 +181,9 @@ impl<'a> Iterator for UnwindOp<'a> {
let row_idx = active[self.current_pos];
self.current_pos += 1;
let env = batch.env_ref(row_idx);
// eval_row borrows only runtime, list, name, env, and pending
// — not current_batch or list_expansion — so no borrow conflict.
match eval_row(self.runtime, self.list, self.name, env, &mut self.pending) {
Ok(Some(expansion)) => {
self.list_expansion = Some(expansion);
self.iter_expansion = Some(expansion);
break; // drain the expansion in the next loop iteration
}
Ok(None) => {}
Expand All @@ -203,19 +196,19 @@ impl<'a> Iterator for UnwindOp<'a> {
}
}

// Drain list expansion outside the batch borrow scope.
if let Some(ref mut exp) = self.list_expansion {
// Drain iterator expansion outside the batch borrow scope.
if let Some(ref mut exp) = self.iter_expansion {
let budget = BATCH_SIZE.saturating_sub(self.pending.len());
let done = exp.drain(&mut self.pending, budget, self.name, self.runtime.env_pool);
if done {
self.list_expansion = None;
self.iter_expansion = None;
}
}

super::drain_pending(&mut self.pending, &mut envs);

// Check if batch is exhausted.
if self.list_expansion.is_none()
if self.iter_expansion.is_none()
&& let Some(ref batch) = self.current_batch
&& self.current_pos >= batch.active_len()
{
Expand Down
9 changes: 5 additions & 4 deletions tests/flow/graph_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ def graph_eq(A, B):
ORDER BY label, properties, types, language, stopwords, entitytype"""),

# constraints
('constraints', """CALL db.constraints()
YIELD type, label, properties, entitytype, status
RETURN type, label, properties, entitytype, status
ORDER BY type, label, properties, entitytype, status""")
# TODO: enable once constraints are supported
# ('constraints', """CALL db.constraints()
# YIELD type, label, properties, entitytype, status
# RETURN type, label, properties, entitytype, status
# ORDER BY type, label, properties, entitytype, status""")
]

for category, q in queries:
Expand Down
Loading