diff --git a/packages/treecrdt-core/benches/core.rs b/packages/treecrdt-core/benches/core.rs index 9198a6d7..2500288c 100644 --- a/packages/treecrdt-core/benches/core.rs +++ b/packages/treecrdt-core/benches/core.rs @@ -3,7 +3,9 @@ use std::fs; use std::path::PathBuf; use std::time::Instant; -use treecrdt_core::{Lamport, LamportClock, MemoryStorage, NodeId, ReplicaId, TreeCrdt}; +use treecrdt_core::{ + Lamport, LamportClock, LocalPlacement, MemoryStorage, NodeId, ReplicaId, TreeCrdt, +}; const BENCH_CONFIG: &[(u64, u64)] = &[(100, 10), (1_000, 10), (10_000, 10)]; @@ -53,15 +55,15 @@ fn run_benchmark(replica: &ReplicaId, count: u64) -> f64 { let mut tree = TreeCrdt::new(replica.clone(), storage, LamportClock::default()).unwrap(); let start = Instant::now(); - let mut last: Option = None; + let mut last_placement = LocalPlacement::First; for i in 0..count { let node = hex_id(i + 1); - let _ = tree.local_insert_after(NodeId::ROOT, node, last).unwrap(); - last = Some(node); + let _ = tree.local_insert(NodeId::ROOT, node, last_placement, None).unwrap(); + last_placement = LocalPlacement::After(node); } for i in 0..count { let node = hex_id(i + 1); - let _ = tree.local_move_after(node, NodeId::ROOT, None).unwrap(); + let _ = tree.local_move(node, NodeId::ROOT, LocalPlacement::First).unwrap(); } let _ = tree.operations_since(0 as Lamport).unwrap(); start.elapsed().as_secs_f64() * 1000.0 diff --git a/packages/treecrdt-core/src/affected.rs b/packages/treecrdt-core/src/affected.rs new file mode 100644 index 00000000..ab37aba1 --- /dev/null +++ b/packages/treecrdt-core/src/affected.rs @@ -0,0 +1,68 @@ +use crate::ids::NodeId; +use crate::ops::OperationKind; + +pub(crate) fn affected_parents( + snapshot_parent: Option, + kind: &OperationKind, +) -> Vec { + let mut parents = Vec::new(); + if let Some(p) = snapshot_parent { + parents.push(p); + } + match kind { + OperationKind::Insert { parent, .. } => parents.push(*parent), + OperationKind::Move { new_parent, .. } => parents.push(*new_parent), + OperationKind::Delete { .. } + | OperationKind::Tombstone { .. } + | OperationKind::Payload { .. } => {} + } + parents.sort(); + parents.dedup(); + parents +} + +pub(crate) fn sorted_node_ids(nodes: impl IntoIterator) -> Vec { + let mut ids: Vec = nodes.into_iter().collect(); + ids.sort(); + ids.dedup(); + ids +} + +pub(crate) fn parent_hints_from(parent: Option) -> Vec { + parent.into_iter().collect() +} + +fn push_if_live(nodes: &mut Vec, id: NodeId) { + if id != NodeId::TRASH { + nodes.push(id); + } +} + +fn push_snapshot_parent(nodes: &mut Vec, snapshot_parent: Option) { + if let Some(p) = snapshot_parent { + push_if_live(nodes, p); + } +} + +pub(crate) fn direct_affected_nodes( + snapshot_parent: Option, + kind: &OperationKind, +) -> Vec { + let mut nodes = Vec::new(); + push_if_live(&mut nodes, kind.node()); + match kind { + OperationKind::Insert { parent, .. } => { + push_snapshot_parent(&mut nodes, snapshot_parent); + push_if_live(&mut nodes, *parent); + } + OperationKind::Move { new_parent, .. } => { + push_snapshot_parent(&mut nodes, snapshot_parent); + push_if_live(&mut nodes, *new_parent); + } + OperationKind::Delete { .. } | OperationKind::Tombstone { .. } => { + push_snapshot_parent(&mut nodes, snapshot_parent); + } + OperationKind::Payload { .. } => {} + } + sorted_node_ids(nodes) +} diff --git a/packages/treecrdt-core/src/lib.rs b/packages/treecrdt-core/src/lib.rs index 8fa89423..c23dde2d 100644 --- a/packages/treecrdt-core/src/lib.rs +++ b/packages/treecrdt-core/src/lib.rs @@ -3,6 +3,7 @@ //! This crate stays independent of concrete storage engines so it can be embedded in SQLite, //! WASM, or any host that can satisfy the traits defined here. +pub(crate) mod affected; pub mod error; pub mod ids; pub mod materialization; @@ -10,6 +11,8 @@ pub mod ops; pub mod order_key; pub mod traits; pub mod tree; +pub mod types; +mod validation; pub mod version_vector; pub use error::{Error, Result}; @@ -30,7 +33,6 @@ pub use traits::{ MemoryPayloadStore, MemoryStorage, NodeStore, NoopParentOpIndex, NoopStorage, ParentOpIndex, PayloadStore, Storage, TruncatingParentOpIndex, }; -pub use tree::{ - ApplyDelta, LocalFinalizePlan, LocalPlacement, NodeExport, NodeSnapshotExport, TreeCrdt, -}; +pub use tree::TreeCrdt; +pub use types::{ApplyDelta, LocalFinalizePlan, LocalPlacement, NodeExport, NodeSnapshotExport}; pub use version_vector::VersionVector; diff --git a/packages/treecrdt-core/src/tree.rs b/packages/treecrdt-core/src/tree.rs index b2912e53..b3879d75 100644 --- a/packages/treecrdt-core/src/tree.rs +++ b/packages/treecrdt-core/src/tree.rs @@ -1,11 +1,15 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; +use crate::affected::{ + affected_parents, direct_affected_nodes, parent_hints_from, sorted_node_ids, +}; use crate::error::{Error, Result}; use crate::ids::{Lamport, NodeId, OperationId, ReplicaId}; use crate::ops::{cmp_op_key, Operation, OperationKind}; use crate::traits::{ Clock, MemoryNodeStore, MemoryPayloadStore, NodeStore, ParentOpIndex, PayloadStore, Storage, }; +use crate::types::{ApplyDelta, LocalFinalizePlan, LocalPlacement, NodeExport, NodeSnapshotExport}; use crate::version_vector::VersionVector; #[derive(Clone)] @@ -33,165 +37,19 @@ where op_count: u64, } -#[derive(Clone, Debug)] -pub struct NodeExport { - pub node: NodeId, - pub parent: Option, - pub children: Vec, - pub last_change: VersionVector, - pub deleted_at: Option, -} - -#[derive(Clone, Debug)] -pub struct NodeSnapshotExport { - pub parent: Option, - pub order_key: Option>, -} - -#[derive(Clone, Debug)] -pub struct ApplyDelta { - pub snapshot: NodeSnapshotExport, - pub affected_nodes: Vec, -} - -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum LocalPlacement { - First, - Last, - After(NodeId), -} - -impl LocalPlacement { - pub fn from_parts(placement: &str, after: Option) -> Result { - match placement { - "first" => Ok(Self::First), - "last" => Ok(Self::Last), - "after" => { - let Some(after_id) = after else { - return Err(Error::InvalidOperation( - "missing after for placement=after".into(), - )); - }; - Ok(Self::After(after_id)) - } - _ => Err(Error::InvalidOperation("invalid placement".into())), - } - } -} - -#[derive(Clone, Debug, Default)] -pub struct LocalFinalizePlan { - pub parent_hints: Vec, - pub extra_index_records: Vec<(NodeId, OperationId)>, -} - -fn affected_parents(snapshot_parent: Option, kind: &OperationKind) -> Vec { - let mut parents = Vec::new(); - if let Some(p) = snapshot_parent { - parents.push(p); - } - match kind { - OperationKind::Insert { parent, .. } => parents.push(*parent), - OperationKind::Move { new_parent, .. } => parents.push(*new_parent), - OperationKind::Delete { .. } - | OperationKind::Tombstone { .. } - | OperationKind::Payload { .. } => {} - } - parents.sort(); - parents.dedup(); - parents -} - -fn sorted_node_ids(nodes: impl IntoIterator) -> Vec { - let mut ids: Vec = nodes.into_iter().collect(); - ids.sort(); - ids.dedup(); - ids -} - -fn direct_affected_nodes(snapshot_parent: Option, kind: &OperationKind) -> Vec { - let mut nodes = Vec::new(); - let node = kind.node(); - if node != NodeId::TRASH { - nodes.push(node); - } - match kind { - OperationKind::Insert { parent, .. } => { - if let Some(old_parent) = snapshot_parent { - if old_parent != NodeId::TRASH { - nodes.push(old_parent); - } - } - if *parent != NodeId::TRASH { - nodes.push(*parent); - } - } - OperationKind::Move { new_parent, .. } => { - if let Some(old_parent) = snapshot_parent { - if old_parent != NodeId::TRASH { - nodes.push(old_parent); - } - } - if *new_parent != NodeId::TRASH { - nodes.push(*new_parent); - } - } - OperationKind::Delete { .. } | OperationKind::Tombstone { .. } => { - if let Some(parent) = snapshot_parent { - if parent != NodeId::TRASH { - nodes.push(parent); - } - } - } - OperationKind::Payload { .. } => {} - } - sorted_node_ids(nodes) -} - impl TreeCrdt where S: Storage, C: Clock, { pub fn new(replica_id: ReplicaId, storage: S, clock: C) -> Result { - let counter = storage.latest_counter(&replica_id)?; - let mut clock = clock; - clock.observe(storage.latest_lamport()); - Ok(Self { + Self::with_stores( replica_id, storage, clock, - counter, - nodes: MemoryNodeStore::default(), - version_vector: VersionVector::new(), - payloads: MemoryPayloadStore::default(), - head: None, - op_count: 0, - }) - } -} - -impl TreeCrdt -where - S: Storage, - C: Clock, - N: NodeStore, -{ - pub fn with_node_store(replica_id: ReplicaId, storage: S, clock: C, nodes: N) -> Result { - let counter = storage.latest_counter(&replica_id)?; - let mut clock = clock; - clock.observe(storage.latest_lamport()); - Ok(Self { - replica_id, - storage, - clock, - counter, - nodes, - version_vector: VersionVector::new(), - payloads: MemoryPayloadStore::default(), - head: None, - op_count: 0, - }) + MemoryNodeStore::default(), + MemoryPayloadStore::default(), + ) } } @@ -240,19 +98,12 @@ where ) == std::cmp::Ordering::Greater } - pub fn local_insert_after( - &mut self, - parent: NodeId, - node: NodeId, - after: Option, - ) -> Result { + fn next_op_meta(&mut self) -> (ReplicaId, u64, Lamport, Vec) { let replica = self.replica_id.clone(); let counter = self.next_counter(); let lamport = self.clock.tick(); let seed = Self::seed(&replica, counter); - let order_key = self.allocate_child_key_after(parent, node, after, &seed)?; - let op = Operation::insert(&replica, counter, lamport, parent, node, order_key); - self.commit_local(op) + (replica, counter, lamport, seed) } pub fn resolve_after_for_placement( @@ -281,7 +132,7 @@ where } } - pub fn local_insert_with_plan( + pub fn local_insert( &mut self, parent: NodeId, node: NodeId, @@ -289,11 +140,12 @@ where payload: Option>, ) -> Result<(Operation, LocalFinalizePlan)> { let after = self.resolve_after_for_placement(parent, placement, None)?; - let op = if let Some(payload) = payload { - self.local_insert_after_with_payload(parent, node, after, payload)? - } else { - self.local_insert_after(parent, node, after)? - }; + let (replica, counter, lamport, seed) = self.next_op_meta(); + let order_key = self.allocate_child_key_after(parent, node, after, &seed)?; + let op = Operation::insert_with_optional_payload( + &replica, counter, lamport, parent, node, order_key, payload, + ); + let op = self.commit_local(op)?; Ok(( op, LocalFinalizePlan { @@ -303,40 +155,7 @@ where )) } - pub fn local_insert_after_with_payload( - &mut self, - parent: NodeId, - node: NodeId, - after: Option, - payload: impl Into>, - ) -> Result { - let replica = self.replica_id.clone(); - let counter = self.next_counter(); - let lamport = self.clock.tick(); - let seed = Self::seed(&replica, counter); - let order_key = self.allocate_child_key_after(parent, node, after, &seed)?; - let op = Operation::insert_with_payload( - &replica, counter, lamport, parent, node, order_key, payload, - ); - self.commit_local(op) - } - - pub fn local_move_after( - &mut self, - node: NodeId, - new_parent: NodeId, - after: Option, - ) -> Result { - let replica = self.replica_id.clone(); - let counter = self.next_counter(); - let lamport = self.clock.tick(); - let seed = Self::seed(&replica, counter); - let order_key = self.allocate_child_key_after(new_parent, node, after, &seed)?; - let op = Operation::move_node(&replica, counter, lamport, node, new_parent, order_key); - self.commit_local(op) - } - - pub fn local_move_with_plan( + pub fn local_move( &mut self, node: NodeId, new_parent: NodeId, @@ -344,7 +163,10 @@ where ) -> Result<(Operation, LocalFinalizePlan)> { let old_parent = self.parent(node)?; let after = self.resolve_after_for_placement(new_parent, placement, Some(node))?; - let op = self.local_move_after(node, new_parent, after)?; + let (replica, counter, lamport, seed) = self.next_op_meta(); + let order_key = self.allocate_child_key_after(new_parent, node, after, &seed)?; + let op = Operation::move_node(&replica, counter, lamport, node, new_parent, order_key); + let op = self.commit_local(op)?; let mut parent_hints = vec![new_parent]; if let Some(parent) = old_parent { @@ -367,93 +189,46 @@ where )) } - pub fn local_delete(&mut self, node: NodeId) -> Result { - let replica = self.replica_id.clone(); - let counter = self.next_counter(); - let lamport = self.clock.tick(); + pub fn local_delete(&mut self, node: NodeId) -> Result<(Operation, LocalFinalizePlan)> { + let old_parent = self.parent(node)?; + let (replica, counter, lamport, _seed) = self.next_op_meta(); let known_state = Some(self.nodes.subtree_version_vector(node)?); let op = Operation::delete(&replica, counter, lamport, node, known_state); - self.commit_local(op) - } - - pub fn local_delete_with_plan( - &mut self, - node: NodeId, - ) -> Result<(Operation, LocalFinalizePlan)> { - let old_parent = self.parent(node)?; - let op = self.local_delete(node)?; - let mut parent_hints = Vec::new(); - if let Some(parent) = old_parent { - parent_hints.push(parent); - } + let op = self.commit_local(op)?; Ok(( op, LocalFinalizePlan { - parent_hints, + parent_hints: parent_hints_from(old_parent), extra_index_records: Vec::new(), }, )) } - pub fn local_set_payload( - &mut self, - node: NodeId, - payload: impl Into>, - ) -> Result { - let replica = self.replica_id.clone(); - let counter = self.next_counter(); - let lamport = self.clock.tick(); - let op = Operation::set_payload(&replica, counter, lamport, node, payload); - self.commit_local(op) - } - - pub fn local_clear_payload(&mut self, node: NodeId) -> Result { - let replica = self.replica_id.clone(); - let counter = self.next_counter(); - let lamport = self.clock.tick(); - let op = Operation::clear_payload(&replica, counter, lamport, node); - self.commit_local(op) - } - - pub fn local_payload_with_plan( + pub fn local_payload( &mut self, node: NodeId, payload: Option>, ) -> Result<(Operation, LocalFinalizePlan)> { let parent = self.parent(node)?; + let (replica, counter, lamport, _seed) = self.next_op_meta(); let op = if let Some(payload) = payload { - self.local_set_payload(node, payload)? + Operation::set_payload(&replica, counter, lamport, node, payload) } else { - self.local_clear_payload(node)? + Operation::clear_payload(&replica, counter, lamport, node) }; - let mut parent_hints = Vec::new(); - if let Some(parent) = parent { - parent_hints.push(parent); - } + let op = self.commit_local(op)?; Ok(( op, LocalFinalizePlan { - parent_hints, + parent_hints: parent_hints_from(parent), extra_index_records: Vec::new(), }, )) } pub fn apply_remote(&mut self, op: Operation) -> Result<()> { - self.clock.observe(op.meta.lamport); - self.version_vector.observe(&op.meta.id.replica, op.meta.id.counter); - if !self.storage.apply(op.clone())? { - return Ok(()); - } - - if self.is_in_order(&op) { - let _ = Self::apply_forward(&mut self.nodes, &mut self.payloads, &op)?; - self.op_count += 1; - self.head = Some(op); - return Ok(()); - } - - self.replay_from_storage() + self.apply_remote_with_delta(op)?; + Ok(()) } /// Apply one remote operation and return exact incremental delta when available. @@ -492,23 +267,25 @@ where Ok(None) } - /// Apply a remote operation while maintaining adapter-provided derived state. + /// Apply a remote op with full materialization bookkeeping. /// - /// This wires together: - /// - core CRDT semantics (`apply_remote_with_delta`) - /// - a parent→op index (`ParentOpIndex`) for partial sync - /// - cached tombstone flags in the [`NodeStore`] (via `set_tombstone`) - pub fn apply_remote_with_materialization( + /// This wires together core CRDT semantics (`apply_remote_with_delta`), + /// a parent-op index (`ParentOpIndex`) for partial sync, and cached + /// tombstone flags in the [`NodeStore`]. The materialization sequence + /// is advanced only when the operation is actually accepted. + pub fn apply_remote_with_materialization_seq( &mut self, op: Operation, index: &mut I, - seq: u64, + seq: &mut u64, ) -> Result> { + *seq = (*seq).saturating_add(1); let snapshot = self.apply_remote_with_delta(op.clone())?.map(|delta| NodeSnapshot { parent: delta.snapshot.parent, order_key: delta.snapshot.order_key, }); let Some(snapshot) = snapshot else { + *seq = (*seq).saturating_sub(1); return Ok(None); }; let affected_nodes = direct_affected_nodes(snapshot.parent, &op.kind); @@ -516,28 +293,11 @@ where snapshot, &op, index, - seq, + *seq, affected_nodes, )?)) } - /// Apply a remote op and advance materialization sequence only when it is accepted. - /// - /// Adapters can hold `seq` in metadata and pass it by mutable reference across a batch. - pub fn apply_remote_with_materialization_seq( - &mut self, - op: Operation, - index: &mut I, - seq: &mut u64, - ) -> Result> { - *seq = (*seq).saturating_add(1); - let applied = self.apply_remote_with_materialization(op, index, *seq)?; - if applied.is_none() { - *seq = (*seq).saturating_sub(1); - } - Ok(applied) - } - /// Apply a canonically sorted remote op directly against the current materialized state. /// /// This skips storage persistence and out-of-order detection, and is intended for callers @@ -567,36 +327,6 @@ where /// /// This is intended for adapters that execute local operations directly against core and then /// need to keep external materialized indexes/metadata in sync. - pub fn finalize_local_materialization( - &mut self, - op: &Operation, - index: &mut I, - seq: u64, - parent_hints: &[NodeId], - extra_index_records: &[(NodeId, OperationId)], - ) -> Result<()> { - let mut refresh_starts: Vec = parent_hints.to_vec(); - refresh_starts.push(op.kind.node()); - self.refresh_tombstones_upward(refresh_starts)?; - - let mut seen: HashSet = HashSet::new(); - for parent in parent_hints { - if *parent == NodeId::TRASH || !seen.insert(*parent) { - continue; - } - index.record(*parent, &op.meta.id, seq)?; - } - - for (parent, op_id) in extra_index_records { - if *parent == NodeId::TRASH { - continue; - } - index.record(*parent, op_id, seq)?; - } - - Ok(()) - } - fn finalize_materialized_apply( &mut self, snapshot: NodeSnapshot, @@ -644,8 +374,7 @@ where affected_nodes, }) } - - pub fn finalize_local_with_plan( + pub fn finalize_local( &mut self, op: &Operation, index: &mut I, @@ -653,17 +382,30 @@ where plan: &LocalFinalizePlan, ) -> Result { let seq = head_seq.saturating_add(1); - self.finalize_local_materialization( - op, - index, - seq, - &plan.parent_hints, - &plan.extra_index_records, - )?; + + let mut refresh_starts: Vec = plan.parent_hints.to_vec(); + refresh_starts.push(op.kind.node()); + self.refresh_tombstones_upward(refresh_starts)?; + + let mut seen: HashSet = HashSet::new(); + for parent in &plan.parent_hints { + if *parent == NodeId::TRASH || !seen.insert(*parent) { + continue; + } + index.record(*parent, &op.meta.id, seq)?; + } + + for (parent, op_id) in &plan.extra_index_records { + if *parent == NodeId::TRASH { + continue; + } + index.record(*parent, op_id, seq)?; + } + Ok(seq) } - pub fn refresh_tombstones_upward(&mut self, starts: I) -> Result<()> + fn refresh_tombstones_upward(&mut self, starts: I) -> Result<()> where I: IntoIterator, { @@ -674,7 +416,7 @@ where /// Refresh tombstone cache for nodes on the upward closure of `starts`. /// /// Returns every node whose cached tombstone value actually changed. - pub fn refresh_tombstones_upward_with_delta(&mut self, starts: I) -> Result> + fn refresh_tombstones_upward_with_delta(&mut self, starts: I) -> Result> where I: IntoIterator, { @@ -710,126 +452,6 @@ where Ok(sorted_node_ids(changed)) } - pub fn refresh_all_tombstones(&mut self) -> Result<()> { - fn subtree_vv( - nodes: &N, - node: NodeId, - cache: &mut HashMap, - visiting: &mut HashSet, - ) -> Result { - if let Some(vv) = cache.get(&node) { - return Ok(vv.clone()); - } - if !visiting.insert(node) { - return Err(Error::InconsistentState( - "cycle detected while computing subtree version vector".into(), - )); - } - - let mut vv = nodes.last_change(node)?; - for child in nodes.children(node)? { - let child_vv = subtree_vv(nodes, child, cache, visiting)?; - vv.merge(&child_vv); - } - - visiting.remove(&node); - cache.insert(node, vv.clone()); - Ok(vv) - } - - let nodes = self.nodes.all_nodes()?; - let nodes_ro = &self.nodes; - - let mut cache: HashMap = HashMap::new(); - let mut visiting: HashSet = HashSet::new(); - let mut updates: Vec<(NodeId, bool)> = Vec::new(); - - for node in nodes { - if node == NodeId::ROOT || node == NodeId::TRASH { - continue; - } - let Some(deleted_vv) = nodes_ro.deleted_at(node)? else { - continue; - }; - let subtree = subtree_vv(nodes_ro, node, &mut cache, &mut visiting)?; - updates.push((node, deleted_vv.is_aware_of(&subtree))); - } - - for (node, tombstoned) in updates { - self.nodes.set_tombstone(node, tombstoned)?; - } - - Ok(()) - } - - pub fn replay_from_storage_with_materialization( - &mut self, - index: &mut I, - ) -> Result<()> { - index.reset()?; - - self.version_vector = VersionVector::new(); - self.nodes.reset()?; - self.payloads.reset()?; - self.head = None; - self.op_count = 0; - - let storage = &self.storage; - let nodes = &mut self.nodes; - let payloads = &mut self.payloads; - let clock = &mut self.clock; - let version_vector = &mut self.version_vector; - - let mut seq: u64 = 0; - let mut head: Option = None; - - storage.scan_since(0, &mut |op| { - clock.observe(op.meta.lamport); - version_vector.observe(&op.meta.id.replica, op.meta.id.counter); - - let snapshot = Self::apply_forward(nodes, payloads, &op)?; - seq += 1; - - let parents = affected_parents(snapshot.parent, &op.kind); - for parent in &parents { - if *parent == NodeId::TRASH { - continue; - } - index.record(*parent, &op.meta.id, seq)?; - } - - head = Some(op); - Ok(()) - })?; - - self.head = head; - self.op_count = seq; - self.counter = self.counter.max(self.version_vector.get(&self.replica_id)); - - // Refresh cached tombstone flags and then ensure the latest payload op for each node is - // discoverable under its current parent. - self.refresh_all_tombstones()?; - - let payload_seq = seq.max(1); - for node in self.nodes.all_nodes()? { - if node == NodeId::ROOT || node == NodeId::TRASH { - continue; - } - let Some(parent) = self.nodes.parent(node)? else { - continue; - }; - if parent == NodeId::TRASH { - continue; - } - let Some((_lamport, payload_id)) = self.payload_last_writer(node)? else { - continue; - }; - index.record(parent, &payload_id, payload_seq)?; - } - - Ok(()) - } - pub fn operations_since(&self, lamport: Lamport) -> Result> { self.storage.load_since(lamport) } @@ -941,59 +563,16 @@ where Ok(nodes) } - pub fn log_len(&self) -> usize { - self.op_count.min(usize::MAX as u64) as usize - } - pub fn head_op(&self) -> Option<&Operation> { self.head.as_ref() } - pub(crate) fn node_store_mut(&mut self) -> &mut N { - &mut self.nodes - } - - pub fn validate_invariants(&self) -> Result<()> { - for pid in self.nodes.all_nodes()? { - let pchildren = self.nodes.children(pid)?; - let mut seen = HashSet::new(); - for child in pchildren { - if !seen.insert(child) { - return Err(Error::InvalidOperation("duplicate child entry".into())); - } - if !self.nodes.exists(child)? { - return Err(Error::InvalidOperation("child not present in nodes".into())); - } - if self.nodes.parent(child)? != Some(pid) { - return Err(Error::InvalidOperation("child parent mismatch".into())); - } - } - } - - for node in self.nodes.all_nodes()? { - if self.has_cycle_from(node)? { - return Err(Error::InvalidOperation("cycle detected".into())); - } - } - Ok(()) + pub(crate) fn node_store(&self) -> &N { + &self.nodes } - fn has_cycle_from(&self, start: NodeId) -> Result { - if start == NodeId::ROOT || start == NodeId::TRASH { - return Ok(false); - } - let mut visited = HashSet::new(); - let mut current = Some(start); - while let Some(n) = current { - if !visited.insert(n) { - return Ok(true); - } - if n == NodeId::ROOT || n == NodeId::TRASH { - return Ok(false); - } - current = self.nodes.parent(n)?; - } - Ok(false) + pub(crate) fn node_store_mut(&mut self) -> &mut N { + &mut self.nodes } fn commit_local(&mut self, op: Operation) -> Result { diff --git a/packages/treecrdt-core/src/types.rs b/packages/treecrdt-core/src/types.rs new file mode 100644 index 00000000..cf8dbb3a --- /dev/null +++ b/packages/treecrdt-core/src/types.rs @@ -0,0 +1,55 @@ +use crate::error::{Error, Result}; +use crate::ids::{NodeId, OperationId}; +use crate::version_vector::VersionVector; + +#[derive(Clone, Debug)] +pub struct NodeExport { + pub node: NodeId, + pub parent: Option, + pub children: Vec, + pub last_change: VersionVector, + pub deleted_at: Option, +} + +#[derive(Clone, Debug)] +pub struct NodeSnapshotExport { + pub parent: Option, + pub order_key: Option>, +} + +#[derive(Clone, Debug)] +pub struct ApplyDelta { + pub snapshot: NodeSnapshotExport, + pub affected_nodes: Vec, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum LocalPlacement { + First, + Last, + After(NodeId), +} + +impl LocalPlacement { + pub fn from_parts(placement: &str, after: Option) -> Result { + match placement { + "first" => Ok(Self::First), + "last" => Ok(Self::Last), + "after" => { + let Some(after_id) = after else { + return Err(Error::InvalidOperation( + "missing after for placement=after".into(), + )); + }; + Ok(Self::After(after_id)) + } + _ => Err(Error::InvalidOperation("invalid placement".into())), + } + } +} + +#[derive(Clone, Debug, Default)] +pub struct LocalFinalizePlan { + pub parent_hints: Vec, + pub extra_index_records: Vec<(NodeId, OperationId)>, +} diff --git a/packages/treecrdt-core/src/validation.rs b/packages/treecrdt-core/src/validation.rs new file mode 100644 index 00000000..87b0859f --- /dev/null +++ b/packages/treecrdt-core/src/validation.rs @@ -0,0 +1,59 @@ +use std::collections::HashSet; + +use crate::error::{Error, Result}; +use crate::ids::NodeId; +use crate::traits::{Clock, NodeStore, PayloadStore, Storage}; +use crate::tree::TreeCrdt; + +impl TreeCrdt +where + S: Storage, + C: Clock, + N: NodeStore, + P: PayloadStore, +{ + pub fn validate_invariants(&self) -> Result<()> { + let nodes = self.node_store(); + for pid in nodes.all_nodes()? { + let pchildren = nodes.children(pid)?; + let mut seen = HashSet::new(); + for child in pchildren { + if !seen.insert(child) { + return Err(Error::InvalidOperation("duplicate child entry".into())); + } + if !nodes.exists(child)? { + return Err(Error::InvalidOperation("child not present in nodes".into())); + } + if nodes.parent(child)? != Some(pid) { + return Err(Error::InvalidOperation("child parent mismatch".into())); + } + } + } + + for node in nodes.all_nodes()? { + if self.has_cycle_from(node)? { + return Err(Error::InvalidOperation("cycle detected".into())); + } + } + Ok(()) + } + + fn has_cycle_from(&self, start: NodeId) -> Result { + if start == NodeId::ROOT || start == NodeId::TRASH { + return Ok(false); + } + let nodes = self.node_store(); + let mut visited = HashSet::new(); + let mut current = Some(start); + while let Some(n) = current { + if !visited.insert(n) { + return Ok(true); + } + if n == NodeId::ROOT || n == NodeId::TRASH { + return Ok(false); + } + current = nodes.parent(n)?; + } + Ok(false) + } +} diff --git a/packages/treecrdt-core/tests/conflict_resolution.rs b/packages/treecrdt-core/tests/conflict_resolution.rs index 953e4a41..07c26999 100644 --- a/packages/treecrdt-core/tests/conflict_resolution.rs +++ b/packages/treecrdt-core/tests/conflict_resolution.rs @@ -1,4 +1,6 @@ -use treecrdt_core::{LamportClock, MemoryStorage, NodeId, Operation, ReplicaId, TreeCrdt}; +use treecrdt_core::{ + LamportClock, LocalPlacement, MemoryStorage, NodeId, Operation, ReplicaId, TreeCrdt, +}; #[test] fn higher_lamport_wins_on_conflict() { @@ -14,12 +16,13 @@ fn higher_lamport_wins_on_conflict() { let left = NodeId(10); let right = NodeId(11); - let insert_left = crdt_a.local_insert_after(root, left, None).unwrap(); - let insert_right = crdt_a.local_insert_after(root, right, Some(left)).unwrap(); - let insert_x = crdt_a.local_insert_after(root, x, Some(right)).unwrap(); + let (insert_left, _) = crdt_a.local_insert(root, left, LocalPlacement::First, None).unwrap(); + let (insert_right, _) = + crdt_a.local_insert(root, right, LocalPlacement::After(left), None).unwrap(); + let (insert_x, _) = crdt_a.local_insert(root, x, LocalPlacement::After(right), None).unwrap(); // replica a moves x under left (lamport 4) - let move_left = crdt_a.local_move_after(x, left, None).unwrap(); + let (move_left, _) = crdt_a.local_move(x, left, LocalPlacement::First).unwrap(); // replica b moves x under right with higher lamport let mut crdt_b = TreeCrdt::new( diff --git a/packages/treecrdt-core/tests/defensive_delete.rs b/packages/treecrdt-core/tests/defensive_delete.rs index ebedb0eb..ed3aad49 100644 --- a/packages/treecrdt-core/tests/defensive_delete.rs +++ b/packages/treecrdt-core/tests/defensive_delete.rs @@ -1,4 +1,6 @@ -use treecrdt_core::{LamportClock, MemoryStorage, NodeId, NoopParentOpIndex, ReplicaId, TreeCrdt}; +use treecrdt_core::{ + LamportClock, LocalPlacement, MemoryStorage, NodeId, NoopParentOpIndex, ReplicaId, TreeCrdt, +}; #[test] fn defensive_delete_parent_then_insert_child_restores_parent() { @@ -18,17 +20,16 @@ fn defensive_delete_parent_then_insert_child_restores_parent() { let parent = NodeId(1); let child = NodeId(2); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - // Client B inserts child first, then Client A deletes without awareness - // Defensive delete: parent should be restored because delete was unaware of modifications - let insert_child_op = crdt_b.local_insert_after(parent, child, None).unwrap(); + let (insert_child_op, _) = + crdt_b.local_insert(parent, child, LocalPlacement::First, None).unwrap(); assert_eq!(crdt_b.parent(child).unwrap(), Some(parent)); assert!(!crdt_b.is_tombstoned(parent).unwrap()); - // Client A deletes without having seen the insert - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); crdt_a.apply_remote(insert_child_op.clone()).unwrap(); @@ -70,24 +71,29 @@ fn defensive_delete_parent_then_move_child_restores_parent() { let child = NodeId(2); let other_parent = NodeId(3); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - let other_parent_op = - crdt_a.local_insert_after(NodeId::ROOT, other_parent, Some(parent)).unwrap(); + let (other_parent_op, _) = crdt_a + .local_insert( + NodeId::ROOT, + other_parent, + LocalPlacement::After(parent), + None, + ) + .unwrap(); crdt_b.apply_remote(other_parent_op).unwrap(); - let child_op = crdt_a.local_insert_after(other_parent, child, None).unwrap(); + let (child_op, _) = + crdt_a.local_insert(other_parent, child, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(child_op).unwrap(); - // Client B moves child first, then Client A deletes without awareness - // Defensive delete: parent should be restored because delete was unaware of modifications - let move_op = crdt_b.local_move_after(child, parent, None).unwrap(); + let (move_op, _) = crdt_b.local_move(child, parent, LocalPlacement::First).unwrap(); assert_eq!(crdt_b.parent(child).unwrap(), Some(parent)); assert!(!crdt_b.is_tombstoned(parent).unwrap()); - // Client A deletes without having seen the move - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); crdt_a.apply_remote(move_op.clone()).unwrap(); @@ -132,29 +138,30 @@ fn defensive_delete_sibling_moved_same_parent_then_deleted_restores_node() { let middle = NodeId(3); let last = NodeId(4); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - let first_op = crdt_a.local_insert_after(parent, first, None).unwrap(); + let (first_op, _) = crdt_a.local_insert(parent, first, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(first_op).unwrap(); - let middle_op = crdt_a.local_insert_after(parent, middle, Some(first)).unwrap(); + let (middle_op, _) = + crdt_a.local_insert(parent, middle, LocalPlacement::After(first), None).unwrap(); crdt_b.apply_remote(middle_op).unwrap(); - let last_op = crdt_a.local_insert_after(parent, last, Some(middle)).unwrap(); + let (last_op, _) = + crdt_a.local_insert(parent, last, LocalPlacement::After(middle), None).unwrap(); crdt_b.apply_remote(last_op).unwrap(); assert_eq!(crdt_a.children(parent).unwrap(), &[first, middle, last]); assert_eq!(crdt_b.children(parent).unwrap(), &[first, middle, last]); - // Client B moves middle within same parent (position only): [first, last, middle] - let move_op = crdt_b.local_move_after(middle, parent, Some(last)).unwrap(); + let (move_op, _) = crdt_b.local_move(middle, parent, LocalPlacement::After(last)).unwrap(); assert_eq!(crdt_b.parent(middle).unwrap(), Some(parent)); assert_eq!(crdt_b.children(parent).unwrap(), &[first, last, middle]); assert!(!crdt_b.is_tombstoned(middle).unwrap()); - // Client A deletes middle without having seen the move - let delete_op = crdt_a.local_delete(middle).unwrap(); + let (delete_op, _) = crdt_a.local_delete(middle).unwrap(); assert!(crdt_a.is_tombstoned(middle).unwrap()); crdt_a.apply_remote(move_op.clone()).unwrap(); @@ -197,29 +204,30 @@ fn defensive_delete_parent_when_sibling_moved_same_parent_restores_parent() { let middle = NodeId(3); let last = NodeId(4); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - let first_op = crdt_a.local_insert_after(parent, first, None).unwrap(); + let (first_op, _) = crdt_a.local_insert(parent, first, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(first_op).unwrap(); - let middle_op = crdt_a.local_insert_after(parent, middle, Some(first)).unwrap(); + let (middle_op, _) = + crdt_a.local_insert(parent, middle, LocalPlacement::After(first), None).unwrap(); crdt_b.apply_remote(middle_op).unwrap(); - let last_op = crdt_a.local_insert_after(parent, last, Some(middle)).unwrap(); + let (last_op, _) = + crdt_a.local_insert(parent, last, LocalPlacement::After(middle), None).unwrap(); crdt_b.apply_remote(last_op).unwrap(); assert_eq!(crdt_a.children(parent).unwrap(), &[first, middle, last]); assert_eq!(crdt_b.children(parent).unwrap(), &[first, middle, last]); - // Client A moves middle within same parent (position only): [first, last, middle] - let move_op = crdt_a.local_move_after(middle, parent, Some(last)).unwrap(); + let (move_op, _) = crdt_a.local_move(middle, parent, LocalPlacement::After(last)).unwrap(); assert_eq!(crdt_a.parent(middle).unwrap(), Some(parent)); assert_eq!(crdt_a.children(parent).unwrap(), &[first, last, middle]); assert!(!crdt_a.is_tombstoned(parent).unwrap()); - // Client B deletes parent without having seen the move - let delete_op = crdt_b.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_b.local_delete(parent).unwrap(); assert!(crdt_b.is_tombstoned(parent).unwrap()); crdt_a.apply_remote(delete_op.clone()).unwrap(); @@ -265,19 +273,20 @@ fn defensive_delete_parent_then_multiple_children_restores_parent() { let child1 = NodeId(2); let child2 = NodeId(3); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - let insert_child1_op = crdt_b.local_insert_after(parent, child1, None).unwrap(); + let (insert_child1_op, _) = + crdt_b.local_insert(parent, child1, LocalPlacement::First, None).unwrap(); - // Client B inserts children first, then Client A deletes without awareness - // Defensive delete: parent should be restored because delete was unaware of modifications - let insert_child2_op = crdt_b.local_insert_after(parent, child2, Some(child1)).unwrap(); + let (insert_child2_op, _) = crdt_b + .local_insert(parent, child2, LocalPlacement::After(child1), None) + .unwrap(); assert!(!crdt_b.is_tombstoned(parent).unwrap()); assert_eq!(crdt_b.children(parent).unwrap(), &[child1, child2]); - // Client A deletes without having seen the second child insert - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); crdt_a.apply_remote(insert_child1_op.clone()).unwrap(); @@ -321,26 +330,23 @@ fn defensive_delete_insert_then_delete_no_restoration() { let parent = NodeId(1); let child = NodeId(2); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - // Client B inserts child first - let child_op = crdt_b.local_insert_after(parent, child, None).unwrap(); + let (child_op, _) = crdt_b.local_insert(parent, child, LocalPlacement::First, None).unwrap(); assert_eq!(crdt_b.parent(child).unwrap(), Some(parent)); assert!(!crdt_b.is_tombstoned(parent).unwrap()); - // Client A receives insert first (has full awareness), then deletes with higher lamport - // Since Client A is aware of all changes, delete should succeed and parent should stay tombstoned crdt_a.apply_remote(child_op.clone()).unwrap(); assert_eq!(crdt_a.parent(child).unwrap(), Some(parent)); assert!(!crdt_a.is_tombstoned(parent).unwrap()); - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); crdt_b.apply_remote(delete_op).unwrap(); - // Parent should stay tombstoned because delete happened with full awareness of modifications assert!(crdt_a.is_tombstoned(parent).unwrap()); assert!(crdt_b.is_tombstoned(parent).unwrap()); assert_eq!(crdt_a.parent(child).unwrap(), Some(parent)); @@ -368,15 +374,14 @@ fn defensive_delete_parent_then_payload_change_restores_parent() { let parent = NodeId(1); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - // Client B sets payload first, then Client A deletes without awareness. - // Defensive delete: parent should be restored because delete was unaware of modifications. - let set_payload_op = crdt_b.local_set_payload(parent, b"hello".to_vec()).unwrap(); + let (set_payload_op, _) = crdt_b.local_payload(parent, Some(b"hello".to_vec())).unwrap(); assert_eq!(crdt_b.payload(parent).unwrap(), Some(b"hello".to_vec())); - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); crdt_a.apply_remote(set_payload_op.clone()).unwrap(); @@ -414,19 +419,18 @@ fn defensive_delete_parent_then_payload_change_no_restoration_when_aware() { let parent = NodeId(1); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - let set_payload_op = crdt_b.local_set_payload(parent, b"hello".to_vec()).unwrap(); + let (set_payload_op, _) = crdt_b.local_payload(parent, Some(b"hello".to_vec())).unwrap(); crdt_a.apply_remote(set_payload_op).unwrap(); - // Client A deletes with full awareness of payload. - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); crdt_b.apply_remote(delete_op).unwrap(); - // Parent should stay tombstoned because delete happened with full awareness. assert!(crdt_a.is_tombstoned(parent).unwrap()); assert!(crdt_b.is_tombstoned(parent).unwrap()); assert_eq!(crdt_a.payload(parent).unwrap(), Some(b"hello".to_vec())); @@ -455,24 +459,21 @@ fn defensive_delete_later_delete_unaware_restores_parent() { let parent = NodeId(1); let child = NodeId(2); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - // Client B inserts child first - let insert_child_op = crdt_b.local_insert_after(parent, child, None).unwrap(); + let (insert_child_op, _) = + crdt_b.local_insert(parent, child, LocalPlacement::First, None).unwrap(); assert_eq!(crdt_b.parent(child).unwrap(), Some(parent)); assert!(!crdt_b.is_tombstoned(parent).unwrap()); - // Client A deletes without having seen the insert - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); - // Synchronize: Client A receives the insert, Client B receives the delete crdt_a.apply_remote(insert_child_op.clone()).unwrap(); crdt_b.apply_remote(delete_op).unwrap(); - // Defensive delete: parent should be restored because delete was unaware of modifications - // even though it happened later in time (higher lamport) assert!( !crdt_a.is_tombstoned(parent).unwrap(), "Parent should be restored" @@ -509,16 +510,15 @@ fn defensive_delete_insert_delete_sequence() { let parent = NodeId(1); let child = NodeId(2); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - // Client B inserts child first, then Client A deletes without awareness - // Defensive delete: parent should be restored because delete was unaware of modifications - let insert_child_op = crdt_b.local_insert_after(parent, child, None).unwrap(); + let (insert_child_op, _) = + crdt_b.local_insert(parent, child, LocalPlacement::First, None).unwrap(); assert!(!crdt_b.is_tombstoned(parent).unwrap()); - // Client A deletes without having seen the insert - let delete1_op = crdt_a.local_delete(parent).unwrap(); + let (delete1_op, _) = crdt_a.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); crdt_a.apply_remote(insert_child_op.clone()).unwrap(); @@ -560,19 +560,18 @@ fn defensive_delete_multiple_deletes_then_insert_restores_parent() { let parent = NodeId(1); let child = NodeId(2); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op.clone()).unwrap(); crdt_c.apply_remote(parent_op).unwrap(); - // Client C inserts child first, then Clients A and B delete concurrently without awareness - // Defensive delete: parent should be restored because deletes were unaware of modifications - let insert_child_op = crdt_c.local_insert_after(parent, child, None).unwrap(); + let (insert_child_op, _) = + crdt_c.local_insert(parent, child, LocalPlacement::First, None).unwrap(); assert!(!crdt_c.is_tombstoned(parent).unwrap()); assert_eq!(crdt_c.children(parent).unwrap(), &[child]); - // Clients A and B delete without having seen the insert - let delete_a = crdt_a.local_delete(parent).unwrap(); - let delete_b = crdt_b.local_delete(parent).unwrap(); + let (delete_a, _) = crdt_a.local_delete(parent).unwrap(); + let (delete_b, _) = crdt_b.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); assert!(crdt_b.is_tombstoned(parent).unwrap()); @@ -627,35 +626,40 @@ fn defensive_delete_parent_then_modify_grandchild_restores_parent() { let grandchild = NodeId(3); let other_parent = NodeId(4); - // Setup: root -> parent -> child -> grandchild - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - let child_op = crdt_a.local_insert_after(parent, child, None).unwrap(); + let (child_op, _) = crdt_a.local_insert(parent, child, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(child_op).unwrap(); - let grandchild_op = crdt_a.local_insert_after(child, grandchild, None).unwrap(); + let (grandchild_op, _) = + crdt_a.local_insert(child, grandchild, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(grandchild_op).unwrap(); - // Create another parent for moving the grandchild - let other_parent_op = - crdt_a.local_insert_after(NodeId::ROOT, other_parent, Some(parent)).unwrap(); + let (other_parent_op, _) = crdt_a + .local_insert( + NodeId::ROOT, + other_parent, + LocalPlacement::After(parent), + None, + ) + .unwrap(); crdt_b.apply_remote(other_parent_op).unwrap(); - let move_grandchild_op = crdt_b.local_move_after(grandchild, other_parent, None).unwrap(); + let (move_grandchild_op, _) = + crdt_b.local_move(grandchild, other_parent, LocalPlacement::First).unwrap(); assert_eq!(crdt_b.parent(grandchild).unwrap(), Some(other_parent)); assert!(!crdt_b.is_tombstoned(parent).unwrap()); assert_eq!(crdt_b.children(parent).unwrap(), &[child]); assert_eq!(crdt_b.children(other_parent).unwrap(), &[grandchild]); - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); - // Synchronize: Client A receives the move, Client B receives the delete crdt_a.apply_remote(move_grandchild_op.clone()).unwrap(); crdt_b.apply_remote(delete_op).unwrap(); - // Defensive delete: parent should be restored because delete was unaware of grandchild modification assert!( !crdt_a.is_tombstoned(parent).unwrap(), "Parent should be restored" @@ -680,7 +684,6 @@ fn defensive_delete_parent_then_modify_grandchild_restores_parent() { #[test] fn delete_unrelated_ops_should_not_prevent_restoration_when_child_insert_was_unseen() { - // Demonstrates false awareness when known_state uses max Lamport per replica (gaps across subtrees). let mut crdt_a = TreeCrdt::new( ReplicaId::new(b"a"), MemoryStorage::default(), @@ -698,15 +701,19 @@ fn delete_unrelated_ops_should_not_prevent_restoration_when_child_insert_was_uns let child = NodeId(2); let unrelated = NodeId(99); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - let insert_child_op = crdt_b.local_insert_after(parent, child, None).unwrap(); + let (insert_child_op, _) = + crdt_b.local_insert(parent, child, LocalPlacement::First, None).unwrap(); - let unrelated_op = crdt_b.local_insert_after(NodeId::ROOT, unrelated, Some(parent)).unwrap(); + let (unrelated_op, _) = crdt_b + .local_insert(NodeId::ROOT, unrelated, LocalPlacement::After(parent), None) + .unwrap(); crdt_a.apply_remote(unrelated_op).unwrap(); - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); crdt_b.apply_remote(delete_op.clone()).unwrap(); crdt_a.apply_remote(insert_child_op).unwrap(); @@ -722,7 +729,6 @@ fn delete_unrelated_ops_should_not_prevent_restoration_when_child_insert_was_uns #[test] fn delete_should_restore_when_earlier_child_op_from_same_replica_was_missing() { - // Requires dotted/range version vectors: seeing B:2 must not imply seeing B:1. let mut crdt_a = TreeCrdt::new( ReplicaId::new(b"a"), MemoryStorage::default(), @@ -740,15 +746,19 @@ fn delete_should_restore_when_earlier_child_op_from_same_replica_was_missing() { let child1 = NodeId(2); let child2 = NodeId(3); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b.apply_remote(parent_op).unwrap(); - let insert_child1_op = crdt_b.local_insert_after(parent, child1, None).unwrap(); - let insert_child2_op = crdt_b.local_insert_after(parent, child2, Some(child1)).unwrap(); + let (insert_child1_op, _) = + crdt_b.local_insert(parent, child1, LocalPlacement::First, None).unwrap(); + let (insert_child2_op, _) = crdt_b + .local_insert(parent, child2, LocalPlacement::After(child1), None) + .unwrap(); crdt_a.apply_remote(insert_child2_op).unwrap(); - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); crdt_b.apply_remote(delete_op.clone()).unwrap(); crdt_a.apply_remote(insert_child1_op).unwrap(); @@ -784,21 +794,21 @@ fn materialized_apply_delta_includes_parent_restored_by_unseen_payload_change() let parent = NodeId(1); let child = NodeId(2); - let parent_op = crdt_a.local_insert_after(NodeId::ROOT, parent, None).unwrap(); + let (parent_op, _) = + crdt_a.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); crdt_b .apply_remote_with_materialization_seq(parent_op, &mut index_b, &mut seq_b) .unwrap() .unwrap(); - let child_op = crdt_a.local_insert_after(parent, child, None).unwrap(); + let (child_op, _) = crdt_a.local_insert(parent, child, LocalPlacement::First, None).unwrap(); crdt_b .apply_remote_with_materialization_seq(child_op, &mut index_b, &mut seq_b) .unwrap() .unwrap(); - // Client B modifies child payload first. Client A deletes parent without seeing it. - let payload_op = crdt_b.local_set_payload(child, b"x".to_vec()).unwrap(); - let delete_op = crdt_a.local_delete(parent).unwrap(); + let (payload_op, _) = crdt_b.local_payload(child, Some(b"x".to_vec())).unwrap(); + let (delete_op, _) = crdt_a.local_delete(parent).unwrap(); assert!(crdt_a.is_tombstoned(parent).unwrap()); let delta = crdt_a diff --git a/packages/treecrdt-core/tests/materialization_helpers.rs b/packages/treecrdt-core/tests/materialization_helpers.rs index b0f421b4..fd0f6833 100644 --- a/packages/treecrdt-core/tests/materialization_helpers.rs +++ b/packages/treecrdt-core/tests/materialization_helpers.rs @@ -3,10 +3,11 @@ use std::rc::Rc; use treecrdt_core::{ apply_incremental_ops_with_delta, apply_persisted_remote_ops_with_delta, catch_up_materialized_state, materialize_persisted_remote_ops_with_delta, - try_shortcut_out_of_order_payload_noops, Lamport, LamportClock, MaterializationCursor, - MaterializationHead, MaterializationKey, MaterializationState, MemoryNodeStore, - MemoryPayloadStore, MemoryStorage, NodeId, NoopParentOpIndex, Operation, OperationId, - ParentOpIndex, PersistedRemoteStores, ReplicaId, Storage, TreeCrdt, + try_shortcut_out_of_order_payload_noops, Lamport, LamportClock, LocalFinalizePlan, + LocalPlacement, MaterializationCursor, MaterializationHead, MaterializationKey, + MaterializationState, MemoryNodeStore, MemoryPayloadStore, MemoryStorage, NodeId, + NoopParentOpIndex, Operation, OperationId, ParentOpIndex, PersistedRemoteStores, ReplicaId, + Storage, TreeCrdt, }; #[derive(Default)] @@ -110,7 +111,7 @@ impl Storage for CountingStorage { } #[test] -fn finalize_local_materialization_records_unique_hints_and_extras() { +fn finalize_local_records_unique_hints_and_extras() { let mut crdt = TreeCrdt::new( ReplicaId::new(b"local"), MemoryStorage::default(), @@ -120,27 +121,26 @@ fn finalize_local_materialization_records_unique_hints_and_extras() { let parent = NodeId(10); let node = NodeId(11); - crdt.local_insert_after(NodeId::ROOT, parent, None).unwrap(); - let op = crdt.local_insert_after(parent, node, None).unwrap(); + crdt.local_insert(NodeId::ROOT, parent, LocalPlacement::First, None).unwrap(); + let (op, _) = crdt.local_insert(parent, node, LocalPlacement::First, None).unwrap(); let extra_op_id = OperationId { replica: ReplicaId::new(b"extra"), counter: 7, }; - let mut index = RecordingIndex::default(); - crdt.finalize_local_materialization( - &op, - &mut index, - 42, - &[parent, parent, NodeId::TRASH], - &[ + let plan = LocalFinalizePlan { + parent_hints: vec![parent, parent, NodeId::TRASH], + extra_index_records: vec![ (parent, extra_op_id.clone()), (NodeId::TRASH, extra_op_id.clone()), ], - ) - .unwrap(); + }; + + let mut index = RecordingIndex::default(); + let seq = crdt.finalize_local(&op, &mut index, 41, &plan).unwrap(); + assert_eq!(seq, 42); assert_eq!(index.records.len(), 2); assert_eq!(index.records[0], (parent, op.meta.id.clone(), 42)); assert_eq!(index.records[1], (parent, extra_op_id, 42)); diff --git a/packages/treecrdt-core/tests/operations.rs b/packages/treecrdt-core/tests/operations.rs index d261327f..586971e2 100644 --- a/packages/treecrdt-core/tests/operations.rs +++ b/packages/treecrdt-core/tests/operations.rs @@ -1,6 +1,6 @@ use treecrdt_core::{ - LamportClock, LocalFinalizePlan, LocalPlacement, MemoryStorage, NodeId, NoopParentOpIndex, - Operation, ReplicaId, TreeCrdt, + LamportClock, LocalPlacement, MemoryStorage, NodeId, NoopParentOpIndex, Operation, ReplicaId, + TreeCrdt, }; #[test] @@ -16,14 +16,13 @@ fn inserts_and_moves_nodes() { let a = NodeId(1); let b = NodeId(2); - crdt.local_insert_after(root, a, None).unwrap(); - crdt.local_insert_after(a, b, None).unwrap(); + crdt.local_insert(root, a, LocalPlacement::First, None).unwrap(); + crdt.local_insert(a, b, LocalPlacement::First, None).unwrap(); assert_eq!(crdt.parent(a).unwrap(), Some(root)); assert_eq!(crdt.parent(b).unwrap(), Some(a)); - // move b under root - crdt.local_move_after(b, root, None).unwrap(); + crdt.local_move(b, root, LocalPlacement::First).unwrap(); assert_eq!(crdt.parent(b).unwrap(), Some(root)); assert_eq!(crdt.children(root).unwrap(), &[b, a]); } @@ -37,8 +36,7 @@ fn duplicate_operations_are_ignored() { ) .unwrap(); - let op = crdt.local_insert_after(NodeId::ROOT, NodeId(1), None).unwrap(); - // applying again should be idempotent + let (op, _) = crdt.local_insert(NodeId::ROOT, NodeId(1), LocalPlacement::First, None).unwrap(); crdt.apply_remote(op.clone()).unwrap(); crdt.apply_remote(op).unwrap(); assert_eq!(crdt.children(NodeId::ROOT).unwrap(), &[NodeId(1)]); @@ -54,14 +52,14 @@ fn delete_marks_tombstone_and_removes_from_parent() { .unwrap(); let child = NodeId(1); - crdt.local_insert_after(NodeId::ROOT, child, None).unwrap(); + crdt.local_insert(NodeId::ROOT, child, LocalPlacement::First, None).unwrap(); crdt.local_delete(child).unwrap(); assert!(crdt.is_tombstoned(child).unwrap()); assert_eq!(crdt.parent(child).unwrap(), Some(NodeId::TRASH)); assert!(crdt.children(NodeId::ROOT).unwrap().is_empty()); - crdt.local_move_after(child, NodeId::ROOT, None).unwrap(); + crdt.local_move(child, NodeId::ROOT, LocalPlacement::First).unwrap(); assert!(!crdt.is_tombstoned(child).unwrap()); assert_eq!(crdt.parent(child).unwrap(), Some(NodeId::ROOT)); } @@ -79,8 +77,8 @@ fn prevents_cycle_on_move() { let a = NodeId(1); let b = NodeId(2); - crdt.local_insert_after(root, a, None).unwrap(); - crdt.local_insert_after(a, b, None).unwrap(); + crdt.local_insert(root, a, LocalPlacement::First, None).unwrap(); + crdt.local_insert(a, b, LocalPlacement::First, None).unwrap(); crdt.apply_remote(Operation::move_node( &ReplicaId::new(b"a"), @@ -179,7 +177,7 @@ fn apply_remote_with_materialization_reports_affected_nodes() { } #[test] -fn local_move_with_plan_tracks_hint_and_payload_reindex() { +fn local_move_tracks_hint_and_payload_reindex() { let mut crdt = TreeCrdt::new( ReplicaId::new(b"a"), MemoryStorage::default(), @@ -192,12 +190,12 @@ fn local_move_with_plan_tracks_hint_and_payload_reindex() { let parent_b = NodeId(11); let node = NodeId(12); - crdt.local_insert_after(root, parent_a, None).unwrap(); - crdt.local_insert_after(root, parent_b, None).unwrap(); - crdt.local_insert_after_with_payload(parent_a, node, None, vec![1]).unwrap(); + crdt.local_insert(root, parent_a, LocalPlacement::First, None).unwrap(); + crdt.local_insert(root, parent_b, LocalPlacement::First, None).unwrap(); + crdt.local_insert(parent_a, node, LocalPlacement::First, Some(vec![1])).unwrap(); let expected_payload_writer = crdt.payload_last_writer(node).unwrap().unwrap().1; - let (_op, plan) = crdt.local_move_with_plan(node, parent_b, LocalPlacement::Last).unwrap(); + let (_op, plan) = crdt.local_move(node, parent_b, LocalPlacement::Last).unwrap(); assert_eq!(plan.parent_hints, vec![parent_b, parent_a]); assert_eq!( @@ -222,7 +220,7 @@ fn resolve_after_rejects_excluded_node() { .unwrap(); let root = NodeId::ROOT; let node = NodeId(42); - crdt.local_insert_after(root, node, None).unwrap(); + crdt.local_insert(root, node, LocalPlacement::First, None).unwrap(); let err = crdt .resolve_after_for_placement(root, LocalPlacement::After(node), Some(node)) @@ -231,7 +229,7 @@ fn resolve_after_rejects_excluded_node() { } #[test] -fn finalize_local_with_plan_advances_head_seq() { +fn finalize_local_advances_head_seq() { let mut crdt = TreeCrdt::new( ReplicaId::new(b"a"), MemoryStorage::default(), @@ -240,13 +238,9 @@ fn finalize_local_with_plan_advances_head_seq() { .unwrap(); let root = NodeId::ROOT; let node = NodeId(7); - let op = crdt.local_insert_after(root, node, None).unwrap(); - let plan = LocalFinalizePlan { - parent_hints: vec![root], - extra_index_records: Vec::new(), - }; + let (op, plan) = crdt.local_insert(root, node, LocalPlacement::First, None).unwrap(); let mut index = NoopParentOpIndex; - let next_seq = crdt.finalize_local_with_plan(&op, &mut index, 41, &plan).unwrap(); + let next_seq = crdt.finalize_local(&op, &mut index, 41, &plan).unwrap(); assert_eq!(next_seq, 42); } diff --git a/packages/treecrdt-core/tests/restarts.rs b/packages/treecrdt-core/tests/restarts.rs index 82f8b57c..f8898f5e 100644 --- a/packages/treecrdt-core/tests/restarts.rs +++ b/packages/treecrdt-core/tests/restarts.rs @@ -2,7 +2,8 @@ use std::collections::HashSet; use std::sync::{Arc, Mutex}; use treecrdt_core::{ - Lamport, LamportClock, NodeId, Operation, OperationId, ReplicaId, Result, Storage, TreeCrdt, + Lamport, LamportClock, LocalPlacement, NodeId, Operation, OperationId, ReplicaId, Result, + Storage, TreeCrdt, }; #[derive(Clone, Default)] @@ -52,8 +53,15 @@ fn local_meta_survives_restart() { let replica = ReplicaId::new(b"a"); let mut a = TreeCrdt::new(replica.clone(), storage.clone(), LamportClock::default()).unwrap(); - let op1 = a.local_insert_after(NodeId::ROOT, NodeId(1), None).unwrap(); - let op2 = a.local_insert_after(NodeId::ROOT, NodeId(2), Some(NodeId(1))).unwrap(); + let (op1, _) = a.local_insert(NodeId::ROOT, NodeId(1), LocalPlacement::First, None).unwrap(); + let (op2, _) = a + .local_insert( + NodeId::ROOT, + NodeId(2), + LocalPlacement::After(NodeId(1)), + None, + ) + .unwrap(); assert_eq!(op2.meta.id.counter, op1.meta.id.counter + 1); assert_eq!(op2.meta.lamport, op1.meta.lamport + 1); @@ -62,7 +70,14 @@ fn local_meta_survives_restart() { let mut b = TreeCrdt::new(replica.clone(), storage.clone(), LamportClock::default()).unwrap(); b.replay_from_storage().unwrap(); - let op3 = b.local_insert_after(NodeId::ROOT, NodeId(3), Some(NodeId(2))).unwrap(); + let (op3, _) = b + .local_insert( + NodeId::ROOT, + NodeId(3), + LocalPlacement::After(NodeId(2)), + None, + ) + .unwrap(); assert_eq!(op3.meta.id.counter, op2.meta.id.counter + 1); assert_eq!(op3.meta.lamport, op2.meta.lamport + 1); } diff --git a/packages/treecrdt-postgres-rs/src/local_ops.rs b/packages/treecrdt-postgres-rs/src/local_ops.rs index f69d70cf..d3c0c59e 100644 --- a/packages/treecrdt-postgres-rs/src/local_ops.rs +++ b/packages/treecrdt-postgres-rs/src/local_ops.rs @@ -86,12 +86,10 @@ fn finish_local_core_op( let mut op_index = PgParentOpIndex::new(session.ctx.clone()); // commit_local() already persisted the op and updated node/payload state. The finalize step // refreshes adapter-owned derived state that lives outside TreeCrdt itself. - match session.crdt.finalize_local_with_plan( - op, - &mut op_index, - session.meta.state().head_seq(), - &plan, - ) { + match session + .crdt + .finalize_local(op, &mut op_index, session.meta.state().head_seq(), &plan) + { Ok(v) => { seq = v; if session.nodes.flush_last_change().is_err() || op_index.flush().is_err() { @@ -144,7 +142,7 @@ pub fn local_insert( run_in_tx(client, || { let mut session = begin_local_core_op(client, doc_id, replica)?; let placement = LocalPlacement::from_parts(placement, after)?; - let (op, plan) = session.crdt.local_insert_with_plan(parent, node, placement, payload)?; + let (op, plan) = session.crdt.local_insert(parent, node, placement, payload)?; finish_local_core_op(&mut session, &op, plan)?; Ok(op) }) @@ -162,7 +160,7 @@ pub fn local_move( run_in_tx(client, || { let mut session = begin_local_core_op(client, doc_id, replica)?; let placement = LocalPlacement::from_parts(placement, after)?; - let (op, plan) = session.crdt.local_move_with_plan(node, new_parent, placement)?; + let (op, plan) = session.crdt.local_move(node, new_parent, placement)?; finish_local_core_op(&mut session, &op, plan)?; Ok(op) }) @@ -176,7 +174,7 @@ pub fn local_delete( ) -> Result { run_in_tx(client, || { let mut session = begin_local_core_op(client, doc_id, replica)?; - let (op, plan) = session.crdt.local_delete_with_plan(node)?; + let (op, plan) = session.crdt.local_delete(node)?; finish_local_core_op(&mut session, &op, plan)?; Ok(op) }) @@ -191,7 +189,7 @@ pub fn local_payload( ) -> Result { run_in_tx(client, || { let mut session = begin_local_core_op(client, doc_id, replica)?; - let (op, plan) = session.crdt.local_payload_with_plan(node, payload)?; + let (op, plan) = session.crdt.local_payload(node, payload)?; finish_local_core_op(&mut session, &op, plan)?; Ok(op) }) diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/local_ops.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/local_ops.rs index 54b5b0cb..532dd7ff 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/local_ops.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/local_ops.rs @@ -208,7 +208,7 @@ fn finish_local_core_op( let finalize_rc = match SqliteParentOpIndex::prepare(session.db, session.doc_id.clone()) { Ok(mut op_index) => session .crdt - .finalize_local_with_plan(&op, &mut op_index, head_seq, &plan) + .finalize_local(&op, &mut op_index, head_seq, &plan) .map_err(|_| SQLITE_ERROR as c_int), Err(_) => Err(SQLITE_ERROR as c_int), }; @@ -384,7 +384,7 @@ pub(super) unsafe extern "C" fn treecrdt_local_insert( } }; let out = match run_local_core_op(db, doc_id, replica, "treecrdt_local_insert", |crdt| { - crdt.local_insert_with_plan(parent_id, node_id, placement, payload.clone()) + crdt.local_insert(parent_id, node_id, placement, payload.clone()) }) { Ok(v) => v, Err(rc) => { @@ -487,7 +487,7 @@ pub(super) unsafe extern "C" fn treecrdt_local_move( } }; let out = match run_local_core_op(db, doc_id, replica, "treecrdt_local_move", |crdt| { - crdt.local_move_with_plan(node_id, new_parent_id, placement) + crdt.local_move(node_id, new_parent_id, placement) }) { Ok(v) => v, Err(rc) => { @@ -557,7 +557,7 @@ pub(super) unsafe extern "C" fn treecrdt_local_delete( let node_id = NodeId(u128::from_be_bytes(node)); let out = match run_local_core_op(db, doc_id, replica, "treecrdt_local_delete", |crdt| { - crdt.local_delete_with_plan(node_id) + crdt.local_delete(node_id) }) { Ok(v) => v, Err(rc) => { @@ -629,7 +629,7 @@ pub(super) unsafe extern "C" fn treecrdt_local_payload( let node_id = NodeId(u128::from_be_bytes(node)); let out = match run_local_core_op(db, doc_id, replica, "treecrdt_local_payload", |crdt| { - crdt.local_payload_with_plan(node_id, payload.clone()) + crdt.local_payload(node_id, payload.clone()) }) { Ok(v) => v, Err(rc) => {