diff --git a/graph/src/graph/attribute_cache.rs b/graph/src/graph/attribute_cache.rs index 5a35b20c..0b6a18d8 100644 --- a/graph/src/graph/attribute_cache.rs +++ b/graph/src/graph/attribute_cache.rs @@ -60,6 +60,8 @@ //! The default budget is 2 GiB per attribute store (nodes and relationships //! each get their own cache). +use std::sync::Arc; + use quick_cache::sync::Cache; use quick_cache::{DefaultHashBuilder, Lifecycle, Weighter}; @@ -69,7 +71,9 @@ use crate::runtime::value::Value; #[derive(Clone)] struct CachedEntity { /// Sorted by `attr_idx` for O(log n) binary-search lookups. - attrs: Vec<(u16, Value)>, + /// Wrapped in Arc so `quick_cache::get()` clone is O(1) refcount bump + /// instead of a full Vec heap allocation. + attrs: Arc<Vec<(u16, Value)>>, /// Graph version when this entry was written/populated. version: u64, /// `true` when the entry has not yet been flushed to fjall. @@ -89,7 +93,12 @@ impl Weighter<u64, CachedEntity> for EntityWeighter { let base = val.attrs.len() * (std::mem::size_of::<u16>() + std::mem::size_of::<Value>()); let heap: usize = val.attrs.iter().map(|(_, v)| v.heap_size()).sum(); // Minimum weight of 1 to satisfy quick_cache invariant. - (base + heap + std::mem::size_of::<CachedEntity>()).max(1) as u64 + // Include Arc overhead. + (base + + heap + + std::mem::size_of::<CachedEntity>() + + std::mem::size_of::<Arc<Vec<(u16, Value)>>>()) + .max(1) as u64 } } @@ -182,12 +191,13 @@ impl AttributeCache { /// Return all cached attributes for an entity. /// /// Returns `None` on cache miss or version mismatch. + /// The returned `Arc` avoids cloning the underlying Vec.
#[must_use] pub fn get_entity( &self, entity_id: u64, version: u64, - ) -> Option<Vec<(u16, Value)>> { + ) -> Option<Arc<Vec<(u16, Value)>>> { let entry = self.entries.get(&entity_id)?; if entry.version > version { return None; } @@ -203,7 +213,7 @@ &self, entity_id: u64, version: u64, - ) -> Option<(Vec<(u16, Value)>, bool)> { + ) -> Option<(Arc<Vec<(u16, Value)>>, bool)> { let entry = self.entries.get(&entity_id)?; if entry.version > version { return None; } @@ -240,7 +250,7 @@ // Ensure attrs are sorted by attr_idx to support binary searches. attrs.sort_by_key(|item| item.0); let entry = CachedEntity { - attrs, + attrs: Arc::new(attrs), version, dirty, }; @@ -340,7 +350,9 @@ if let Some(mut entry) = self.entries.get(&entity_id) && let Ok(pos) = entry.attrs.binary_search_by_key(&attr_idx, |(idx, _)| *idx) { - entry.attrs.remove(pos); + let mut new_attrs = (*entry.attrs).clone(); + new_attrs.remove(pos); + entry.attrs = Arc::new(new_attrs); entry.dirty = true; // Update the cache with the modified entry self.entries.insert(entity_id, entry); @@ -363,7 +375,7 @@ pub fn collect_dirty_lru( &self, n: usize, - ) -> Vec<(u64, Vec<(u16, Value)>)> { + ) -> Vec<(u64, Arc<Vec<(u16, Value)>>)> { let mut result = Vec::with_capacity(n); // Iterate and collect dirty entries. for (entity_id, entry) in self.entries.iter() { diff --git a/graph/src/graph/attribute_store.rs b/graph/src/graph/attribute_store.rs index db17a004..f4d55388 100644 --- a/graph/src/graph/attribute_store.rs +++ b/graph/src/graph/attribute_store.rs @@ -250,10 +250,10 @@ impl AttributeStore { fn populate_cache_from_fjall( &self, entity_id: u64, - ) -> Vec<(u16, Value)> { + ) -> Arc<Vec<(u16, Value)>> { // If this entity is pending full deletion, return empty regardless of fjall state.
if self.pending_deletes.contains(entity_id) { - return Vec::new(); + return Arc::new(Vec::new()); } let prefix = entity_id.to_be_bytes(); let attrs: Vec<(u16, Value)> = self @@ -271,7 +271,7 @@ let _ = self .cache .insert_entity_if_older(entity_id, attrs.clone(), self.version); - attrs + Arc::new(attrs) } // ---- read path (cache → fjall) -------------------------------------- @@ -340,14 +340,18 @@ // Try cache first. let cached = self.cache.get_entity(key, self.version); let attrs = cached.unwrap_or_else(|| self.populate_cache_from_fjall(key)); - attrs.into_iter().filter_map(move |(idx, _)| { - let i = idx as usize; - if i < self.attrs_name.len() { - Some(self.attrs_name[i].clone()) - } else { - None - } - }) + attrs + .iter() + .filter_map(move |(idx, _)| { + let i = *idx as usize; + if i < self.attrs_name.len() { + Some(self.attrs_name[i].clone()) + } else { + None + } + }) + .collect::<Vec<_>>() + .into_iter() } pub fn get_all_attrs( &self, key: u64, ) -> impl Iterator<Item = (Arc<str>, Value)> + '_ { let cached = self.cache.get_entity(key, self.version); let attrs = cached.unwrap_or_else(|| self.populate_cache_from_fjall(key)); - attrs.into_iter().filter_map(move |(idx, value)| { - let i = idx as usize; - if i < self.attrs_name.len() { - Some((self.attrs_name[i].clone(), value)) - } else { - None - } - }) + attrs + .iter() + .filter_map(move |(idx, value)| { + let i = *idx as usize; + if i < self.attrs_name.len() { + Some((self.attrs_name[i].clone(), value.clone())) + } else { + None + } + }) + .collect::<Vec<_>>() + .into_iter() } pub fn get_all_attrs_by_id( &self, key: u64, - ) -> impl Iterator<Item = (u16, Value)> + '_ { - let cached = self.cache.get_entity(key, self.version); - let attrs = cached.unwrap_or_else(|| self.populate_cache_from_fjall(key)); - attrs.into_iter() + ) -> Arc<Vec<(u16, Value)>> { + self.cache + .get_entity(key, self.version) + .unwrap_or_else(|| self.populate_cache_from_fjall(key)) } // ---- write path (cache only)
---------------------------------------- @@ -473,7 +481,7 @@ impl AttributeStore { } // Merge: start from current, apply overwrites, remove nulls. - let mut merged: Vec<(u16, Value)> = current; + let mut merged: Vec<(u16, Value)> = (*current).clone(); for (idx, value) in new_entries { match merged.binary_search_by_key(&idx, |(i, _)| *i) { Ok(pos) => merged[pos].1 = value, @@ -564,7 +572,7 @@ impl AttributeStore { } } // Then insert the current attribute set. - for &(attr_idx, ref value) in attrs { + for &(attr_idx, ref value) in attrs.iter() { let composite_key = make_key(*entity_id, attr_idx); batch.insert(self.keyspace(), composite_key, value.to_bytes()); } @@ -573,7 +581,7 @@ impl AttributeStore { // Re-insert entries to prevent data loss on commit failure. for (entity_id, attrs) in dirty_entries { self.cache - .insert_entity(entity_id, attrs, self.version, true); + .insert_entity(entity_id, (*attrs).clone(), self.version, true); } e.to_string() })?; @@ -604,7 +612,7 @@ impl AttributeStore { // Safe to flush: these are pre-existing dirty entries from prior // transactions, not from the active one. 
let mut batch = self.database.batch(); - for &(attr_idx, ref value) in &cached { + for &(attr_idx, ref value) in cached.iter() { let composite_key = make_key(entity_id, attr_idx); batch.insert(self.keyspace(), composite_key, value.to_bytes()); } diff --git a/graph/src/graph/graph.rs b/graph/src/graph/graph.rs index 2e37c0bf..a8adb1f9 100644 --- a/graph/src/graph/graph.rs +++ b/graph/src/graph/graph.rs @@ -1456,7 +1456,7 @@ impl Graph { pub fn get_node_all_attrs_by_id( &self, id: NodeId, - ) -> impl Iterator<Item = (u16, Value)> + '_ { + ) -> Arc<Vec<(u16, Value)>> { self.node_attrs.get_all_attrs_by_id(id.0) } @@ -1478,7 +1478,7 @@ pub fn get_relationship_all_attrs_by_id( &self, id: RelationshipId, - ) -> impl Iterator<Item = (u16, Value)> + '_ { + ) -> Arc<Vec<(u16, Value)>> { self.relationship_attrs.get_all_attrs_by_id(id.0) } @@ -1872,7 +1872,7 @@ entity_id: u64, ) -> usize { let mut sz: usize = 0; - for (_, val) in store.get_all_attrs_by_id(entity_id) { + for (_, val) in store.get_all_attrs_by_id(entity_id).iter() { sz += std::mem::size_of::<u16>() + std::mem::size_of::<Value>() + val.heap_size(); } sz diff --git a/graph/src/index/mod.rs b/graph/src/index/mod.rs index f3055afc..a81fbcef 100644 --- a/graph/src/index/mod.rs +++ b/graph/src/index/mod.rs @@ -82,7 +82,7 @@ unsafe fn rs_array_new<T>(data: &[T]) -> *mut T { let n = data.len(); let elem_sz = std::mem::size_of::<T>(); - let total = std::mem::size_of::<array_hdr_t>() + n * elem_sz; + let total = std::mem::size_of::<array_hdr_t>() + std::mem::size_of_val(data); unsafe { // Must use RedisModule_Alloc because RediSearch's array_free uses @@ -107,7 +107,7 @@ std::ptr::copy_nonoverlapping( data.as_ptr().cast::<u8>(), arr_ptr.cast::<u8>(), - n * elem_sz, + std::mem::size_of_val(data), ); arr_ptr @@ -503,40 +503,40 @@ impl Document { _ => {} // Skip non-indexable types } } - if !numerics.is_empty() { - if let Some(name) = field.numeric_arr_name() { - let mut c_arr = rs_array_new(&numerics); - if !c_arr.is_null() { - RediSearch_DocumentAddFieldNumericArray(
self.rs_doc, - name.as_ptr(), - &raw mut c_arr, - RSFLDTYPE_NUMERIC, - ); - } + if !numerics.is_empty() + && let Some(name) = field.numeric_arr_name() + { + let mut c_arr = rs_array_new(&numerics); + if !c_arr.is_null() { + RediSearch_DocumentAddFieldNumericArray( + self.rs_doc, + name.as_ptr(), + &raw mut c_arr, + RSFLDTYPE_NUMERIC, + ); } } - if !string_cstrs.is_empty() { - if let Some(name) = field.string_arr_name() { - let ptrs: Vec<*mut c_char> = string_cstrs - .iter() - .map(|cs| cs.as_ptr() as *mut c_char) - .collect(); - let mut c_arr = rs_array_new(&ptrs); - if !c_arr.is_null() { - RediSearch_DocumentAddFieldStringArray( - self.rs_doc, - name.as_ptr(), - &raw mut c_arr, - ptrs.len(), - RSFLDTYPE_TAG, - ); - } - // Keep string content CStrings alive — the pointer - // array in RediSearch references them. They'll be - // properly freed when the Document is dropped. - self._string_arr_values.extend(string_cstrs); + if !string_cstrs.is_empty() + && let Some(name) = field.string_arr_name() + { + let ptrs: Vec<*mut c_char> = string_cstrs + .iter() + .map(|cs| cs.as_ptr().cast_mut()) + .collect(); + let mut c_arr = rs_array_new(&ptrs); + if !c_arr.is_null() { + RediSearch_DocumentAddFieldStringArray( + self.rs_doc, + name.as_ptr(), + &raw mut c_arr, + ptrs.len(), + RSFLDTYPE_TAG, + ); } + // Keep string content CStrings alive — the pointer + // array in RediSearch references them. They'll be + // properly freed when the Document is dropped. + self._string_arr_values.extend(string_cstrs); } } Value::VecF32(_) => {} // Only for vector fields @@ -773,7 +773,8 @@ impl Index { /// Uses the same bitmask as C FalkorDB's RediSearch INT64 workaround, /// applied to the value's magnitude so negative integers are handled /// correctly. 
- pub fn int_loses_f64_precision(i: i64) -> bool { + #[must_use] + pub const fn int_loses_f64_precision(i: i64) -> bool { i.unsigned_abs() & 0x7FF0_0000_0000_0000 != 0 } diff --git a/graph/src/planner/optimizer/utilize_index.rs b/graph/src/planner/optimizer/utilize_index.rs index 96f97f85..a4058d89 100644 --- a/graph/src/planner/optimizer/utilize_index.rs +++ b/graph/src/planner/optimizer/utilize_index.rs @@ -334,10 +334,10 @@ fn subtree_has_property_of( if let ExprIR::Property(_) = node.data() { // Check if the Variable child of this Property matches the alias for child in node.children() { - if let ExprIR::Variable(v) = child.data() { - if v == alias { - return true; - } + if let ExprIR::Variable(v) = child.data() + && v == alias + { + return true; } } } diff --git a/graph/src/runtime/ops/node_by_index_scan.rs b/graph/src/runtime/ops/node_by_index_scan.rs index b74b007b..2b82e2c6 100644 --- a/graph/src/runtime/ops/node_by_index_scan.rs +++ b/graph/src/runtime/ops/node_by_index_scan.rs @@ -221,7 +221,7 @@ impl<'a> NodeByIndexScanOp<'a> { fn can_utilize_index(q: &IndexQuery) -> bool { use crate::index::Index; - fn is_indexable(v: &Value) -> bool { + const fn is_indexable(v: &Value) -> bool { match v { Value::Int(i) => !Index::int_loses_f64_precision(*i), Value::Float(_) @@ -235,7 +235,7 @@ impl<'a> NodeByIndexScanOp<'a> { match q { IndexQuery::Equal { value, .. } => is_indexable(value), IndexQuery::Range { min, max, .. 
} => { - min.as_ref().map_or(true, is_indexable) && max.as_ref().map_or(true, is_indexable) + min.as_ref().is_none_or(is_indexable) && max.as_ref().map_or(true, is_indexable) } IndexQuery::And(children) | IndexQuery::Or(children) => { children.iter().all(Self::can_utilize_index) diff --git a/src/reply.rs b/src/reply.rs index cb788369..45a9e857 100644 --- a/src/reply.rs +++ b/src/reply.rs @@ -127,14 +127,13 @@ pub fn reply_compact_value( } raw::reply_with_array(ctx.ctx, i64::from(raw::REDISMODULE_POSTPONED_LEN)); - let attrs_len = bg - .get_node_all_attrs_by_id(*id) - .inspect(|(key, value)| { - raw::reply_with_array(ctx.ctx, 3); - raw::reply_with_long_long(ctx.ctx, (*key).into()); - reply_compact_value(ctx, runtime, value); - }) - .count(); + let attrs = bg.get_node_all_attrs_by_id(*id); + for (key, value) in attrs.iter() { + raw::reply_with_array(ctx.ctx, 3); + raw::reply_with_long_long(ctx.ctx, (*key).into()); + reply_compact_value(ctx, runtime, value); + } + let attrs_len = attrs.len(); drop(bg); unsafe { raw::RedisModule_ReplySetArrayLength.unwrap()(ctx.ctx, attrs_len as _); @@ -172,14 +171,13 @@ pub fn reply_compact_value( raw::reply_with_long_long(ctx.ctx, u64::from(*rel_dst) as _); let node_attr_offset = bg.node_attribute_count() as i64; raw::reply_with_array(ctx.ctx, i64::from(raw::REDISMODULE_POSTPONED_LEN)); - let attrs_len = bg - .get_relationship_all_attrs_by_id(*rel_id) - .inspect(|(key, value)| { - raw::reply_with_array(ctx.ctx, 3); - raw::reply_with_long_long(ctx.ctx, i64::from(*key) + node_attr_offset); - reply_compact_value(ctx, runtime, value); - }) - .count(); + let attrs = bg.get_relationship_all_attrs_by_id(*rel_id); + for (key, value) in attrs.iter() { + raw::reply_with_array(ctx.ctx, 3); + raw::reply_with_long_long(ctx.ctx, i64::from(*key) + node_attr_offset); + reply_compact_value(ctx, runtime, value); + } + let attrs_len = attrs.len(); drop(bg); unsafe { raw::RedisModule_ReplySetArrayLength.unwrap()(ctx.ctx, attrs_len as _);