diff --git a/Cargo.lock b/Cargo.lock index f18f135a..126f90a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1439,6 +1439,7 @@ dependencies = [ "serde", "serde_json", "treecrdt-core", + "treecrdt-test-support", "uuid", ] @@ -1476,6 +1477,14 @@ dependencies = [ "sqlite3ext-sys", "tempfile", "treecrdt-core", + "treecrdt-test-support", +] + +[[package]] +name = "treecrdt-test-support" +version = "0.0.1" +dependencies = [ + "treecrdt-core", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6cbf6056..c8cd6518 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "packages/treecrdt-postgres-rs", "packages/treecrdt-riblt-wasm", "packages/treecrdt-sqlite-ext", + "packages/treecrdt-test-support", "packages/treecrdt-wasm", ] resolver = "2" diff --git a/packages/treecrdt-core/src/lib.rs b/packages/treecrdt-core/src/lib.rs index 5a13aaf9..dd3f1f7c 100644 --- a/packages/treecrdt-core/src/lib.rs +++ b/packages/treecrdt-core/src/lib.rs @@ -17,13 +17,15 @@ pub use ids::{Lamport, NodeId, OperationId, ReplicaId}; pub use materialization::{ apply_incremental_ops_with_delta, apply_persisted_remote_ops_with_delta, catch_up_materialized_state, materialize_persisted_remote_ops_with_delta, - IncrementalApplyResult, MaterializationCursor, MaterializationFrontier, MaterializationHead, - MaterializationKey, MaterializationState, PersistedRemoteApplyResult, PersistedRemoteStores, + try_shortcut_out_of_order_payload_noops, CatchUpResult, IncrementalApplyResult, + MaterializationCursor, MaterializationFrontier, MaterializationHead, MaterializationKey, + MaterializationState, PayloadNoopShortcut, PersistedRemoteApplyResult, PersistedRemoteStores, }; pub use ops::{cmp_op_key, cmp_ops, Operation, OperationKind, OperationMetadata}; pub use traits::{ - Clock, IndexProvider, LamportClock, MemoryNodeStore, MemoryPayloadStore, MemoryStorage, - NodeStore, NoopParentOpIndex, NoopStorage, ParentOpIndex, PayloadStore, Storage, + Clock, ExactNodeStore, ExactPayloadStore, IndexProvider, LamportClock, MemoryNodeStore, + MemoryPayloadStore, MemoryStorage, NodeStore, NoopParentOpIndex, NoopStorage, ParentOpIndex, + PayloadStore, Storage, TruncatingParentOpIndex, }; pub use tree::{ ApplyDelta, LocalFinalizePlan, LocalPlacement, NodeExport, NodeSnapshotExport, TreeCrdt, diff --git a/packages/treecrdt-core/src/materialization.rs b/packages/treecrdt-core/src/materialization.rs index 35059826..19d1ceb7 100644 --- a/packages/treecrdt-core/src/materialization.rs +++ b/packages/treecrdt-core/src/materialization.rs @@ -1,9 +1,10 @@ use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; use crate::ops::{cmp_op_key, cmp_ops, Operation}; use crate::traits::{ - Clock, LamportClock, MemoryNodeStore, MemoryPayloadStore, NodeStore, NoopStorage, - ParentOpIndex, PayloadStore, Storage, + Clock, ExactNodeStore, ExactPayloadStore, LamportClock, MemoryNodeStore, MemoryPayloadStore, + NodeStore, NoopStorage, ParentOpIndex, PayloadStore, Storage, TruncatingParentOpIndex, }; use crate::tree::TreeCrdt; use crate::{Error, Lamport, NodeId, OperationId, ReplicaId, Result}; @@ -79,18 +80,31 @@ pub struct IncrementalApplyResult { pub affected_nodes: Vec, } +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct CatchUpResult { + pub head: Option, + pub affected_nodes: Vec, +} + #[derive(Clone, Debug, Eq, PartialEq)] pub struct PersistedRemoteApplyResult { /// Number of ops from the input batch that were actually inserted by adapter-side dedupe. pub inserted_count: u64, /// Nodes changed by core materialization when incremental replay succeeded. /// - /// This is empty when nothing was inserted or when the helper had to defer catch-up by - /// recording a replay frontier instead of trusting incremental materialization. + /// This is empty when nothing was inserted or when the helper could not advance + /// materialization immediately and had to hand catch-up work back to the caller. + pub affected_nodes: Vec, + /// True when the helper recorded/kept a replay frontier and expects the caller to perform + /// catch-up. + pub catch_up_needed: bool, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct PayloadNoopShortcut { + pub resumed_head: MaterializationHead, + pub remaining_ops: Vec, pub affected_nodes: Vec, - /// True when the helper recorded a replay frontier instead of advancing materialization head - /// immediately. - pub frontier_recorded: bool, } /// Backend-owned stores used to replay already-persisted remote ops through core semantics. @@ -123,6 +137,13 @@ impl ParentOpIndex for RecordingIndex { } } +impl TruncatingParentOpIndex for RecordingIndex { + fn truncate_from(&mut self, seq: u64) -> Result<()> { + self.records.retain(|(_, _, existing_seq)| *existing_seq < seq); + Ok(()) + } +} + struct PrefixSnapshot { crdt: TreeCrdt, index: RecordingIndex, @@ -138,6 +159,14 @@ fn frontier_from_op(op: &Operation) -> MaterializationFrontier { } } +fn frontier_from_writer(lamport: Lamport, id: &OperationId) -> MaterializationFrontier { + MaterializationFrontier { + lamport, + replica: id.replica.as_bytes().to_vec(), + counter: id.counter, + } +} + fn owned_frontier>(frontier: &MaterializationKey) -> MaterializationFrontier { MaterializationFrontier { lamport: frontier.lamport, @@ -201,6 +230,122 @@ fn next_replay_frontier( } } +/// Try to skip out-of-order payload ops that are already dominated by a later payload winner. +/// +/// This allows adapters to avoid recording a replay frontier for a narrow but common case: +/// older payload ops that do not change materialized payload state even after being inserted +/// earlier in the canonical log order. +pub fn try_shortcut_out_of_order_payload_noops( + meta: &M, + inserted_ops: Vec, + mut load_last_writer: LoadWriter, +) -> std::result::Result, E> +where + M: MaterializationCursor, + LoadWriter: FnMut(NodeId) -> std::result::Result, E>, +{ + let state = meta.state(); + if state.replay_from.is_some() || inserted_ops.is_empty() { + return Ok(None); + } + + let Some(head) = state.head.as_ref() else { + return Ok(None); + }; + + let mut ops = inserted_ops; + ops.sort_by(cmp_ops); + + let mut candidate_nodes = HashSet::new(); + let mut has_out_of_order = false; + for op in &ops { + if cmp_frontiers(&frontier_from_op(op), &head.at) != Ordering::Less { + continue; + } + has_out_of_order = true; + match &op.kind { + crate::ops::OperationKind::Payload { node, .. } => { + candidate_nodes.insert(*node); + } + _ => return Ok(None), + } + } + + if !has_out_of_order { + return Ok(None); + } + + let mut final_writers: HashMap = HashMap::new(); + for node in &candidate_nodes { + if let Some((lamport, id)) = load_last_writer(*node)? { + final_writers.insert(*node, frontier_from_writer(lamport, &id)); + } + } + + for op in &ops { + let crate::ops::OperationKind::Payload { node, .. } = &op.kind else { + continue; + }; + if !candidate_nodes.contains(node) { + continue; + } + let op_frontier = frontier_from_op(op); + match final_writers.get(node) { + Some(existing) if cmp_frontiers(&op_frontier, existing) != Ordering::Greater => {} + _ => { + final_writers.insert(*node, op_frontier); + } + } + } + + let mut skipped = 0u64; + let mut affected = HashSet::new(); + let mut remaining_ops = Vec::new(); + + for op in ops { + let op_frontier = frontier_from_op(&op); + if cmp_frontiers(&op_frontier, &head.at) != Ordering::Less { + remaining_ops.push(op); + continue; + } + + let node = match &op.kind { + crate::ops::OperationKind::Payload { node, .. } => *node, + _ => return Ok(None), + }; + + let Some(final_writer) = final_writers.get(&node) else { + return Ok(None); + }; + if cmp_frontiers(&op_frontier, final_writer) != Ordering::Less { + return Ok(None); + } + + skipped = skipped.saturating_add(1); + affected.insert(node); + } + + if skipped == 0 { + return Ok(None); + } + + let mut affected_nodes: Vec = affected.into_iter().collect(); + affected_nodes.sort(); + + Ok(Some(PayloadNoopShortcut { + resumed_head: MaterializationHead { + at: MaterializationKey { + lamport: head.at.lamport, + replica: head.at.replica.to_vec(), + counter: head.at.counter, + }, + seq: head.seq.saturating_add(skipped), + }, + remaining_ops, + affected_nodes, + })) +} + /// Apply an incremental batch and return both head metadata and full affected-node delta. /// /// `affected_nodes` is deduplicated and sorted (`NodeId` ascending) for stable consumers. @@ -328,11 +473,11 @@ where Ok(result) } -fn build_prefix_snapshot( +fn replay_frontier_in_memory( storage: &S, frontier: &MaterializationFrontier, replica_id: &ReplicaId, -) -> Result { +) -> Result<(PrefixSnapshot, u64, Vec)> { let mut crdt = TreeCrdt::with_stores( replica_id.clone(), NoopStorage, @@ -343,75 +488,90 @@ fn build_prefix_snapshot( let mut index = RecordingIndex::default(); let mut seq = 0u64; let mut head: Option = None; + let mut affected = HashSet::new(); + let mut prefix_seq = None; storage.scan_since(0, &mut |op| { - if cmp_frontiers(&frontier_from_op(&op), frontier) != Ordering::Less { - return Ok(()); + let in_suffix = cmp_frontiers(&frontier_from_op(&op), frontier) != Ordering::Less; + if in_suffix && prefix_seq.is_none() { + prefix_seq = Some(seq); } match crdt.apply_remote_with_materialization_seq(op.clone(), &mut index, &mut seq)? { - Some(_) => { + Some(delta) => { head = Some(op); + if in_suffix { + affected.extend(delta.affected_nodes); + } Ok(()) } None => Err(Error::Storage( - "prefix replay unexpectedly required nested catch-up".into(), + "frontier replay unexpectedly required nested catch-up".into(), )), } })?; - Ok(PrefixSnapshot { - crdt, - index, - head, - seq, - }) + let mut affected_nodes: Vec = affected.into_iter().collect(); + affected_nodes.sort(); + Ok(( + PrefixSnapshot { + crdt, + index, + head, + seq, + }, + prefix_seq.unwrap_or(seq), + affected_nodes, + )) } -fn restore_prefix_snapshot( +fn patch_final_state_in_place( prefix: &mut PrefixSnapshot, + prefix_seq: u64, + affected_nodes: &[NodeId], nodes: &mut N, payloads: &mut P, index: &mut I, -) -> Result<()> { - let mut all_nodes = prefix.crdt.node_store_mut().all_nodes()?; - all_nodes.sort(); +) -> Result<()> +where + N: ExactNodeStore, + P: ExactPayloadStore, + I: TruncatingParentOpIndex, +{ + let truncate_from = prefix_seq.saturating_add(1); + index.truncate_from(truncate_from)?; - for node in &all_nodes { + for node in affected_nodes { nodes.ensure_node(*node)?; - } + nodes.detach(*node)?; - for node in &all_nodes { - if *node == NodeId::ROOT { - continue; - } - let parent = prefix.crdt.node_store_mut().parent(*node)?; - let order_key = prefix.crdt.node_store_mut().order_key(*node)?; - if let Some(parent) = parent { - nodes.attach(*node, parent, order_key.unwrap_or_default())?; - } else { - nodes.detach(*node)?; + if let Some(parent) = prefix.crdt.node_store_mut().parent(*node)? { + let order_key = prefix.crdt.node_store_mut().order_key(*node)?.unwrap_or_default(); + nodes.attach(*node, parent, order_key)?; } - } - for node in &all_nodes { nodes.set_tombstone(*node, prefix.crdt.node_store_mut().tombstone(*node)?)?; let last_change = prefix.crdt.node_store_mut().last_change(*node)?; - if !last_change.is_empty() { - nodes.merge_last_change(*node, &last_change)?; - } + nodes.set_last_change_exact(*node, &last_change)?; - if let Some(deleted_at) = prefix.crdt.node_store_mut().deleted_at(*node)? { - nodes.merge_deleted_at(*node, &deleted_at)?; - } + let deleted_at = prefix.crdt.node_store_mut().deleted_at(*node)?; + nodes.set_deleted_at_exact(*node, deleted_at.as_ref())?; if let Some(writer) = prefix.crdt.payload_last_writer(*node)? { payloads.set_payload(*node, prefix.crdt.payload(*node)?, writer)?; + } else { + payloads.clear_payload(*node)?; } } - let mut records = prefix.index.records.clone(); + let mut records: Vec<_> = prefix + .index + .records + .iter() + .filter(|(_, _, seq)| *seq >= truncate_from) + .cloned() + .collect(); records.sort_by(|a, b| a.2.cmp(&b.2).then_with(|| a.0.cmp(&b.0)).then_with(|| a.1.cmp(&b.1))); for (parent, op_id, seq) in records { index.record(parent, &op_id, seq)?; @@ -420,21 +580,21 @@ fn restore_prefix_snapshot( Ok(()) } -/// Catch backend materialized state up to the persisted op log using the replay frontier when -/// available. +/// Catch backend materialized state up from a replay frontier by patching affected backend rows +/// and suffix index entries in place. pub fn catch_up_materialized_state( storage: S, stores: PersistedRemoteStores, meta: &M, mut flush_nodes: FlushNodes, mut flush_index: FlushIndex, -) -> Result> +) -> Result where S: Storage, C: Clock, - N: NodeStore, - P: PayloadStore, - I: ParentOpIndex, + N: ExactNodeStore, + P: ExactPayloadStore, + I: TruncatingParentOpIndex, M: MaterializationCursor, FlushNodes: FnMut(&mut N) -> Result<()>, FlushIndex: FnMut(&mut I) -> Result<()>, @@ -444,65 +604,61 @@ where state.replay_from.as_ref().map(owned_frontier) }; + let Some(frontier) = replay_frontier.as_ref() else { + return Ok(CatchUpResult { + head: meta.state().head.as_ref().map(|head| MaterializationHead { + at: MaterializationKey { + lamport: head.at.lamport, + replica: head.at.replica.to_vec(), + counter: head.at.counter, + }, + seq: head.seq, + }), + affected_nodes: Vec::new(), + }); + }; + let PersistedRemoteStores { replica_id, - clock, + clock: _clock, mut nodes, mut payloads, mut index, } = stores; - nodes.reset()?; - payloads.reset()?; - index.reset()?; - - let mut head: Option = None; - let mut seq = 0u64; - - if let Some(frontier) = replay_frontier.as_ref() { - let mut prefix = build_prefix_snapshot(&storage, frontier, &replica_id)?; - restore_prefix_snapshot(&mut prefix, &mut nodes, &mut payloads, &mut index)?; - head = prefix.head; - seq = prefix.seq; - } - - let mut crdt = TreeCrdt::with_stores(replica_id, NoopStorage, clock, nodes, payloads)?; - storage.scan_since(0, &mut |op| { - if let Some(frontier) = replay_frontier.as_ref() { - if cmp_frontiers(&frontier_from_op(&op), frontier) == Ordering::Less { - return Ok(()); - } - } - - match crdt.apply_remote_with_materialization_seq(op.clone(), &mut index, &mut seq)? { - Some(_) => { - head = Some(op); - Ok(()) - } - None => Err(Error::Storage( - "catch-up replay unexpectedly required nested catch-up".into(), - )), - } - })?; + let (mut prefix, prefix_seq, affected_nodes) = + replay_frontier_in_memory(&storage, frontier, &replica_id)?; + patch_final_state_in_place( + &mut prefix, + prefix_seq, + &affected_nodes, + &mut nodes, + &mut payloads, + &mut index, + )?; - flush_nodes(crdt.node_store_mut())?; + flush_nodes(&mut nodes)?; flush_index(&mut index)?; - Ok(head.map(|head| MaterializationHead { - at: MaterializationKey { - lamport: head.meta.lamport, - replica: head.meta.id.replica.as_bytes().to_vec(), - counter: head.meta.id.counter, - }, - seq, - })) + Ok(CatchUpResult { + head: prefix.head.map(|head| MaterializationHead { + at: MaterializationKey { + lamport: head.meta.lamport, + replica: head.meta.id.replica.as_bytes().to_vec(), + counter: head.meta.id.counter, + }, + seq: prefix.seq, + }), + affected_nodes, + }) } /// Apply already-persisted inserted remote ops and commit adapter-owned metadata writes. /// /// Adapters own persistence + dedupe and pass only the inserted subset here. If the materialized /// doc is already behind a replay frontier, or if incremental materialization / metadata updates -/// fail, this records a replay frontier so catch-up can repair materialized state later. +/// fail, this records a replay frontier and returns control to the caller. Callers can then either +/// catch up immediately in the same append flow or defer catch-up to a later read/recovery path. pub fn apply_persisted_remote_ops_with_delta( meta: &M, inserted_ops: Vec, @@ -519,7 +675,7 @@ where return Ok(PersistedRemoteApplyResult { inserted_count: 0, affected_nodes: Vec::new(), - frontier_recorded: false, + catch_up_needed: false, }); } @@ -528,7 +684,7 @@ where return Ok(PersistedRemoteApplyResult { inserted_count, affected_nodes: Vec::new(), - frontier_recorded: true, + catch_up_needed: true, }); } @@ -539,7 +695,7 @@ where return Ok(PersistedRemoteApplyResult { inserted_count, affected_nodes: Vec::new(), - frontier_recorded: true, + catch_up_needed: true, }); }; @@ -547,14 +703,14 @@ where Ok(PersistedRemoteApplyResult { inserted_count, affected_nodes: result.affected_nodes, - frontier_recorded: false, + catch_up_needed: false, }) } else { schedule_replay(&start_replay_frontier())?; Ok(PersistedRemoteApplyResult { inserted_count, affected_nodes: Vec::new(), - frontier_recorded: true, + catch_up_needed: true, }) } } @@ -563,7 +719,7 @@ where Ok(PersistedRemoteApplyResult { inserted_count, affected_nodes: Vec::new(), - frontier_recorded: true, + catch_up_needed: true, }) } } diff --git a/packages/treecrdt-core/src/traits.rs b/packages/treecrdt-core/src/traits.rs index 3b3caa61..f1fcb710 100644 --- a/packages/treecrdt-core/src/traits.rs +++ b/packages/treecrdt-core/src/traits.rs @@ -159,6 +159,10 @@ pub trait PayloadStore { ) -> Result<()>; } +pub trait ExactPayloadStore: PayloadStore { + fn clear_payload(&mut self, node: NodeId) -> Result<()>; +} + /// Persistent index of operations relevant to a `children(parent)` filter. /// /// This is used by adapters (e.g. SQLite) to support partial sync without re-implementing which @@ -168,6 +172,10 @@ pub trait ParentOpIndex { fn record(&mut self, parent: NodeId, op_id: &OperationId, seq: u64) -> Result<()>; } +pub trait TruncatingParentOpIndex: ParentOpIndex { + fn truncate_from(&mut self, seq: u64) -> Result<()>; +} + #[derive(Default)] pub struct NoopParentOpIndex; @@ -181,6 +189,17 @@ impl ParentOpIndex for NoopParentOpIndex { } } +impl TruncatingParentOpIndex for NoopParentOpIndex { + fn truncate_from(&mut self, _seq: u64) -> Result<()> { + Ok(()) + } +} + +pub trait ExactNodeStore: NodeStore { + fn set_last_change_exact(&mut self, node: NodeId, vv: &VersionVector) -> Result<()>; + fn set_deleted_at_exact(&mut self, node: NodeId, vv: Option<&VersionVector>) -> Result<()>; +} + /// Basic Lamport clock implementation useful for tests and default flows. #[derive(Clone, Debug, Default)] pub struct LamportClock { @@ -308,6 +327,13 @@ impl PayloadStore for MemoryPayloadStore { } } +impl ExactPayloadStore for MemoryPayloadStore { + fn clear_payload(&mut self, node: NodeId) -> Result<()> { + self.entries.remove(&node); + Ok(()) + } +} + #[derive(Clone, Debug)] struct MemoryNodeState { parent: Option, @@ -483,3 +509,17 @@ impl NodeStore for MemoryNodeStore { Ok(self.nodes.keys().copied().collect()) } } + +impl ExactNodeStore for MemoryNodeStore { + fn set_last_change_exact(&mut self, node: NodeId, vv: &VersionVector) -> Result<()> { + self.ensure_node(node)?; + self.get_state_mut(node)?.last_change = vv.clone(); + Ok(()) + } + + fn set_deleted_at_exact(&mut self, node: NodeId, vv: Option<&VersionVector>) -> Result<()> { + self.ensure_node(node)?; + self.get_state_mut(node)?.deleted_at = vv.cloned(); + Ok(()) + } +} diff --git a/packages/treecrdt-core/tests/materialization_helpers.rs b/packages/treecrdt-core/tests/materialization_helpers.rs index 6ce1bac6..b0f421b4 100644 --- a/packages/treecrdt-core/tests/materialization_helpers.rs +++ b/packages/treecrdt-core/tests/materialization_helpers.rs @@ -1,9 +1,12 @@ +use std::cell::Cell; +use std::rc::Rc; use treecrdt_core::{ apply_incremental_ops_with_delta, apply_persisted_remote_ops_with_delta, - materialize_persisted_remote_ops_with_delta, LamportClock, MaterializationCursor, + catch_up_materialized_state, materialize_persisted_remote_ops_with_delta, + try_shortcut_out_of_order_payload_noops, Lamport, LamportClock, MaterializationCursor, MaterializationHead, MaterializationKey, MaterializationState, MemoryNodeStore, MemoryPayloadStore, MemoryStorage, NodeId, NoopParentOpIndex, Operation, OperationId, - ParentOpIndex, PersistedRemoteStores, ReplicaId, TreeCrdt, + ParentOpIndex, PersistedRemoteStores, ReplicaId, Storage, TreeCrdt, }; #[derive(Default)] @@ -74,6 +77,38 @@ impl ParentOpIndex for RecordingIndex { } } +struct CountingStorage { + inner: MemoryStorage, + scan_count: Rc>, +} + +impl Storage for CountingStorage { + fn apply(&mut self, op: Operation) -> treecrdt_core::Result { + self.inner.apply(op) + } + + fn load_since(&self, lamport: Lamport) -> treecrdt_core::Result> { + self.inner.load_since(lamport) + } + + fn latest_lamport(&self) -> Lamport { + self.inner.latest_lamport() + } + + fn latest_counter(&self, replica: &ReplicaId) -> treecrdt_core::Result { + self.inner.latest_counter(replica) + } + + fn scan_since( + &self, + lamport: Lamport, + visit: &mut dyn FnMut(Operation) -> treecrdt_core::Result<()>, + ) -> treecrdt_core::Result<()> { + self.scan_count.set(self.scan_count.get() + 1); + self.inner.scan_since(lamport, visit) + } +} + #[test] fn finalize_local_materialization_records_unique_hints_and_extras() { let mut crdt = TreeCrdt::new( @@ -269,7 +304,7 @@ fn apply_persisted_remote_ops_materializes_only_inserted_entries() { assert_eq!(seen_counters, vec![2, 3]); assert_eq!(result.inserted_count, 2); assert_eq!(result.affected_nodes, vec![NodeId(2)]); - assert!(!result.frontier_recorded); + assert!(!result.catch_up_needed); assert_eq!( updated_head, Some(MaterializationHead { @@ -313,7 +348,7 @@ fn apply_persisted_remote_ops_schedules_replay_from_start_when_head_is_missing() assert_eq!(scheduled_replay, 1); assert_eq!(result.inserted_count, 1); assert_eq!(result.affected_nodes, Vec::::new()); - assert!(result.frontier_recorded); + assert!(result.catch_up_needed); } #[test] @@ -350,7 +385,7 @@ fn apply_persisted_remote_ops_schedules_full_replay_when_update_head_fails() { assert_eq!(scheduled_replay, 1); assert_eq!(result.inserted_count, 1); assert!(result.affected_nodes.is_empty()); - assert!(result.frontier_recorded); + assert!(result.catch_up_needed); } #[test] @@ -389,7 +424,7 @@ fn apply_persisted_remote_ops_schedules_replay_frontier_for_out_of_order_ops() { assert_eq!(materialize_runs, 0); assert_eq!(result.inserted_count, 2); assert!(result.affected_nodes.is_empty()); - assert!(result.frontier_recorded); + assert!(result.catch_up_needed); assert_eq!( replay_frontier, Some(treecrdt_core::MaterializationFrontier { @@ -429,7 +464,7 @@ fn apply_persisted_remote_ops_keeps_earliest_existing_replay_frontier() { assert_eq!(result.inserted_count, 1); assert!(result.affected_nodes.is_empty()); - assert!(result.frontier_recorded); + assert!(result.catch_up_needed); assert_eq!( replay_frontier, Some(treecrdt_core::MaterializationFrontier { @@ -486,3 +521,160 @@ fn materialize_persisted_remote_ops_with_delta_runs_prepare_and_flush_hooks() { vec![NodeId::ROOT, NodeId(10), NodeId(11)] ); } + +#[test] +fn payload_noop_shortcut_skips_out_of_order_payload_dominated_by_current_winner() { + let cursor = Cursor { + head_lamport: 10, + head_replica: b"r".to_vec(), + head_counter: 10, + head_seq: 5, + ..Cursor::default() + }; + let replica = ReplicaId::new(b"r"); + let node = NodeId(7); + let op = Operation::set_payload(&replica, 4, 4, node, vec![1]); + + let shortcut = try_shortcut_out_of_order_payload_noops(&cursor, vec![op.clone()], |lookup| { + assert_eq!(lookup, node); + Ok::<_, ()>(Some(( + 9, + OperationId { + replica: replica.clone(), + counter: 9, + }, + ))) + }) + .unwrap() + .expect("expected payload noop shortcut"); + + assert_eq!(shortcut.resumed_head.at.counter, 10); + assert_eq!(shortcut.resumed_head.seq, 6); + assert!(shortcut.remaining_ops.is_empty()); + assert_eq!(shortcut.affected_nodes, vec![node]); +} + +#[test] +fn payload_noop_shortcut_keeps_later_in_order_payload_for_incremental_materialization() { + let cursor = Cursor { + head_lamport: 10, + head_replica: b"r".to_vec(), + head_counter: 10, + head_seq: 5, + ..Cursor::default() + }; + let replica = ReplicaId::new(b"r"); + let node = NodeId(8); + let older = Operation::set_payload(&replica, 4, 4, node, vec![1]); + let newer = Operation::set_payload(&replica, 12, 12, node, vec![2]); + + let shortcut = try_shortcut_out_of_order_payload_noops( + &cursor, + vec![newer.clone(), older.clone()], + |_| Ok::<_, ()>(None), + ) + .unwrap() + .expect("expected payload noop shortcut"); + + assert_eq!(shortcut.resumed_head.seq, 6); + assert_eq!(shortcut.remaining_ops, vec![newer]); + assert_eq!(shortcut.affected_nodes, vec![node]); +} + +#[test] +fn payload_noop_shortcut_rejects_out_of_order_payload_that_becomes_final_winner() { + let cursor = Cursor { + head_lamport: 10, + head_replica: b"r".to_vec(), + head_counter: 10, + head_seq: 5, + ..Cursor::default() + }; + let replica = ReplicaId::new(b"r"); + let node = NodeId(9); + let op = Operation::set_payload(&replica, 4, 4, node, vec![1]); + + let shortcut = try_shortcut_out_of_order_payload_noops(&cursor, vec![op], |_| { + Ok::<_, ()>(Some(( + 2, + OperationId { + replica: ReplicaId::new(b"old"), + counter: 2, + }, + ))) + }) + .unwrap(); + + assert!(shortcut.is_none()); +} + +#[test] +fn payload_noop_shortcut_rejects_out_of_order_move() { + let cursor = Cursor { + head_lamport: 10, + head_replica: b"r".to_vec(), + head_counter: 10, + head_seq: 5, + ..Cursor::default() + }; + let replica = ReplicaId::new(b"r"); + let move_op = Operation::move_node(&replica, 4, 4, NodeId(3), NodeId::ROOT, vec![0x10]); + let called = Cell::new(false); + + let shortcut = try_shortcut_out_of_order_payload_noops( + &cursor, + vec![move_op], + |_| -> Result, ()> { + called.set(true); + Ok(None) + }, + ) + .unwrap(); + + assert!(shortcut.is_none()); + assert!(!called.get()); +} + +#[test] +fn catch_up_materialized_state_scans_storage_once() { + let replica = ReplicaId::new(b"scan-once"); + let first = Operation::insert(&replica, 1, 1, NodeId::ROOT, NodeId(1), vec![0x10]); + let second = Operation::insert(&replica, 2, 2, NodeId::ROOT, NodeId(2), vec![0x20]); + + let mut inner = MemoryStorage::default(); + inner.apply(first.clone()).unwrap(); + inner.apply(second.clone()).unwrap(); + let scan_count = Rc::new(Cell::new(0)); + let storage = CountingStorage { + inner, + scan_count: scan_count.clone(), + }; + let meta = Cursor { + replay_lamport: Some(first.meta.lamport), + replay_replica: Some(first.meta.id.replica.as_bytes().to_vec()), + replay_counter: Some(first.meta.id.counter), + ..Cursor::default() + }; + + let result = catch_up_materialized_state( + storage, + PersistedRemoteStores { + replica_id: ReplicaId::new(b"adapter"), + clock: LamportClock::default(), + nodes: MemoryNodeStore::default(), + payloads: MemoryPayloadStore::default(), + index: NoopParentOpIndex, + }, + &meta, + |_| Ok(()), + |_| Ok(()), + ) + .unwrap(); + + assert_eq!(scan_count.get(), 1); + assert_eq!(result.head.expect("expected head").seq, 2); + assert_eq!( + result.affected_nodes, + vec![NodeId::ROOT, NodeId(1), NodeId(2)] + ); +} diff --git a/packages/treecrdt-postgres-rs/Cargo.toml b/packages/treecrdt-postgres-rs/Cargo.toml index 2c05a76a..6f8239e2 100644 --- a/packages/treecrdt-postgres-rs/Cargo.toml +++ b/packages/treecrdt-postgres-rs/Cargo.toml @@ -13,5 +13,5 @@ serde_json = "1.0" treecrdt-core = { path = "../treecrdt-core", features = ["serde"] } [dev-dependencies] +treecrdt-test-support = { path = "../treecrdt-test-support" } uuid = { version = "1.8", features = ["v4"] } - diff --git a/packages/treecrdt-postgres-rs/src/profile.rs b/packages/treecrdt-postgres-rs/src/profile.rs index 1f06412f..b4ba7006 100644 --- a/packages/treecrdt-postgres-rs/src/profile.rs +++ b/packages/treecrdt-postgres-rs/src/profile.rs @@ -22,7 +22,7 @@ pub(crate) struct PgAppendProfile { pub(crate) dedupe_filter_ms: f64, pub(crate) materialize_ms: f64, pub(crate) update_head_ms: f64, - pub(crate) frontier_recorded: bool, + pub(crate) catch_up_needed: bool, pub(crate) node_load_count: u64, pub(crate) node_load_ms: f64, pub(crate) node_ensure_count: u64, @@ -74,7 +74,7 @@ impl PgAppendProfile { "dedupeFilterMs": self.dedupe_filter_ms, "materializeMs": self.materialize_ms, "updateHeadMs": self.update_head_ms, - "frontierRecorded": self.frontier_recorded, + "catchUpNeeded": self.catch_up_needed, "nodeLoadCount": self.node_load_count, "nodeLoadMs": self.node_load_ms, "nodeEnsureCount": self.node_ensure_count, diff --git a/packages/treecrdt-postgres-rs/src/store.rs b/packages/treecrdt-postgres-rs/src/store.rs index b5d830a3..ecd3debc 100644 --- a/packages/treecrdt-postgres-rs/src/store.rs +++ b/packages/treecrdt-postgres-rs/src/store.rs @@ -7,10 +7,11 @@ use postgres::{Client, Row, Statement}; use treecrdt_core::{ apply_persisted_remote_ops_with_delta, catch_up_materialized_state, - materialize_persisted_remote_ops_with_delta, Error, Lamport, LamportClock, - MaterializationCursor, MaterializationFrontier, MaterializationHead, MaterializationKey, - MaterializationState, NodeId, Operation, OperationId, OperationKind, PersistedRemoteStores, - ReplicaId, Result, Storage, VersionVector, + materialize_persisted_remote_ops_with_delta, try_shortcut_out_of_order_payload_noops, Error, + ExactNodeStore, ExactPayloadStore, Lamport, LamportClock, MaterializationCursor, + MaterializationFrontier, MaterializationHead, MaterializationKey, MaterializationState, NodeId, + NodeStore, Operation, OperationId, OperationKind, PayloadStore, PersistedRemoteStores, + ReplicaId, Result, Storage, TruncatingParentOpIndex, VersionVector, }; use crate::opref::{derive_op_ref_v0, OPREF_V0_WIDTH}; @@ -841,6 +842,56 @@ impl treecrdt_core::NodeStore for PgNodeStore { } } +impl ExactNodeStore for PgNodeStore { + fn set_last_change_exact(&mut self, node: NodeId, vv: &VersionVector) -> Result<()> { + self.ensure_node(node)?; + let node_bytes = node_to_bytes(node); + let bytes = if vv.is_empty() { + None + } else { + Some(vv_to_bytes(vv)?) + }; + let mut c = self.ctx.client.borrow_mut(); + let stmt = self.ctx.stmt( + &mut c, + "UPDATE treecrdt_nodes \ + SET last_change = $3 \ + WHERE doc_id = $1 AND node = $2", + )?; + c.execute(&stmt, &[&self.ctx.doc_id, &node_bytes.as_slice(), &bytes]) + .map_err(storage_debug)?; + + if let Some(Some(row)) = self.cache.borrow_mut().get_mut(&node) { + row.last_change = bytes; + } + self.pending_last_change.borrow_mut().remove(&node); + Ok(()) + } + + fn set_deleted_at_exact(&mut self, node: NodeId, vv: Option<&VersionVector>) -> Result<()> { + self.ensure_node(node)?; + let node_bytes = node_to_bytes(node); + let bytes = match vv { + Some(vv) if !vv.is_empty() => Some(vv_to_bytes(vv)?), + _ => None, + }; + let mut c = self.ctx.client.borrow_mut(); + let stmt = self.ctx.stmt( + &mut c, + "UPDATE treecrdt_nodes \ + SET deleted_at = $3 \ + WHERE doc_id = $1 AND node = $2", + )?; + c.execute(&stmt, &[&self.ctx.doc_id, &node_bytes.as_slice(), &bytes]) + .map_err(storage_debug)?; + + if let Some(Some(row)) = self.cache.borrow_mut().get_mut(&node) { + row.deleted_at = bytes; + } + Ok(()) + } +} + pub(crate) struct PgPayloadStore { ctx: PgCtx, cache: RefCell>>, @@ -976,6 +1027,21 @@ impl treecrdt_core::PayloadStore for PgPayloadStore { } } +impl ExactPayloadStore for PgPayloadStore { + fn clear_payload(&mut self, node: NodeId) -> Result<()> { + let node_bytes = node_to_bytes(node); + let mut c = self.ctx.client.borrow_mut(); + let stmt = self.ctx.stmt( + &mut c, + "DELETE FROM treecrdt_payload WHERE doc_id = $1 AND node = $2", + )?; + c.execute(&stmt, &[&self.ctx.doc_id, &node_bytes.as_slice()]) + .map_err(storage_debug)?; + self.cache.borrow_mut().insert(node, None); + Ok(()) + } +} + pub(crate) struct PgParentOpIndex { ctx: PgCtx, pending: Vec, @@ -1059,6 +1125,19 @@ impl treecrdt_core::ParentOpIndex for PgParentOpIndex { } } +impl TruncatingParentOpIndex for PgParentOpIndex { + fn truncate_from(&mut self, seq: u64) -> Result<()> { + self.pending.clear(); + let mut c = self.ctx.client.borrow_mut(); + let stmt = self.ctx.stmt( + &mut c, + "DELETE FROM treecrdt_oprefs_children WHERE doc_id = $1 AND seq >= $2", + )?; + c.execute(&stmt, &[&self.ctx.doc_id, &(seq as i64)]).map_err(storage_debug)?; + Ok(()) + } +} + const PARENT_OP_INDEX_FLUSH_SIZE: usize = 4096; struct PendingParentOpRefRow { @@ -1539,6 +1618,13 @@ fn materialize_inserted_ops( ) } +fn merge_affected_nodes(mut left: Vec, right: Vec) -> Vec { + left.extend(right); + left.sort(); + left.dedup(); + left +} + pub fn append_ops(client: &Rc>, doc_id: &str, ops: &[Operation]) -> Result { { let mut c = client.borrow_mut(); @@ -1634,18 +1720,83 @@ fn append_ops_in_tx( let materialize_started_at = Instant::now(); let mut update_head_ms = 0.0; - let apply_result = apply_persisted_remote_ops_with_delta( - &meta, - inserted_ops, - |inserted| materialize_inserted_ops(ctx.clone(), &meta, inserted), - |head| { - let started_at = Instant::now(); - let result = update_tree_meta_head(&ctx.client, &ctx.doc_id, Some(head)); - update_head_ms += started_at.elapsed().as_secs_f64() * 1000.0; - result - }, - |frontier| set_tree_meta_replay_frontier(client, doc_id, frontier), - )?; + let mut update_head = |head: &MaterializationHead| { + let started_at = Instant::now(); + let result = update_tree_meta_head(&ctx.client, &ctx.doc_id, Some(head)); + update_head_ms += started_at.elapsed().as_secs_f64() * 1000.0; + result + }; + let apply_result = if let Some(shortcut) = { + let payloads = PgPayloadStore::new(ctx.clone()); + try_shortcut_out_of_order_payload_noops(&meta, inserted_ops.clone(), |node| { + payloads.last_writer(node) + })? + } { + if shortcut.remaining_ops.is_empty() { + update_head(&shortcut.resumed_head)?; + treecrdt_core::PersistedRemoteApplyResult { + inserted_count: inserted_ops.len().min(u64::MAX as usize) as u64, + affected_nodes: shortcut.affected_nodes, + catch_up_needed: false, + } + } else { + let shortcut_meta = TreeMeta(MaterializationState { + head: Some(shortcut.resumed_head.clone()), + replay_from: None, + }); + let result = + materialize_inserted_ops(ctx.clone(), &shortcut_meta, shortcut.remaining_ops)?; + let head = result.head.ok_or_else(|| { + Error::Storage("expected head after payload noop shortcut".into()) + })?; + update_head(&head)?; + treecrdt_core::PersistedRemoteApplyResult { + inserted_count: inserted_ops.len().min(u64::MAX as usize) as u64, + affected_nodes: merge_affected_nodes( + shortcut.affected_nodes, + result.affected_nodes, + ), + catch_up_needed: false, + } + } + } else { + apply_persisted_remote_ops_with_delta( + &meta, + inserted_ops, + |inserted| materialize_inserted_ops(ctx.clone(), &meta, inserted), + &mut update_head, + |frontier| set_tree_meta_replay_frontier(client, doc_id, frontier), + )? + }; + let apply_result = if apply_result.catch_up_needed { + let refreshed_meta = load_tree_meta_for_update(client, doc_id)?; + let catch_up = catch_up_materialized_state( + PgOpStorage::new(ctx.clone()), + PersistedRemoteStores { + replica_id: ReplicaId::new(b"postgres"), + clock: LamportClock::default(), + nodes: PgNodeStore::new(ctx.clone()), + payloads: PgPayloadStore::new(ctx.clone()), + index: PgParentOpIndex::new(ctx.clone()), + }, + &refreshed_meta, + |nodes| nodes.flush_last_change(), + |index| index.flush(), + )?; + update_head( + catch_up + .head + .as_ref() + .ok_or_else(|| Error::Storage("expected head after immediate catch-up".into()))?, + )?; + treecrdt_core::PersistedRemoteApplyResult { + inserted_count: apply_result.inserted_count, + affected_nodes: catch_up.affected_nodes, + catch_up_needed: false, + } + } else { + apply_result + }; if let Some(profile) = &append_profile { profile.borrow_mut().materialize_ms += materialize_started_at.elapsed().as_secs_f64() * 1000.0; @@ -1653,8 +1804,8 @@ fn append_ops_in_tx( if let Some(profile) = &append_profile { profile.borrow_mut().update_head_ms += update_head_ms; - if apply_result.frontier_recorded { - profile.borrow_mut().frontier_recorded = true; + if apply_result.catch_up_needed { + profile.borrow_mut().catch_up_needed = true; } profile.borrow().log(doc_id, apply_result.inserted_count as usize); } @@ -1701,7 +1852,7 @@ pub(crate) fn ensure_materialized_in_tx(client: &Rc>, doc_id: &s let ctx = PgCtx::new(client.clone(), doc_id)?; let storage = PgOpStorage::new(ctx.clone()); - let head = catch_up_materialized_state( + let catch_up = catch_up_materialized_state( storage, PersistedRemoteStores { replica_id: ReplicaId::new(b"postgres"), @@ -1715,7 +1866,7 @@ pub(crate) fn ensure_materialized_in_tx(client: &Rc>, doc_id: &s |index| index.flush(), )?; - update_tree_meta_head(client, doc_id, head.as_ref())?; + update_tree_meta_head(client, doc_id, catch_up.head.as_ref())?; Ok(()) } diff --git a/packages/treecrdt-postgres-rs/tests/postgres_test.rs b/packages/treecrdt-postgres-rs/tests/postgres_test.rs index bc3990e2..fb1ea431 100644 --- a/packages/treecrdt-postgres-rs/tests/postgres_test.rs +++ b/packages/treecrdt-postgres-rs/tests/postgres_test.rs @@ -12,34 +12,10 @@ use treecrdt_postgres::{ local_move, local_payload, max_lamport, replica_max_counter, reset_doc_for_tests, tree_children, tree_payload, }; - -fn order_key_from_position(position: u16) -> Vec { - let n = position.wrapping_add(1); - n.to_be_bytes().to_vec() -} - -fn node(n: u128) -> NodeId { - NodeId(n) -} - -fn representative_remote_batch(replica: &ReplicaId) -> (NodeId, NodeId, NodeId, Vec) { - let p1 = node(1); - let p2 = node(2); - let child = node(3); - ( - p1, - p2, - child, - vec![ - Operation::insert(replica, 1, 1, NodeId::ROOT, p1, order_key_from_position(0)), - Operation::insert(replica, 2, 2, NodeId::ROOT, p2, order_key_from_position(1)), - Operation::insert(replica, 3, 3, p1, child, order_key_from_position(0)), - Operation::set_payload(replica, 4, 4, child, vec![7]), - Operation::move_node(replica, 5, 5, child, p2, order_key_from_position(0)), - Operation::set_payload(replica, 6, 6, child, vec![8]), - ], - ) -} +use treecrdt_test_support::{ + self as materialization_conformance, node, order_key_from_position, + representative_remote_batch, MaterializationConformanceHarness, +}; fn connect() -> Option>> { let url = std::env::var("TREECRDT_POSTGRES_URL").ok()?; @@ -55,6 +31,97 @@ fn ensure_schema_once(client: &Rc>) { }); } +struct PgConformanceHarness { + client: Rc>, + doc_id: String, +} + +impl MaterializationConformanceHarness for PgConformanceHarness { + fn append_ops(&self, ops: &[Operation]) { + append_ops(&self.client, &self.doc_id, ops).unwrap(); + } + + fn append_ops_with_affected_nodes(&self, ops: &[Operation]) -> Vec { + append_ops_with_affected_nodes(&self.client, &self.doc_id, ops).unwrap() + } + + fn visible_children(&self, parent: NodeId) -> Vec { + tree_children(&self.client, &self.doc_id, parent).unwrap() + } + + fn payload(&self, node: NodeId) -> Option> { + tree_payload(&self.client, &self.doc_id, node).unwrap() + } + + fn replay_frontier(&self) -> Option { + let mut c = self.client.borrow_mut(); + let row = c + .query_one( + "SELECT replay_lamport, replay_replica, replay_counter \ + FROM treecrdt_meta WHERE doc_id = $1", + &[&self.doc_id], + ) + .unwrap(); + match ( + row.get::<_, Option>(0).map(|v| v.max(0) as u64), + row.get::<_, Option>>(1), + row.get::<_, Option>(2).map(|v| v.max(0) as u64), + ) { + (Some(lamport), Some(replica), Some(counter)) => { + Some(treecrdt_core::MaterializationFrontier { + lamport, + replica, + counter, + }) + } + _ => None, + } + } + + fn head_seq(&self) -> u64 { + let mut c = self.client.borrow_mut(); + let row = c + .query_one( + "SELECT head_seq FROM treecrdt_meta WHERE doc_id = $1", + &[&self.doc_id], + ) + .unwrap(); + row.get::<_, i64>(0).max(0) as u64 + } + + fn force_replay_from_start(&self) { + let mut c = self.client.borrow_mut(); + c.execute( + "UPDATE treecrdt_meta \ + SET replay_lamport = 0, replay_replica = ''::bytea, replay_counter = 0 \ + WHERE doc_id = $1", + &[&self.doc_id], + ) + .unwrap(); + } + + fn ensure_materialized(&self) { + ensure_materialized(&self.client, &self.doc_id).unwrap(); + } + + fn op_ref_counters_for_parent(&self, parent: NodeId) -> Vec { + let refs = list_op_refs_children(&self.client, &self.doc_id, parent).unwrap(); + let ops = get_ops_by_op_refs(&self.client, &self.doc_id, &refs).unwrap(); + ops.iter().map(|op| op.meta.id.counter).collect() + } +} + +fn setup_conformance_harness() -> Option { + let client = connect()?; + ensure_schema_once(&client); + let doc_id = format!("test-{}", Uuid::new_v4()); + { + let mut c = client.borrow_mut(); + reset_doc_for_tests(&mut c, &doc_id).unwrap(); + } + Some(PgConformanceHarness { client, doc_id }) +} + #[test] fn postgres_backend_apply_is_idempotent_and_max_lamport_monotonic() { let Some(client) = connect() else { @@ -165,93 +232,61 @@ fn postgres_backend_append_with_affected_nodes_matches_representative_remote_bat } #[test] -fn postgres_backend_out_of_order_append_uses_replay_frontier() { - let Some(client) = connect() else { +fn postgres_backend_out_of_order_append_catches_up_immediately_from_frontier() { + let Some(harness) = setup_conformance_harness() else { return; }; - ensure_schema_once(&client); - - let doc_id = format!("test-{}", Uuid::new_v4()); - { - let mut c = client.borrow_mut(); - reset_doc_for_tests(&mut c, &doc_id).unwrap(); - } - - let replica = ReplicaId::new(b"ooo"); - let second = Operation::insert( - &replica, - 2, - 2, - NodeId::ROOT, - node(2), - order_key_from_position(1), - ); - let first = Operation::insert( - &replica, - 1, - 1, - NodeId::ROOT, - node(1), - order_key_from_position(0), - ); + materialization_conformance::out_of_order_append_catches_up_immediately_from_frontier(&harness); +} - append_ops(&client, &doc_id, &[second]).unwrap(); - let affected = - append_ops_with_affected_nodes(&client, &doc_id, std::slice::from_ref(&first)).unwrap(); - assert!(affected.is_empty()); +#[test] +fn postgres_backend_out_of_order_losing_payload_skips_replay_frontier() { + let Some(harness) = setup_conformance_harness() else { + return; + }; + materialization_conformance::out_of_order_losing_payload_skips_replay_frontier(&harness); +} - let (replay_lamport, replay_replica, replay_counter, head_seq_before) = { - let mut c = client.borrow_mut(); - let row = c - .query_one( - "SELECT replay_lamport, replay_replica, replay_counter, head_seq \ - FROM treecrdt_meta WHERE doc_id = $1", - &[&doc_id], - ) - .unwrap(); - ( - row.get::<_, Option>(0).map(|v| v.max(0) as u64), - row.get::<_, Option>>(1), - row.get::<_, Option>(2).map(|v| v.max(0) as u64), - row.get::<_, i64>(3).max(0) as u64, - ) +#[test] +fn postgres_backend_out_of_order_move_with_later_payload_catches_up_immediately() { + let Some(harness) = setup_conformance_harness() else { + return; }; - assert_eq!(replay_lamport, Some(first.meta.lamport)); - assert_eq!( - replay_replica, - Some(first.meta.id.replica.as_bytes().to_vec()) + materialization_conformance::out_of_order_move_with_later_payload_catches_up_immediately( + &harness, ); - assert_eq!(replay_counter, Some(first.meta.id.counter)); - assert_eq!(head_seq_before, 1); +} - assert_eq!( - tree_children(&client, &doc_id, NodeId::ROOT).unwrap(), - vec![node(1), node(2)] +#[test] +fn postgres_backend_out_of_order_insert_and_move_before_head_catches_up_immediately() { + let Some(harness) = setup_conformance_harness() else { + return; + }; + materialization_conformance::out_of_order_insert_and_move_before_head_catches_up_immediately( + &harness, ); +} - let replay_after_read = { - let mut c = client.borrow_mut(); - let row = c - .query_one( - "SELECT replay_lamport, head_seq FROM treecrdt_meta WHERE doc_id = $1", - &[&doc_id], - ) - .unwrap(); - assert_eq!(row.get::<_, i64>(1).max(0) as u64, 2); - row.get::<_, Option>(0) +#[test] +fn postgres_backend_replay_from_start_frontier_catches_up_immediately() { + let Some(harness) = setup_conformance_harness() else { + return; }; - assert_eq!(replay_after_read, None); + materialization_conformance::replay_from_start_frontier_catches_up_immediately(&harness); +} - let refs = list_op_refs_children(&client, &doc_id, NodeId::ROOT).unwrap(); - let ops = get_ops_by_op_refs(&client, &doc_id, &refs).unwrap(); - assert_eq!( - ops.iter().map(|op| op.meta.id.counter).collect::>(), - vec![1, 2] +#[test] +fn postgres_backend_deferred_recovery_from_replay_frontier_catches_up_on_ensure() { + let Some(harness) = setup_conformance_harness() else { + return; + }; + materialization_conformance::deferred_recovery_from_replay_frontier_catches_up_on_ensure( + &harness, ); } #[test] -fn postgres_backend_replay_from_start_frontier_recovers_materialized_state() { +fn postgres_backend_failed_immediate_catch_up_rolls_back_inserted_ops_and_meta() { let Some(client) = connect() else { return; }; @@ -263,15 +298,7 @@ fn postgres_backend_replay_from_start_frontier_recovers_materialized_state() { reset_doc_for_tests(&mut c, &doc_id).unwrap(); } - let replica = ReplicaId::new(b"restart"); - let first = Operation::insert( - &replica, - 1, - 1, - NodeId::ROOT, - node(1), - order_key_from_position(0), - ); + let replica = ReplicaId::new(b"rollback"); let second = Operation::insert( &replica, 2, @@ -280,58 +307,92 @@ fn postgres_backend_replay_from_start_frontier_recovers_materialized_state() { node(2), order_key_from_position(1), ); + let first = Operation::insert( + &replica, + 1, + 1, + NodeId::ROOT, + node(1), + order_key_from_position(0), + ); - append_ops(&client, &doc_id, &[first]).unwrap(); + append_ops(&client, &doc_id, &[second]).unwrap(); + + let trigger_name = format!("fail_treecrdt_nodes_trigger_{}", Uuid::new_v4().simple()); + let function_name = format!("fail_treecrdt_nodes_fn_{}", Uuid::new_v4().simple()); { let mut c = client.borrow_mut(); - c.execute( - "UPDATE treecrdt_meta \ - SET replay_lamport = 0, replay_replica = ''::bytea, replay_counter = 0 \ - WHERE doc_id = $1", - &[&doc_id], - ) + c.batch_execute(&format!( + "CREATE FUNCTION {function_name}() RETURNS trigger LANGUAGE plpgsql AS $$ \ + BEGIN \ + RAISE EXCEPTION 'forced catch-up failure'; \ + END; \ + $$; \ + CREATE TRIGGER {trigger_name} \ + BEFORE INSERT OR UPDATE ON treecrdt_nodes \ + FOR EACH ROW \ + WHEN (NEW.doc_id = '{doc_id}') \ + EXECUTE FUNCTION {function_name}();" + )) .unwrap(); } - let affected = append_ops_with_affected_nodes(&client, &doc_id, &[second]).unwrap(); - assert!(affected.is_empty()); + let append_err = + append_ops_with_affected_nodes(&client, &doc_id, std::slice::from_ref(&first)).unwrap_err(); + assert!(append_err.to_string().contains("forced catch-up failure")); - let (replay_lamport, replay_replica, replay_counter) = { + let (op_count, replay_lamport, replay_replica, replay_counter, head_seq, children) = { + let root_bytes = NodeId::ROOT.0.to_be_bytes(); let mut c = client.borrow_mut(); - let row = c + let meta_row = c .query_one( - "SELECT replay_lamport, replay_replica, replay_counter \ + "SELECT \ + (SELECT COUNT(*) FROM treecrdt_ops WHERE doc_id = $1), \ + replay_lamport, replay_replica, replay_counter, head_seq \ FROM treecrdt_meta WHERE doc_id = $1", &[&doc_id], ) .unwrap(); + let child_rows = c + .query( + "SELECT node FROM treecrdt_nodes \ + WHERE doc_id = $1 AND parent = $2 AND tombstone = FALSE \ + ORDER BY order_key, node", + &[&doc_id, &root_bytes.as_slice()], + ) + .unwrap(); + let children = child_rows + .iter() + .map(|row| { + let bytes: Vec = row.get(0); + NodeId(u128::from_be_bytes(bytes.try_into().unwrap())) + }) + .collect::>(); ( - row.get::<_, Option>(0).map(|v| v.max(0) as u64), - row.get::<_, Option>>(1), - row.get::<_, Option>(2).map(|v| v.max(0) as u64), + meta_row.get::<_, i64>(0).max(0) as u64, + meta_row.get::<_, Option>(1).map(|v| v.max(0) as u64), + meta_row.get::<_, Option>>(2), + meta_row.get::<_, Option>(3).map(|v| v.max(0) as u64), + meta_row.get::<_, i64>(4).max(0) as u64, + children, ) }; - assert_eq!(replay_lamport, Some(0)); - assert_eq!(replay_replica, Some(Vec::new())); - assert_eq!(replay_counter, Some(0)); - assert_eq!( - tree_children(&client, &doc_id, NodeId::ROOT).unwrap(), - vec![node(1), node(2)] - ); + assert_eq!(op_count, 1); + assert_eq!(replay_lamport, None); + assert_eq!(replay_replica, None); + assert_eq!(replay_counter, None); + assert_eq!(head_seq, 1); + assert_eq!(children, vec![node(2)]); - let replay_after_read = { + { let mut c = client.borrow_mut(); - let row = c - .query_one( - "SELECT replay_lamport, head_seq FROM treecrdt_meta WHERE doc_id = $1", - &[&doc_id], - ) - .unwrap(); - assert_eq!(row.get::<_, i64>(1).max(0) as u64, 2); - row.get::<_, Option>(0) - }; - assert_eq!(replay_after_read, None); + c.batch_execute(&format!( + "DROP TRIGGER IF EXISTS {trigger_name} ON treecrdt_nodes; \ + DROP FUNCTION IF EXISTS {function_name}();" + )) + .unwrap(); + } } #[test] diff --git a/packages/treecrdt-sqlite-ext/Cargo.toml b/packages/treecrdt-sqlite-ext/Cargo.toml index 997831f2..a7cd112f 100644 --- a/packages/treecrdt-sqlite-ext/Cargo.toml +++ b/packages/treecrdt-sqlite-ext/Cargo.toml @@ -27,3 +27,4 @@ wasm-ext = ["static-link"] [dev-dependencies] tempfile = "3.10" +treecrdt-test-support = { path = "../treecrdt-test-support" } diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/materialize.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/materialize.rs index c4879674..062638da 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/materialize.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/materialize.rs @@ -2,11 +2,19 @@ use super::append::JsonAppendOp; use super::node_store::SqliteNodeStore; use super::op_index::SqliteParentOpIndex; use super::payload_store::SqlitePayloadStore; -use super::schema::set_tree_meta_replay_frontier; +use super::schema::{set_tree_meta_replay_frontier, tree_meta_from_state}; use super::util::sqlite_err_from_core; use super::*; -use treecrdt_core::MaterializationCursor; +use treecrdt_core::PayloadStore; use treecrdt_core::Storage; +use treecrdt_core::{LamportClock, MaterializationCursor, ReplicaId}; + +fn merge_affected_nodes(mut left: Vec, right: Vec) -> Vec { + left.extend(right); + left.sort(); + left.dedup(); + left +} fn parse_node_id(bytes: &[u8]) -> Result { if bytes.len() != 16 { @@ -183,7 +191,7 @@ fn catch_up_materialized_from_frontier(db: *mut sqlite3) -> Result<(), c_int> { return Err(SQLITE_ERROR as c_int); } }; - let head = match catch_up_materialized_state( + let catch_up = match catch_up_materialized_state( storage, treecrdt_core::PersistedRemoteStores { replica_id: ReplicaId::new(b"sqlite-ext"), @@ -203,7 +211,7 @@ fn catch_up_materialized_from_frontier(db: *mut sqlite3) -> Result<(), c_int> { } }; - let head_rc = update_tree_meta_head(db, head.as_ref()); + let head_rc = update_tree_meta_head(db, catch_up.head.as_ref()); if head_rc.is_err() { sqlite_exec(db, rollback.as_ptr(), None, null_mut(), null_mut()); return head_rc; @@ -264,13 +272,77 @@ pub(super) fn append_ops_impl( } } - let apply_result = treecrdt_core::apply_persisted_remote_ops_with_delta( - &meta, - inserted_ops, - |inserted| materialize_inserted_ops(db, doc_id, &meta, &inserted), - |head| update_tree_meta_head(db, Some(head)), - |frontier| set_tree_meta_replay_frontier(db, frontier), - )?; + let apply_result = if let Some(shortcut) = { + let payloads = SqlitePayloadStore::prepare(db).map_err(|_| SQLITE_ERROR as c_int)?; + treecrdt_core::try_shortcut_out_of_order_payload_noops( + &meta, + inserted_ops.clone(), + |node| payloads.last_writer(node).map_err(sqlite_err_from_core), + )? + } { + if shortcut.remaining_ops.is_empty() { + update_tree_meta_head(db, Some(&shortcut.resumed_head))?; + treecrdt_core::PersistedRemoteApplyResult { + inserted_count: inserted_ops.len().min(u64::MAX as usize) as u64, + affected_nodes: shortcut.affected_nodes, + catch_up_needed: false, + } + } else { + let shortcut_meta = tree_meta_from_state(treecrdt_core::MaterializationState { + head: Some(shortcut.resumed_head.clone()), + replay_from: None, + }); + let result = + materialize_inserted_ops(db, doc_id, &shortcut_meta, &shortcut.remaining_ops)?; + let head = result.head.ok_or(SQLITE_ERROR as c_int)?; + update_tree_meta_head(db, Some(&head))?; + treecrdt_core::PersistedRemoteApplyResult { + inserted_count: inserted_ops.len().min(u64::MAX as usize) as u64, + affected_nodes: merge_affected_nodes( + shortcut.affected_nodes, + result.affected_nodes, + ), + catch_up_needed: false, + } + } + } else { + treecrdt_core::apply_persisted_remote_ops_with_delta( + &meta, + inserted_ops, + |inserted| materialize_inserted_ops(db, doc_id, &meta, &inserted), + |head| update_tree_meta_head(db, Some(head)), + |frontier| set_tree_meta_replay_frontier(db, frontier), + )? + }; + let apply_result = if apply_result.catch_up_needed { + let refreshed_meta = load_tree_meta(db)?; + let catch_up = treecrdt_core::catch_up_materialized_state( + super::op_storage::SqliteOpStorage::with_doc_id(db, doc_id.to_vec()), + treecrdt_core::PersistedRemoteStores { + replica_id: ReplicaId::new(b"sqlite-ext"), + clock: LamportClock::default(), + nodes: SqliteNodeStore::prepare(db).map_err(|_| SQLITE_ERROR as c_int)?, + payloads: SqlitePayloadStore::prepare(db).map_err(|_| SQLITE_ERROR as c_int)?, + index: SqliteParentOpIndex::prepare(db, doc_id.to_vec()) + .map_err(|_| SQLITE_ERROR as c_int)?, + }, + &refreshed_meta, + |_| Ok(()), + |_| Ok(()), + ) + .map_err(|_| SQLITE_ERROR as c_int)?; + update_tree_meta_head( + db, + Some(catch_up.head.as_ref().ok_or(SQLITE_ERROR as c_int)?), + )?; + treecrdt_core::PersistedRemoteApplyResult { + inserted_count: apply_result.inserted_count, + affected_nodes: catch_up.affected_nodes, + catch_up_needed: false, + } + } else { + apply_result + }; let commit_rc = sqlite_exec(db, commit.as_ptr(), None, null_mut(), null_mut()); if commit_rc != SQLITE_OK as c_int { diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/node_store.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/node_store.rs index 00c16ef5..3c594c70 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/node_store.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/node_store.rs @@ -1,5 +1,6 @@ use super::*; use std::slice; +use treecrdt_core::NodeStore; fn sqlite_node_id_bytes(node: NodeId) -> [u8; 16] { node.0.to_be_bytes() @@ -773,3 +774,81 @@ impl treecrdt_core::NodeStore for SqliteNodeStore { Ok(out) } } + +impl treecrdt_core::ExactNodeStore for SqliteNodeStore { + fn set_last_change_exact( + &mut self, + node: NodeId, + vv: &VersionVector, + ) -> treecrdt_core::Result<()> { + self.ensure_node(node)?; + let node_bytes = sqlite_node_id_bytes(node); + unsafe { + sqlite_clear_bindings(self.update_last_change); + sqlite_reset(self.update_last_change); + sqlite_bind_blob( + self.update_last_change, + 1, + node_bytes.as_ptr() as *const c_void, + node_bytes.len() as c_int, + None, + ); + if vv.is_empty() { + sqlite_bind_null(self.update_last_change, 2); + } else { + let bytes = vv_to_bytes(vv)?; + sqlite_bind_blob( + self.update_last_change, + 2, + bytes.as_ptr() as *const c_void, + bytes.len() as c_int, + None, + ); + } + let step_rc = sqlite_step(self.update_last_change); + sqlite_reset(self.update_last_change); + if step_rc != SQLITE_DONE as c_int { + return Err(sqlite_rc_error(step_rc, "set exact last_change failed")); + } + } + Ok(()) + } + + fn set_deleted_at_exact( + &mut self, + node: NodeId, + vv: Option<&VersionVector>, + ) -> treecrdt_core::Result<()> { + self.ensure_node(node)?; + let node_bytes = sqlite_node_id_bytes(node); + unsafe { + sqlite_clear_bindings(self.update_deleted_at); + sqlite_reset(self.update_deleted_at); + sqlite_bind_blob( + self.update_deleted_at, + 1, + node_bytes.as_ptr() as *const c_void, + node_bytes.len() as c_int, + None, + ); + if let Some(vv) = vv.filter(|vv| !vv.is_empty()) { + let bytes = vv_to_bytes(vv)?; + sqlite_bind_blob( + self.update_deleted_at, + 2, + bytes.as_ptr() as *const c_void, + bytes.len() as c_int, + None, + ); + } else { + sqlite_bind_null(self.update_deleted_at, 2); + } + let step_rc = sqlite_step(self.update_deleted_at); + sqlite_reset(self.update_deleted_at); + if step_rc != SQLITE_DONE as c_int { + return Err(sqlite_rc_error(step_rc, "set exact deleted_at failed")); + } + } + Ok(()) + } +} diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/op_index.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/op_index.rs index 997565ec..54963e5d 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/op_index.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/op_index.rs @@ -102,3 +102,24 @@ impl treecrdt_core::ParentOpIndex for SqliteParentOpIndex { Ok(()) } } + +impl treecrdt_core::TruncatingParentOpIndex for SqliteParentOpIndex { + fn truncate_from(&mut self, seq: u64) -> treecrdt_core::Result<()> { + let sql = + CString::new("DELETE FROM oprefs_children WHERE seq >= ?1").expect("truncate oprefs"); + let mut stmt: *mut sqlite3_stmt = null_mut(); + let prep_rc = sqlite_prepare_v2(self.db, sql.as_ptr(), -1, &mut stmt, null_mut()); + if prep_rc != SQLITE_OK as c_int { + return Err(sqlite_rc_error(prep_rc, "prepare truncate oprefs failed")); + } + unsafe { + sqlite_bind_int64(stmt, 1, seq.min(i64::MAX as u64) as i64); + let step_rc = sqlite_step(stmt); + sqlite_finalize(stmt); + if step_rc != SQLITE_DONE as c_int { + return Err(sqlite_rc_error(step_rc, "truncate oprefs step failed")); + } + } + Ok(()) + } +} diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/payload_store.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/payload_store.rs index bdd0d111..1051bc44 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/payload_store.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/payload_store.rs @@ -12,6 +12,7 @@ pub(super) struct SqlitePayloadStore { db: *mut sqlite3, select: *mut sqlite3_stmt, upsert: *mut sqlite3_stmt, + delete: *mut sqlite3_stmt, } impl SqlitePayloadStore { @@ -31,9 +32,12 @@ impl SqlitePayloadStore { last_counter = excluded.last_counter", ) .expect("upsert payload sql"); + let delete_sql = + CString::new("DELETE FROM tree_payload WHERE node = ?1").expect("delete payload sql"); let mut select: *mut sqlite3_stmt = null_mut(); let mut upsert: *mut sqlite3_stmt = null_mut(); + let mut delete: *mut sqlite3_stmt = null_mut(); let prep = |sql: &CString, stmt: &mut *mut sqlite3_stmt| -> treecrdt_core::Result<()> { let rc = sqlite_prepare_v2(db, sql.as_ptr(), -1, stmt, null_mut()); @@ -44,8 +48,14 @@ impl SqlitePayloadStore { }; prep(&select_sql, &mut select)?; prep(&upsert_sql, &mut upsert)?; + prep(&delete_sql, &mut delete)?; - Ok(Self { db, select, upsert }) + Ok(Self { + db, + select, + upsert, + delete, + }) } } @@ -54,6 +64,7 @@ impl Drop for SqlitePayloadStore { unsafe { sqlite_finalize(self.select); sqlite_finalize(self.upsert); + sqlite_finalize(self.delete); } } } @@ -223,3 +234,30 @@ impl treecrdt_core::PayloadStore for SqlitePayloadStore { Ok(()) } } + +impl treecrdt_core::ExactPayloadStore for SqlitePayloadStore { + fn clear_payload(&mut self, node: NodeId) -> treecrdt_core::Result<()> { + let node_bytes = sqlite_node_id_bytes(node); + unsafe { + sqlite_clear_bindings(self.delete); + sqlite_reset(self.delete); + let bind_rc = sqlite_bind_blob( + self.delete, + 1, + node_bytes.as_ptr() as *const c_void, + node_bytes.len() as c_int, + None, + ); + if bind_rc != SQLITE_OK as c_int { + sqlite_reset(self.delete); + return Err(sqlite_rc_error(bind_rc, "bind delete payload failed")); + } + let step_rc = sqlite_step(self.delete); + sqlite_reset(self.delete); + if step_rc != SQLITE_DONE as c_int { + return Err(sqlite_rc_error(step_rc, "delete payload step failed")); + } + } + Ok(()) + } +} diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/schema.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/schema.rs index d7ea904f..ec3130c3 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/schema.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/schema.rs @@ -20,6 +20,10 @@ impl MaterializationCursor for TreeMeta { } } +pub(super) fn tree_meta_from_state(state: MaterializationState) -> TreeMeta { + TreeMeta(state) +} + pub(super) fn load_doc_id(db: *mut sqlite3) -> Result>, c_int> { let sql = CString::new("SELECT value FROM meta WHERE key = 'doc_id' LIMIT 1").expect("doc id sql"); diff --git a/packages/treecrdt-sqlite-ext/tests/extension_roundtrip.rs b/packages/treecrdt-sqlite-ext/tests/extension_roundtrip.rs index c9594b27..cab60588 100644 --- a/packages/treecrdt-sqlite-ext/tests/extension_roundtrip.rs +++ b/packages/treecrdt-sqlite-ext/tests/extension_roundtrip.rs @@ -4,7 +4,13 @@ use std::path::PathBuf; use rusqlite::Connection; use serde::{Deserialize, Serialize}; -use treecrdt_core::{order_key::allocate_between, ReplicaId, VersionVector}; +use treecrdt_core::{ + order_key::allocate_between, NodeId, Operation, OperationKind, ReplicaId, VersionVector, +}; +use treecrdt_test_support::{ + self as materialization_conformance, representative_remote_batch, + MaterializationConformanceHarness, +}; #[derive(Clone, Deserialize, Serialize)] struct JsonOp { @@ -20,6 +26,77 @@ struct JsonOp { payload: Option>, } +fn node_bytes_from_id(node: NodeId) -> Vec { + node.0.to_be_bytes().to_vec() +} + +fn bytes_to_node_id(bytes: &[u8]) -> NodeId { + NodeId(u128::from_be_bytes(bytes.try_into().unwrap())) +} + +fn vv_to_bytes(vv: &VersionVector) -> Vec { + serde_json::to_vec(vv).unwrap() +} + +fn json_op(op: &Operation) -> JsonOp { + let (kind, parent, node, new_parent, order_key, payload) = match &op.kind { + OperationKind::Insert { + parent, + node, + order_key, + payload, + } => ( + "insert", + Some(parent.0.to_be_bytes()), + node.0.to_be_bytes(), + None, + Some(order_key.clone()), + payload.clone(), + ), + OperationKind::Move { + node, + new_parent, + order_key, + } => ( + "move", + None, + node.0.to_be_bytes(), + Some(new_parent.0.to_be_bytes()), + Some(order_key.clone()), + None, + ), + OperationKind::Delete { node } => ("delete", None, node.0.to_be_bytes(), None, None, None), + OperationKind::Tombstone { node } => { + ("tombstone", None, node.0.to_be_bytes(), None, None, None) + } + OperationKind::Payload { node, payload } => ( + "payload", + None, + node.0.to_be_bytes(), + None, + None, + payload.clone(), + ), + }; + + JsonOp { + replica: op.meta.id.replica.as_bytes().to_vec(), + counter: op.meta.id.counter, + lamport: op.meta.lamport, + kind: kind.into(), + parent, + node, + new_parent, + order_key, + known_state: op.meta.known_state.as_ref().map(vv_to_bytes), + payload, + } +} + +fn json_ops(ops: &[Operation]) -> Vec { + ops.iter().map(json_op).collect() +} + fn read_tree_meta(conn: &Connection) -> (i64, Vec, i64, i64) { conn.query_row( "SELECT head_lamport, head_replica, head_counter, head_seq FROM tree_meta WHERE id = 1", @@ -103,90 +180,82 @@ fn ops_by_oprefs(conn: &Connection, refs: &[Vec]) -> Vec { serde_json::from_str(&ops_json).unwrap() } -fn representative_remote_batch(replica: &[u8]) -> (Vec, Vec, Vec, Vec) { - let root = node_bytes(0); - let p1 = node_bytes(1); - let p2 = node_bytes(2); - let child = node_bytes(3); - ( - p1.clone(), - p2.clone(), - child.clone(), - vec![ - JsonOp { - replica: replica.to_vec(), - counter: 1, - lamport: 1, - kind: "insert".into(), - parent: Some(<[u8; 16]>::try_from(root.as_slice()).unwrap()), - node: <[u8; 16]>::try_from(p1.as_slice()).unwrap(), - new_parent: None, - order_key: Some((1u16).to_be_bytes().to_vec()), - known_state: None, - payload: None, - }, - JsonOp { - replica: replica.to_vec(), - counter: 2, - lamport: 2, - kind: "insert".into(), - parent: Some(<[u8; 16]>::try_from(root.as_slice()).unwrap()), - node: <[u8; 16]>::try_from(p2.as_slice()).unwrap(), - new_parent: None, - order_key: Some((2u16).to_be_bytes().to_vec()), - known_state: None, - payload: None, - }, - JsonOp { - replica: replica.to_vec(), - counter: 3, - lamport: 3, - kind: "insert".into(), - parent: Some(<[u8; 16]>::try_from(p1.as_slice()).unwrap()), - node: <[u8; 16]>::try_from(child.as_slice()).unwrap(), - new_parent: None, - order_key: Some((1u16).to_be_bytes().to_vec()), - known_state: None, - payload: None, - }, - JsonOp { - replica: replica.to_vec(), - counter: 4, - lamport: 4, - kind: "payload".into(), - parent: None, - node: <[u8; 16]>::try_from(child.as_slice()).unwrap(), - new_parent: None, - order_key: None, - known_state: None, - payload: Some(vec![7]), - }, - JsonOp { - replica: replica.to_vec(), - counter: 5, - lamport: 5, - kind: "move".into(), - parent: None, - node: <[u8; 16]>::try_from(child.as_slice()).unwrap(), - new_parent: Some(<[u8; 16]>::try_from(p2.as_slice()).unwrap()), - order_key: Some((1u16).to_be_bytes().to_vec()), - known_state: None, - payload: None, - }, - JsonOp { - replica: replica.to_vec(), - counter: 6, - lamport: 6, - kind: "payload".into(), - parent: None, - node: <[u8; 16]>::try_from(child.as_slice()).unwrap(), - new_parent: None, - order_key: None, - known_state: None, - payload: Some(vec![8]), - }, - ], - ) +struct SqliteConformanceHarness { + conn: Connection, +} + +impl MaterializationConformanceHarness for SqliteConformanceHarness { + fn append_ops(&self, ops: &[Operation]) { + append_ops_json(&self.conn, &json_ops(ops)); + } + + fn append_ops_with_affected_nodes(&self, ops: &[Operation]) -> Vec { + let (affected, _) = append_ops_json(&self.conn, &json_ops(ops)); + affected.iter().map(|bytes| bytes_to_node_id(bytes)).collect() + } + + fn visible_children(&self, parent: NodeId) -> Vec { + visible_children(&self.conn, &node_bytes_from_id(parent)) + .iter() + .map(|bytes| bytes_to_node_id(bytes)) + .collect() + } + + fn payload(&self, node: NodeId) -> Option> { + payload_bytes(&self.conn, &node_bytes_from_id(node)) + } + + fn replay_frontier(&self) -> Option { + match read_replay_frontier(&self.conn) { + (Some(lamport), Some(replica), Some(counter)) => { + Some(treecrdt_core::MaterializationFrontier { + lamport: lamport.max(0) as u64, + replica, + counter: counter.max(0) as u64, + }) + } + _ => None, + } + } + + fn head_seq(&self) -> u64 { + let (_, _, _, head_seq) = read_tree_meta(&self.conn); + head_seq.max(0) as u64 + } + + fn force_replay_from_start(&self) { + self.conn + .execute( + "UPDATE tree_meta \ + SET replay_lamport = 0, replay_replica = X'', replay_counter = 0 \ + WHERE id = 1", + [], + ) + .unwrap(); + } + + fn ensure_materialized(&self) { + let _: i64 = self + .conn + .query_row("SELECT treecrdt_ensure_materialized()", [], |row| { + row.get(0) + }) + .unwrap(); + } + + fn op_ref_counters_for_parent(&self, parent: NodeId) -> Vec { + ops_by_oprefs( + &self.conn, + &oprefs_children(&self.conn, &node_bytes_from_id(parent)), + ) + .iter() + .map(|op| op.counter) + .collect() + } +} + +fn setup_conformance_harness() -> SqliteConformanceHarness { + SqliteConformanceHarness { conn: setup_conn() } } #[test] @@ -357,143 +426,129 @@ fn remote_append_representative_batch_matches_postgres_shape() { let conn = setup_conn(); let root = node_bytes(0); - let (p1, p2, child, ops) = representative_remote_batch(b"rep"); - let (affected, _) = append_ops_json(&conn, &ops); + let replica = ReplicaId::new(b"rep"); + let (p1, p2, child, ops) = representative_remote_batch(&replica); + let (affected, _) = append_ops_json(&conn, &json_ops(&ops)); assert_eq!( affected, - vec![root.clone(), p1.clone(), p2.clone(), child.clone()] + vec![ + root.clone(), + node_bytes_from_id(p1), + node_bytes_from_id(p2), + node_bytes_from_id(child) + ] + ); + assert_eq!( + visible_children(&conn, &root), + vec![node_bytes_from_id(p1), node_bytes_from_id(p2)] + ); + assert_eq!( + visible_children(&conn, &node_bytes_from_id(p2)), + vec![node_bytes_from_id(child)] + ); + assert_eq!( + payload_bytes(&conn, &node_bytes_from_id(child)), + Some(vec![8]) ); - assert_eq!(visible_children(&conn, &root), vec![p1.clone(), p2.clone()]); - assert_eq!(visible_children(&conn, &p2), vec![child.clone()]); - assert_eq!(payload_bytes(&conn, &child), Some(vec![8])); - let ops_p2 = ops_by_oprefs(&conn, &oprefs_children(&conn, &p2)); + let ops_p2 = ops_by_oprefs(&conn, &oprefs_children(&conn, &node_bytes_from_id(p2))); assert!(ops_p2.iter().any(|op| op.kind == "move")); assert!(ops_p2.iter().any(|op| op.kind == "payload")); } #[test] -fn remote_append_out_of_order_uses_replay_frontier() { - let conn = setup_conn(); - - let root = node_bytes(0); - let second = JsonOp { - replica: b"ooo".to_vec(), - counter: 2, - lamport: 2, - kind: "insert".into(), - parent: Some(<[u8; 16]>::try_from(root.as_slice()).unwrap()), - node: <[u8; 16]>::try_from(node_bytes(2).as_slice()).unwrap(), - new_parent: None, - order_key: Some((2u16).to_be_bytes().to_vec()), - known_state: None, - payload: None, - }; - let first = JsonOp { - replica: b"ooo".to_vec(), - counter: 1, - lamport: 1, - kind: "insert".into(), - parent: Some(<[u8; 16]>::try_from(root.as_slice()).unwrap()), - node: <[u8; 16]>::try_from(node_bytes(1).as_slice()).unwrap(), - new_parent: None, - order_key: Some((1u16).to_be_bytes().to_vec()), - known_state: None, - payload: None, - }; - - append_ops_json(&conn, &[second]); - let (affected, _) = append_ops_json(&conn, &[first.clone()]); - assert!(affected.is_empty()); +fn remote_append_out_of_order_catches_up_immediately_from_frontier() { + let harness = setup_conformance_harness(); + materialization_conformance::out_of_order_append_catches_up_immediately_from_frontier(&harness); +} - let (_, _, _, head_seq_before) = read_tree_meta(&conn); - let (replay_lamport, replay_replica, replay_counter) = read_replay_frontier(&conn); - assert_eq!(head_seq_before, 1); - assert_eq!(replay_lamport, Some(first.lamport as i64)); - assert_eq!(replay_replica, Some(first.replica.clone())); - assert_eq!(replay_counter, Some(first.counter as i64)); +#[test] +fn remote_append_out_of_order_losing_payload_skips_replay_frontier() { + let harness = setup_conformance_harness(); + materialization_conformance::out_of_order_losing_payload_skips_replay_frontier(&harness); +} - let _: i64 = conn - .query_row("SELECT treecrdt_ensure_materialized()", [], |row| { - row.get(0) - }) - .unwrap(); +#[test] +fn remote_append_out_of_order_move_with_later_payload_catches_up_immediately() { + let harness = setup_conformance_harness(); + materialization_conformance::out_of_order_move_with_later_payload_catches_up_immediately( + &harness, + ); +} - assert_eq!( - visible_children(&conn, &root), - vec![node_bytes(1), node_bytes(2)] +#[test] +fn remote_append_out_of_order_insert_and_move_before_head_catches_up_immediately() { + let harness = setup_conformance_harness(); + materialization_conformance::out_of_order_insert_and_move_before_head_catches_up_immediately( + &harness, ); - let (_, _, _, head_seq_after) = read_tree_meta(&conn); - assert_eq!(head_seq_after, 2); - assert_eq!(read_replay_frontier(&conn), (None, None, None)); +} - let ops = ops_by_oprefs(&conn, &oprefs_children(&conn, &root)); - assert_eq!( - ops.iter().map(|op| op.counter).collect::>(), - vec![1, 2] +#[test] +fn remote_append_replay_from_start_frontier_catches_up_immediately() { + let harness = setup_conformance_harness(); + materialization_conformance::replay_from_start_frontier_catches_up_immediately(&harness); +} + +#[test] +fn remote_deferred_recovery_from_replay_frontier_catches_up_on_ensure() { + let harness = setup_conformance_harness(); + materialization_conformance::deferred_recovery_from_replay_frontier_catches_up_on_ensure( + &harness, ); } #[test] -fn remote_append_replay_from_start_frontier_recovers_materialized_state() { +fn remote_failed_immediate_catch_up_rolls_back_inserted_ops_and_meta() { let conn = setup_conn(); - let root = node_bytes(0); - let first = JsonOp { - replica: b"restart".to_vec(), - counter: 1, - lamport: 1, - kind: "insert".into(), - parent: Some(<[u8; 16]>::try_from(root.as_slice()).unwrap()), - node: <[u8; 16]>::try_from(node_bytes(1).as_slice()).unwrap(), - new_parent: None, - order_key: Some((1u16).to_be_bytes().to_vec()), - known_state: None, - payload: None, - }; - let second = JsonOp { - replica: b"restart".to_vec(), - counter: 2, - lamport: 2, - kind: "insert".into(), - parent: Some(<[u8; 16]>::try_from(root.as_slice()).unwrap()), - node: <[u8; 16]>::try_from(node_bytes(2).as_slice()).unwrap(), - new_parent: None, - order_key: Some((2u16).to_be_bytes().to_vec()), - known_state: None, - payload: None, - }; + let replica = ReplicaId::new(b"rollback"); + let second = Operation::insert( + &replica, + 2, + 2, + NodeId::ROOT, + materialization_conformance::node(2), + materialization_conformance::order_key_from_position(1), + ); + let first = Operation::insert( + &replica, + 1, + 1, + NodeId::ROOT, + materialization_conformance::node(1), + materialization_conformance::order_key_from_position(0), + ); - append_ops_json(&conn, &[first]); - conn.execute( - "UPDATE tree_meta \ - SET replay_lamport = 0, replay_replica = X'', replay_counter = 0 \ - WHERE id = 1", - [], + append_ops_json(&conn, &json_ops(&[second])); + conn.execute_batch( + "CREATE TRIGGER fail_tree_nodes_insert \ + BEFORE INSERT ON tree_nodes \ + BEGIN \ + SELECT RAISE(ROLLBACK, 'forced catch-up failure'); \ + END;", ) .unwrap(); - let (affected, _) = append_ops_json(&conn, &[second]); - assert!(affected.is_empty()); - assert_eq!( - read_replay_frontier(&conn), - (Some(0), Some(Vec::new()), Some(0)) + let append_json = serde_json::to_string(&json_ops(&[first])).unwrap(); + let append_result: rusqlite::Result = conn.query_row( + "SELECT treecrdt_append_ops(?1)", + rusqlite::params![append_json], + |row| row.get(0), ); + assert!(append_result.is_err()); - let _: i64 = conn - .query_row("SELECT treecrdt_ensure_materialized()", [], |row| { - row.get(0) - }) - .unwrap(); + let (_, _, _, head_seq) = read_tree_meta(&conn); + let op_count: i64 = conn.query_row("SELECT COUNT(*) FROM ops", [], |row| row.get(0)).unwrap(); + assert_eq!(op_count, 1); + assert_eq!(read_replay_frontier(&conn), (None, None, None)); + assert_eq!(head_seq, 1); assert_eq!( - visible_children(&conn, &root), - vec![node_bytes(1), node_bytes(2)] + visible_children(&conn, &node_bytes(0)), + vec![node_bytes_from_id(materialization_conformance::node(2))] ); - let (_, _, _, head_seq) = read_tree_meta(&conn); - assert_eq!(head_seq, 2); - assert_eq!(read_replay_frontier(&conn), (None, None, None)); } #[test] diff --git a/packages/treecrdt-test-support/Cargo.toml b/packages/treecrdt-test-support/Cargo.toml new file mode 100644 index 00000000..b63bd89a --- /dev/null +++ b/packages/treecrdt-test-support/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "treecrdt-test-support" +version = "0.0.1" +edition = "2021" +license = "MIT" +publish = false +description = "Shared test support for TreeCRDT backend conformance suites." + +[dependencies] +treecrdt-core = { path = "../treecrdt-core" } diff --git a/packages/treecrdt-test-support/src/lib.rs b/packages/treecrdt-test-support/src/lib.rs new file mode 100644 index 00000000..22834a18 --- /dev/null +++ b/packages/treecrdt-test-support/src/lib.rs @@ -0,0 +1,233 @@ +use std::slice; + +use treecrdt_core::{MaterializationFrontier, NodeId, Operation, ReplicaId}; + +pub trait MaterializationConformanceHarness { + fn append_ops(&self, ops: &[Operation]); + fn append_ops_with_affected_nodes(&self, ops: &[Operation]) -> Vec; + fn visible_children(&self, parent: NodeId) -> Vec; + fn payload(&self, node: NodeId) -> Option>; + fn replay_frontier(&self) -> Option; + fn head_seq(&self) -> u64; + fn force_replay_from_start(&self); + fn ensure_materialized(&self); + fn op_ref_counters_for_parent(&self, parent: NodeId) -> Vec; +} + +pub fn order_key_from_position(position: u16) -> Vec { + let n = position.wrapping_add(1); + n.to_be_bytes().to_vec() +} + +pub fn node(n: u128) -> NodeId { + NodeId(n) +} + +pub fn representative_remote_batch( + replica: &ReplicaId, +) -> (NodeId, NodeId, NodeId, Vec) { + let p1 = node(1); + let p2 = node(2); + let child = node(3); + ( + p1, + p2, + child, + vec![ + Operation::insert(replica, 1, 1, NodeId::ROOT, p1, order_key_from_position(0)), + Operation::insert(replica, 2, 2, NodeId::ROOT, p2, order_key_from_position(1)), + Operation::insert(replica, 3, 3, p1, child, order_key_from_position(0)), + Operation::set_payload(replica, 4, 4, child, vec![7]), + Operation::move_node(replica, 5, 5, child, p2, order_key_from_position(0)), + Operation::set_payload(replica, 6, 6, child, vec![8]), + ], + ) +} + +fn assert_replay_cleared(harness: &H) { + assert_eq!(harness.replay_frontier(), None); +} + +pub fn out_of_order_append_catches_up_immediately_from_frontier< + H: MaterializationConformanceHarness, +>( + harness: &H, +) { + let replica = ReplicaId::new(b"ooo"); + let second = Operation::insert( + &replica, + 2, + 2, + NodeId::ROOT, + node(2), + order_key_from_position(1), + ); + let first = Operation::insert( + &replica, + 1, + 1, + NodeId::ROOT, + node(1), + order_key_from_position(0), + ); + + harness.append_ops(&[second]); + let affected = harness.append_ops_with_affected_nodes(slice::from_ref(&first)); + assert_eq!(affected, vec![NodeId::ROOT, node(1), node(2)]); + assert_replay_cleared(harness); + assert_eq!(harness.head_seq(), 2); + assert_eq!( + harness.visible_children(NodeId::ROOT), + vec![node(1), node(2)] + ); + assert_eq!(harness.op_ref_counters_for_parent(NodeId::ROOT), vec![1, 2]); +} + +pub fn out_of_order_losing_payload_skips_replay_frontier( + harness: &H, +) { + let replica = ReplicaId::new(b"payload-shortcut"); + let payload_node = node(7); + let insert = Operation::insert( + &replica, + 1, + 1, + NodeId::ROOT, + payload_node, + order_key_from_position(0), + ); + let winning_payload = Operation::set_payload(&replica, 3, 3, payload_node, vec![9]); + let losing_payload = Operation::set_payload(&replica, 2, 2, payload_node, vec![4]); + + harness.append_ops(&[insert, winning_payload]); + let affected = harness.append_ops_with_affected_nodes(slice::from_ref(&losing_payload)); + assert_eq!(affected, vec![payload_node]); + assert_replay_cleared(harness); + assert_eq!(harness.head_seq(), 3); + assert_eq!(harness.payload(payload_node), Some(vec![9])); +} + +pub fn out_of_order_move_with_later_payload_catches_up_immediately< + H: MaterializationConformanceHarness, +>( + harness: &H, +) { + let replica = ReplicaId::new(b"mixed-move"); + let p1 = node(1); + let p2 = node(2); + let child = node(3); + let insert_p1 = Operation::insert(&replica, 1, 1, NodeId::ROOT, p1, order_key_from_position(0)); + let insert_p2 = Operation::insert(&replica, 2, 2, NodeId::ROOT, p2, order_key_from_position(1)); + let insert_child = Operation::insert(&replica, 3, 3, p1, child, order_key_from_position(0)); + let earlier_payload = Operation::set_payload(&replica, 5, 5, child, vec![7]); + let out_of_order_move = + Operation::move_node(&replica, 4, 4, child, p2, order_key_from_position(0)); + let later_payload = Operation::set_payload(&replica, 6, 6, child, vec![9]); + + harness.append_ops(&[insert_p1, insert_p2, insert_child, earlier_payload]); + let affected = harness.append_ops_with_affected_nodes(&[later_payload, out_of_order_move]); + assert_eq!(affected, vec![p1, p2, child]); + assert_replay_cleared(harness); + assert_eq!(harness.head_seq(), 6); + assert_eq!(harness.visible_children(p1), Vec::::new()); + assert_eq!(harness.visible_children(p2), vec![child]); + assert_eq!(harness.payload(child), Some(vec![9])); +} + +pub fn out_of_order_insert_and_move_before_head_catches_up_immediately< + H: MaterializationConformanceHarness, +>( + harness: &H, +) { + let replica = ReplicaId::new(b"mixed-insert"); + let p1 = node(1); + let p2 = node(2); + let child = node(3); + let insert_p1 = Operation::insert(&replica, 1, 1, NodeId::ROOT, p1, order_key_from_position(0)); + let insert_p2 = Operation::insert(&replica, 2, 2, NodeId::ROOT, p2, order_key_from_position(1)); + let unrelated_head = Operation::set_payload(&replica, 5, 5, p2, vec![4]); + let out_of_order_insert = + Operation::insert(&replica, 3, 3, p1, child, order_key_from_position(0)); + let out_of_order_move = + Operation::move_node(&replica, 4, 4, child, p2, order_key_from_position(0)); + + harness.append_ops(&[insert_p1, insert_p2, unrelated_head]); + let affected = + harness.append_ops_with_affected_nodes(&[out_of_order_move, out_of_order_insert]); + assert_eq!(affected, vec![p1, p2, child]); + assert_replay_cleared(harness); + assert_eq!(harness.head_seq(), 5); + assert_eq!(harness.visible_children(p1), Vec::::new()); + assert_eq!(harness.visible_children(p2), vec![child]); + assert_eq!(harness.payload(p2), Some(vec![4])); +} + +pub fn replay_from_start_frontier_catches_up_immediately( + harness: &H, +) { + let replica = ReplicaId::new(b"restart"); + let first = Operation::insert( + &replica, + 1, + 1, + NodeId::ROOT, + node(1), + order_key_from_position(0), + ); + let second = Operation::insert( + &replica, + 2, + 2, + NodeId::ROOT, + node(2), + order_key_from_position(1), + ); + + harness.append_ops(&[first]); + harness.force_replay_from_start(); + + let affected = harness.append_ops_with_affected_nodes(&[second]); + assert_eq!(affected, vec![NodeId::ROOT, node(1), node(2)]); + assert_replay_cleared(harness); + assert_eq!( + harness.visible_children(NodeId::ROOT), + vec![node(1), node(2)] + ); + assert_eq!(harness.head_seq(), 2); +} + +pub fn deferred_recovery_from_replay_frontier_catches_up_on_ensure< + H: MaterializationConformanceHarness, +>( + harness: &H, +) { + let replica = ReplicaId::new(b"ensure"); + let first = Operation::insert( + &replica, + 1, + 1, + NodeId::ROOT, + node(1), + order_key_from_position(0), + ); + let second = Operation::insert( + &replica, + 2, + 2, + NodeId::ROOT, + node(2), + order_key_from_position(1), + ); + + harness.append_ops(&[first, second]); + harness.force_replay_from_start(); + harness.ensure_materialized(); + + assert_replay_cleared(harness); + assert_eq!( + harness.visible_children(NodeId::ROOT), + vec![node(1), node(2)] + ); + assert_eq!(harness.head_seq(), 2); + assert_eq!(harness.op_ref_counters_for_parent(NodeId::ROOT), vec![1, 2]); +}