diff --git a/examples/playground/src/App.tsx b/examples/playground/src/App.tsx index 24a70488..6afedddf 100644 --- a/examples/playground/src/App.tsx +++ b/examples/playground/src/App.tsx @@ -162,7 +162,6 @@ export default function App() { persistSyncSettings(syncServerUrl, syncTransportMode); }, [syncServerUrl, syncTransportMode]); - const counterRef = useRef(0); const lamportRef = useRef(0); const initEpochRef = useRef(0); const disposedRef = useRef(false); @@ -483,18 +482,14 @@ export default function App() { const active = nextClient ?? clientRef.current ?? client; if (!active) return; try { - const [lamport, counter] = await Promise.all([ - active.meta.headLamport(), - replica ? active.meta.replicaMaxCounter(replica) : Promise.resolve(0), - ]); + const lamport = await active.meta.headLamport(); lamportRef.current = Math.max(lamportRef.current, lamport); setHeadLamport(lamportRef.current); - counterRef.current = Math.max(counterRef.current, counter); } catch (err) { console.error("Failed to refresh meta", err); } }, - [client, replica] + [client] ); const refreshParentsScheduledRef = useRef(false); @@ -587,6 +582,7 @@ export default function App() { const { peers, + syncTargetCount, remoteSyncStatus, syncBusy, liveBusy, @@ -624,7 +620,6 @@ export default function App() { revocationCutoverTokenId, revocationCutoverCounter, treeStateRef, - refreshMeta, refreshParents, refreshNodeCount, getLocalIdentityChain, @@ -831,7 +826,6 @@ export default function App() { setPayloadVersion((v) => v + 1); knownOpsRef.current = new Set(); setCollapse({ defaultCollapsed: true, overrides: new Set([ROOT_ID]) }); - counterRef.current = 0; lamportRef.current = 0; setHeadLamport(0); setTotalNodes(null); @@ -887,7 +881,6 @@ export default function App() { await verifyLocalOps([op]); lamportRef.current = Math.max(lamportRef.current, op.meta.lamport); - counterRef.current = Math.max(counterRef.current, op.meta.id.counter); setHeadLamport(lamportRef.current); notifyLocalUpdate([op]); @@ -916,7 +909,6 @@ export default function App() { scheduleRefreshParents(parentsAffectedByOps(stateBefore, [op])); scheduleRefreshNodeCount(); lamportRef.current = Math.max(lamportRef.current, op.meta.lamport); - counterRef.current = Math.max(counterRef.current, op.meta.id.counter); setHeadLamport(lamportRef.current); } catch (err) { console.error("Failed to append move op", err); @@ -1014,7 +1006,6 @@ export default function App() { for (const op of ops) { lamportRef.current = Math.max(lamportRef.current, op.meta.lamport); - counterRef.current = Math.max(counterRef.current, op.meta.id.counter); } setHeadLamport(lamportRef.current); @@ -1067,7 +1058,6 @@ export default function App() { await ensureChildrenLoaded(parentId, { force: true }); } lamportRef.current = Math.max(lamportRef.current, op.meta.lamport); - counterRef.current = Math.max(counterRef.current, op.meta.id.counter); setHeadLamport(lamportRef.current); setCollapse((prev) => { const overrides = new Set(prev.overrides); @@ -1294,7 +1284,7 @@ export default function App() { busy={busy} syncBusy={syncBusy} liveBusy={liveBusy} - peerCount={peers.length} + peerCount={syncTargetCount} authCanSyncAll={authCanSyncAll} onSync={() => void (authCanSyncAll ? handleSync({ all: {} }) : handleScopedSync())} liveAllEnabled={liveAllEnabled} @@ -1417,7 +1407,7 @@ export default function App() { onSync={() => { void (authCanSyncAll ? handleSync({ all: {} }) : handleScopedSync()); }} - canSync={status === "ready" && !busy && !syncBusy && peers.length > 0 && online} + canSync={status === "ready" && !busy && !syncBusy && syncTargetCount > 0 && online} onDetails={() => setShowAuthPanel(true)} /> diff --git a/examples/playground/src/playground/hooks/usePlaygroundAuth.ts b/examples/playground/src/playground/hooks/usePlaygroundAuth.ts index 6db32278..fd7f4bd4 100644 --- a/examples/playground/src/playground/hooks/usePlaygroundAuth.ts +++ b/examples/playground/src/playground/hooks/usePlaygroundAuth.ts @@ -113,6 +113,38 @@ const ALLOWED_GRANT_ACTIONS = new Set([ "read_payload", ]); +const SYNC_AUTH_PREFLIGHT_RETRIES = 12; +const SYNC_AUTH_PREFLIGHT_RETRY_DELAY_MS = 250; + +function delay(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + +async function waitForSyncAuthPreflight( + auth: SyncAuth, + docId: string, + opts: { attempts?: number; delayMs?: number } = {}, +): Promise { + const attempts = Math.max(1, opts.attempts ?? SYNC_AUTH_PREFLIGHT_RETRIES); + const delayMs = Math.max(0, opts.delayMs ?? SYNC_AUTH_PREFLIGHT_RETRY_DELAY_MS); + let lastErr: unknown = null; + + for (let attempt = 1; attempt <= attempts; attempt += 1) { + try { + await auth.helloCapabilities?.({ docId }); + return; + } catch (err) { + lastErr = err; + if (attempt === attempts) break; + await delay(delayMs); + } + } + + throw lastErr ?? new Error("sync auth preflight failed"); +} + function normalizeGrantActions(input: string[]): string[] { const out: string[] = []; for (const raw of input) { @@ -633,8 +665,9 @@ export function usePlaygroundAuth(opts: UsePlaygroundAuthOptions): PlaygroundAut void (async () => { try { - await preparedAuth.helloCapabilities?.({ docId }); + await waitForSyncAuthPreflight(preparedAuth, docId); if (cancelled) return; + setAuthError(null); setSyncAuth(preparedAuth); } catch (err) { if (cancelled) return; diff --git a/examples/playground/src/playground/hooks/usePlaygroundSync.ts b/examples/playground/src/playground/hooks/usePlaygroundSync.ts index ca68d30d..d1c0c5b2 100644 --- a/examples/playground/src/playground/hooks/usePlaygroundSync.ts +++ b/examples/playground/src/playground/hooks/usePlaygroundSync.ts @@ -155,6 +155,7 @@ function formatRemoteErrorDetail( } export type PlaygroundSyncApi = { peers: PeerInfo[]; + syncTargetCount: number; remoteSyncStatus: RemoteSyncStatus; syncBusy: boolean; liveBusy: boolean; @@ -195,9 +196,8 @@ export type UsePlaygroundSyncOptions = { revocationCutoverTokenId: string; revocationCutoverCounter: string; treeStateRef: React.MutableRefObject; - refreshMeta: () => Promise; - refreshParents: (parentIds: string[]) => Promise; - refreshNodeCount: () => Promise; + refreshParents: (parentIds: Iterable) => Promise | void; + refreshNodeCount: () => Promise | void; getLocalIdentityChain: () => Promise; onPeerIdentityChain: (chain: { identityPublicKey: Uint8Array; @@ -227,7 +227,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn authCanSyncAll, viewRootId, treeStateRef, - refreshMeta, refreshParents, refreshNodeCount, onAuthGrantMessage, @@ -238,6 +237,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn const [liveBusy, setLiveBusy] = useState(false); const [syncError, setSyncError] = useState(null); const [peers, setPeers] = useState([]); + const [syncTargetCount, setSyncTargetCount] = useState(0); const [remoteSyncStatus, setRemoteSyncStatus] = useState({ state: 'disabled', detail: 'Remote server transport is disabled in local tabs mode.', @@ -278,6 +278,12 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn const meshPeersRef = useRef([]); const remotePeerRef = useRef(null); + const publishSyncTargetCount = ( + connections: Map; detach: () => void }> = syncConnRef.current, + ) => { + setSyncTargetCount(connections.size); + }; + const publishPeers = () => { const merged: PeerInfo[] = [...meshPeersRef.current]; if (remotePeerRef.current) merged.push(remotePeerRef.current); @@ -568,6 +574,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn // ignore } connections.delete(peerId); + publishSyncTargetCount(connections); stopLiveAllForPeer(peerId); stopLiveChildrenForPeer(peerId); @@ -629,7 +636,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn if (lastErr) throw lastErr; throw new Error('No peers responded to sync.'); } - await refreshMeta(); await refreshParents(Object.keys(treeStateRef.current.childrenByParent)); await refreshNodeCount(); } catch (err) { @@ -702,7 +708,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn if (lastErr) throw lastErr; throw new Error('No peers responded to sync.'); } - await refreshMeta(); await refreshParents(Object.keys(treeStateRef.current.childrenByParent)); await refreshNodeCount(); } catch (err) { @@ -787,12 +792,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn ); } - await refreshMeta(); - const parentIds = new Set(Object.keys(treeStateRef.current.childrenByParent)); - parentIds.add(viewRootId); - await refreshParents(Array.from(parentIds)); - await refreshNodeCount(); - autoSyncDoneRef.current = true; if (typeof window !== 'undefined') { const url = new URL(window.location.href); @@ -819,9 +818,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn authMaterial.localTokensB64.length, autoSyncJoinTick, joinMode, - refreshMeta, - refreshNodeCount, - refreshParents, syncBusy, viewRootId, ]); @@ -981,8 +977,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn if (debugSync && ops.length > 0) { console.debug(`[sync:${selfPeerId}] applyOps(${ops.length})`); } - const affected = - ops.length > 0 ? ((await client.ops.appendMany(ops)) as unknown as string[]) : []; + const affected = ops.length > 0 ? await client.ops.appendMany(ops) : []; await onRemoteOpsApplied(ops, affected); }, }; @@ -1002,6 +997,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn const connections = new Map; detach: () => void }>(); syncConnRef.current = connections; + publishSyncTargetCount(connections); const maybeStartLiveForPeer = (peerId: string) => { if (!isRemotePeerId(peerId)) { @@ -1034,6 +1030,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn onPeerTransport: (peerId, transport) => { const detach = sharedPeer.attach(transport); connections.set(peerId, { transport, detach }); + publishSyncTargetCount(connections); maybeStartLiveForPeer(peerId); if (autoSyncJoinInitial && joinMode && !autoSyncDoneRef.current) { autoSyncPeerIdRef.current = peerId; @@ -1043,6 +1040,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn }, onPeerDisconnected: (peerId) => { connections.delete(peerId); + publishSyncTargetCount(connections); stopLiveAllForPeer(peerId); stopLiveChildrenForPeer(peerId); meshPeersRef.current = meshPeersRef.current.filter((p) => p.id !== peerId); @@ -1127,6 +1125,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn ); const detach = sharedPeer.attach(transport); syncConnRef.current.set(remotePeerId, { transport, detach }); + publishSyncTargetCount(); remotePeerRef.current = { id: remotePeerId, lastSeen: Date.now() }; publishPeers(); maybeStartLiveForPeer(remotePeerId); @@ -1220,6 +1219,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn liveBusyCountRef.current = 0; setLiveBusy(false); connections.clear(); + publishSyncTargetCount(connections); meshPeersRef.current = []; remotePeerRef.current = null; publishPeers(); @@ -1243,6 +1243,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn return { peers, + syncTargetCount, remoteSyncStatus, syncBusy, liveBusy, diff --git a/packages/treecrdt-core/src/lib.rs b/packages/treecrdt-core/src/lib.rs index 5a13aaf9..9d1c33e9 100644 --- a/packages/treecrdt-core/src/lib.rs +++ b/packages/treecrdt-core/src/lib.rs @@ -17,8 +17,9 @@ pub use ids::{Lamport, NodeId, OperationId, ReplicaId}; pub use materialization::{ apply_incremental_ops_with_delta, apply_persisted_remote_ops_with_delta, catch_up_materialized_state, materialize_persisted_remote_ops_with_delta, - IncrementalApplyResult, MaterializationCursor, MaterializationFrontier, MaterializationHead, - MaterializationKey, MaterializationState, PersistedRemoteApplyResult, PersistedRemoteStores, + should_checkpoint_materialization, IncrementalApplyResult, MaterializationCursor, + MaterializationFrontier, MaterializationHead, MaterializationKey, MaterializationState, + PersistedRemoteApplyResult, PersistedRemoteStores, MATERIALIZATION_CHECKPOINT_INTERVAL, }; pub use ops::{cmp_op_key, cmp_ops, Operation, OperationKind, OperationMetadata}; pub use traits::{ diff --git a/packages/treecrdt-core/src/materialization.rs b/packages/treecrdt-core/src/materialization.rs index 35059826..ce1a21f9 100644 --- a/packages/treecrdt-core/src/materialization.rs +++ b/packages/treecrdt-core/src/materialization.rs @@ -1,12 +1,9 @@ use std::cmp::Ordering; use crate::ops::{cmp_op_key, cmp_ops, Operation}; -use crate::traits::{ - Clock, LamportClock, MemoryNodeStore, MemoryPayloadStore, NodeStore, NoopStorage, - ParentOpIndex, PayloadStore, Storage, -}; +use crate::traits::{Clock, NodeStore, NoopStorage, ParentOpIndex, PayloadStore, Storage}; use crate::tree::TreeCrdt; -use crate::{Error, Lamport, NodeId, OperationId, ReplicaId, Result}; +use crate::{Error, Lamport, NodeId, ReplicaId, Result}; #[derive(Clone, Debug, Eq, PartialEq)] pub struct MaterializationKey> { @@ -23,6 +20,14 @@ impl> MaterializationKey { counter: self.counter, } } + + fn to_owned(&self) -> MaterializationKey { + MaterializationKey { + lamport: self.lamport, + replica: self.replica.as_ref().to_vec(), + counter: self.counter, + } + } } pub type MaterializationFrontier = MaterializationKey>; @@ -41,6 +46,13 @@ impl> MaterializationHead { seq: self.seq, } } + + fn to_owned(&self) -> MaterializationHead { + MaterializationHead { + at: self.at.to_owned(), + seq: self.seq, + } + } } pub type MaterializationHeadRef<'a> = MaterializationHead<&'a [u8]>; @@ -53,6 +65,8 @@ pub struct MaterializationState> { pub type MaterializationStateRef<'a> = MaterializationState<&'a [u8]>; +pub const MATERIALIZATION_CHECKPOINT_INTERVAL: u64 = 64; + impl MaterializationState { pub fn head_seq(&self) -> u64 { self.head.as_ref().map_or(0, |head| head.seq) @@ -73,6 +87,10 @@ pub trait MaterializationCursor { fn state(&self) -> MaterializationStateRef<'_>; } +pub fn should_checkpoint_materialization(head: &MaterializationHead) -> bool { + head.seq == 1 || head.seq.is_multiple_of(MATERIALIZATION_CHECKPOINT_INTERVAL) +} + #[derive(Clone, Debug, Eq, PartialEq)] pub struct IncrementalApplyResult { pub head: Option, @@ -106,30 +124,6 @@ pub struct PersistedRemoteStores { pub index: I, } -#[derive(Default)] -struct RecordingIndex { - records: Vec<(NodeId, OperationId, u64)>, -} - -impl ParentOpIndex for RecordingIndex { - fn reset(&mut self) -> Result<()> { - self.records.clear(); - Ok(()) - } - - fn record(&mut self, parent: NodeId, op_id: &OperationId, seq: u64) -> Result<()> { - self.records.push((parent, op_id.clone(), seq)); - Ok(()) - } -} - -struct PrefixSnapshot { - crdt: TreeCrdt, - index: RecordingIndex, - head: Option, - seq: u64, -} - fn frontier_from_op(op: &Operation) -> MaterializationFrontier { MaterializationFrontier { lamport: op.meta.lamport, @@ -138,14 +132,6 @@ fn frontier_from_op(op: &Operation) -> MaterializationFrontier { } } -fn owned_frontier>(frontier: &MaterializationKey) -> MaterializationFrontier { - MaterializationFrontier { - lamport: frontier.lamport, - replica: frontier.replica.as_ref().to_vec(), - counter: frontier.counter, - } -} - fn cmp_frontiers, R2: AsRef<[u8]>>( a: &MaterializationKey, b: &MaterializationKey, @@ -187,10 +173,7 @@ fn next_replay_frontier( let state = meta.state(); if let Some(existing) = state.replay_from.as_ref() { - return Some(earlier_frontier( - owned_frontier(existing), - earliest_inserted, - )); + return Some(earlier_frontier(existing.to_owned(), earliest_inserted)); } let head = state.head.as_ref()?; @@ -328,104 +311,25 @@ where Ok(result) } -fn build_prefix_snapshot( - storage: &S, - frontier: &MaterializationFrontier, - replica_id: &ReplicaId, -) -> Result { - let mut crdt = TreeCrdt::with_stores( - replica_id.clone(), - NoopStorage, - LamportClock::default(), - MemoryNodeStore::default(), - MemoryPayloadStore::default(), - )?; - let mut index = RecordingIndex::default(); - let mut seq = 0u64; - let mut head: Option = None; - - storage.scan_since(0, &mut |op| { - if cmp_frontiers(&frontier_from_op(&op), frontier) != Ordering::Less { - return Ok(()); - } - - match crdt.apply_remote_with_materialization_seq(op.clone(), &mut index, &mut seq)? { - Some(_) => { - head = Some(op); - Ok(()) - } - None => Err(Error::Storage( - "prefix replay unexpectedly required nested catch-up".into(), - )), - } - })?; - - Ok(PrefixSnapshot { - crdt, - index, - head, - seq, - }) -} - -fn restore_prefix_snapshot( - prefix: &mut PrefixSnapshot, - nodes: &mut N, - payloads: &mut P, - index: &mut I, -) -> Result<()> { - let mut all_nodes = prefix.crdt.node_store_mut().all_nodes()?; - all_nodes.sort(); - - for node in &all_nodes { - nodes.ensure_node(*node)?; - } - - for node in &all_nodes { - if *node == NodeId::ROOT { - continue; - } - let parent = prefix.crdt.node_store_mut().parent(*node)?; - let order_key = prefix.crdt.node_store_mut().order_key(*node)?; - if let Some(parent) = parent { - nodes.attach(*node, parent, order_key.unwrap_or_default())?; - } else { - nodes.detach(*node)?; - } - } - - for node in &all_nodes { - nodes.set_tombstone(*node, prefix.crdt.node_store_mut().tombstone(*node)?)?; - - let last_change = prefix.crdt.node_store_mut().last_change(*node)?; - if !last_change.is_empty() { - nodes.merge_last_change(*node, &last_change)?; - } - - if let Some(deleted_at) = prefix.crdt.node_store_mut().deleted_at(*node)? { - nodes.merge_deleted_at(*node, &deleted_at)?; - } - - if let Some(writer) = prefix.crdt.payload_last_writer(*node)? { - payloads.set_payload(*node, prefix.crdt.payload(*node)?, writer)?; - } - } - - let mut records = prefix.index.records.clone(); - records.sort_by(|a, b| a.2.cmp(&b.2).then_with(|| a.0.cmp(&b.0)).then_with(|| a.1.cmp(&b.1))); - for (parent, op_id, seq) in records { - index.record(parent, &op_id, seq)?; - } - - Ok(()) -} - /// Catch backend materialized state up to the persisted op log using the replay frontier when /// available. -pub fn catch_up_materialized_state( +pub fn catch_up_materialized_state< + S, + C, + N, + P, + I, + M, + LoadCheckpoint, + RestoreCheckpoint, + FlushNodes, + FlushIndex, +>( storage: S, stores: PersistedRemoteStores, meta: &M, + mut load_checkpoint: LoadCheckpoint, + mut restore_checkpoint: RestoreCheckpoint, mut flush_nodes: FlushNodes, mut flush_index: FlushIndex, ) -> Result> @@ -436,12 +340,20 @@ where P: PayloadStore, I: ParentOpIndex, M: MaterializationCursor, + LoadCheckpoint: FnMut(&MaterializationFrontier) -> Result>, + RestoreCheckpoint: FnMut(Option<&MaterializationHead>, &mut N, &mut P, &mut I) -> Result<()>, FlushNodes: FnMut(&mut N) -> Result<()>, FlushIndex: FnMut(&mut I) -> Result<()>, { - let replay_frontier = { + let (current_head, replay_frontier) = { let state = meta.state(); - state.replay_from.as_ref().map(owned_frontier) + ( + state.head.as_ref().map(MaterializationHead::to_owned), + state.replay_from.as_ref().map(MaterializationKey::to_owned), + ) + }; + let Some(replay_frontier) = replay_frontier else { + return Ok(current_head); }; let PersistedRemoteStores { @@ -452,50 +364,40 @@ where mut index, } = stores; - nodes.reset()?; - payloads.reset()?; - index.reset()?; + let mut result_head = load_checkpoint(&replay_frontier)?; + restore_checkpoint(result_head.as_ref(), &mut nodes, &mut payloads, &mut index)?; - let mut head: Option = None; - let mut seq = 0u64; - - if let Some(frontier) = replay_frontier.as_ref() { - let mut prefix = build_prefix_snapshot(&storage, frontier, &replica_id)?; - restore_prefix_snapshot(&mut prefix, &mut nodes, &mut payloads, &mut index)?; - head = prefix.head; - seq = prefix.seq; - } + let mut seq = result_head.as_ref().map_or(0, |head| head.seq); + let scan_after = result_head + .as_ref() + .map(|head| (head.at.lamport, head.at.replica.clone(), head.at.counter)); let mut crdt = TreeCrdt::with_stores(replica_id, NoopStorage, clock, nodes, payloads)?; - storage.scan_since(0, &mut |op| { - if let Some(frontier) = replay_frontier.as_ref() { - if cmp_frontiers(&frontier_from_op(&op), frontier) == Ordering::Less { - return Ok(()); - } - } - - match crdt.apply_remote_with_materialization_seq(op.clone(), &mut index, &mut seq)? { - Some(_) => { - head = Some(op); - Ok(()) + storage.scan_after( + scan_after + .as_ref() + .map(|(lamport, replica, counter)| (*lamport, replica.as_slice(), *counter)), + &mut |op| { + let next_frontier = frontier_from_op(&op); + match crdt.apply_remote_with_materialization_seq(op, &mut index, &mut seq)? { + Some(_) => { + result_head = Some(MaterializationHead { + at: next_frontier, + seq, + }); + Ok(()) + } + None => Err(Error::Storage( + "catch-up replay unexpectedly required nested catch-up".into(), + )), } - None => Err(Error::Storage( - "catch-up replay unexpectedly required nested catch-up".into(), - )), - } - })?; + }, + )?; flush_nodes(crdt.node_store_mut())?; flush_index(&mut index)?; - Ok(head.map(|head| MaterializationHead { - at: MaterializationKey { - lamport: head.meta.lamport, - replica: head.meta.id.replica.as_bytes().to_vec(), - counter: head.meta.id.counter, - }, - seq, - })) + Ok(result_head) } /// Apply already-persisted inserted remote ops and commit adapter-owned metadata writes. @@ -514,6 +416,11 @@ where M: MaterializationCursor, { let inserted_count = inserted_ops.len().min(u64::MAX as usize) as u64; + let frontier_recorded = || PersistedRemoteApplyResult { + inserted_count, + affected_nodes: Vec::new(), + frontier_recorded: true, + }; if inserted_count == 0 { return Ok(PersistedRemoteApplyResult { @@ -525,22 +432,14 @@ where if let Some(frontier) = next_replay_frontier(meta, &inserted_ops) { schedule_replay(&frontier)?; - return Ok(PersistedRemoteApplyResult { - inserted_count, - affected_nodes: Vec::new(), - frontier_recorded: true, - }); + return Ok(frontier_recorded()); } match materialize_inserted(inserted_ops) { Ok(result) => { let Some(head) = result.head else { schedule_replay(&start_replay_frontier())?; - return Ok(PersistedRemoteApplyResult { - inserted_count, - affected_nodes: Vec::new(), - frontier_recorded: true, - }); + return Ok(frontier_recorded()); }; if update_head(&head).is_ok() { @@ -551,20 +450,12 @@ where }) } else { schedule_replay(&start_replay_frontier())?; - Ok(PersistedRemoteApplyResult { - inserted_count, - affected_nodes: Vec::new(), - frontier_recorded: true, - }) + Ok(frontier_recorded()) } } Err(_) => { schedule_replay(&start_replay_frontier())?; - Ok(PersistedRemoteApplyResult { - inserted_count, - affected_nodes: Vec::new(), - frontier_recorded: true, - }) + Ok(frontier_recorded()) } } } diff --git a/packages/treecrdt-core/src/traits.rs b/packages/treecrdt-core/src/traits.rs index 3b3caa61..8d143e43 100644 --- a/packages/treecrdt-core/src/traits.rs +++ b/packages/treecrdt-core/src/traits.rs @@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet}; use crate::error::{Error, Result}; use crate::ids::{Lamport, NodeId, OperationId, ReplicaId}; -use crate::ops::{cmp_ops, Operation}; +use crate::ops::{cmp_op_key, cmp_ops, Operation}; use crate::version_vector::VersionVector; /// Pluggable clock to allow Lamport, Hybrid Logical Clock, or custom time strategies. @@ -56,6 +56,36 @@ pub trait Storage { } Ok(()) } + + /// Iterate operations strictly after the given canonical op-key. + /// + /// Default implementation loads into memory and filters in sorted order; storage backends can + /// override this with an efficient key-range query. + fn scan_after( + &self, + after: Option<(Lamport, &[u8], u64)>, + visit: &mut dyn FnMut(Operation) -> Result<()>, + ) -> Result<()> { + let mut ops = self.load_since(0)?; + ops.sort_by(cmp_ops); + for op in ops { + if let Some((lamport, replica, counter)) = after { + if cmp_op_key( + op.meta.lamport, + op.meta.id.replica.as_bytes(), + op.meta.id.counter, + lamport, + replica, + counter, + ) != Ordering::Greater + { + continue; + } + } + visit(op)?; + } + Ok(()) + } } /// Storage adapter used when operations are already provided by the caller. diff --git a/packages/treecrdt-core/src/version_vector.rs b/packages/treecrdt-core/src/version_vector.rs index c470f3b0..7b3cba6f 100644 --- a/packages/treecrdt-core/src/version_vector.rs +++ b/packages/treecrdt-core/src/version_vector.rs @@ -91,10 +91,7 @@ impl ReplicaVersion { } fn absorb_frontier_ranges(&mut self) { - loop { - let Some(&(start, end)) = self.ranges.first() else { - break; - }; + while let Some(&(start, end)) = self.ranges.first() { if start == self.frontier + 1 { self.frontier = end; self.ranges.remove(0); diff --git a/packages/treecrdt-core/tests/materialization_helpers.rs b/packages/treecrdt-core/tests/materialization_helpers.rs index 6ce1bac6..949aa1f7 100644 --- a/packages/treecrdt-core/tests/materialization_helpers.rs +++ b/packages/treecrdt-core/tests/materialization_helpers.rs @@ -1,9 +1,10 @@ use treecrdt_core::{ apply_incremental_ops_with_delta, apply_persisted_remote_ops_with_delta, - materialize_persisted_remote_ops_with_delta, LamportClock, MaterializationCursor, - MaterializationHead, MaterializationKey, MaterializationState, MemoryNodeStore, - MemoryPayloadStore, MemoryStorage, NodeId, NoopParentOpIndex, Operation, OperationId, - ParentOpIndex, PersistedRemoteStores, ReplicaId, TreeCrdt, + catch_up_materialized_state, cmp_op_key, materialize_persisted_remote_ops_with_delta, + LamportClock, MaterializationCursor, MaterializationFrontier, MaterializationHead, + MaterializationKey, MaterializationState, MemoryNodeStore, MemoryPayloadStore, MemoryStorage, + NodeId, NodeStore, NoopParentOpIndex, Operation, OperationId, ParentOpIndex, PayloadStore, + PersistedRemoteStores, ReplicaId, Storage, TreeCrdt, VersionVector, }; #[derive(Default)] @@ -74,6 +75,52 @@ impl ParentOpIndex for RecordingIndex { } } +#[derive(Default)] +struct ScanAfterStorage { + ops: Vec, +} + +impl Storage for ScanAfterStorage { + fn apply(&mut self, op: Operation) -> treecrdt_core::Result { + self.ops.push(op); + Ok(true) + } + + fn load_since(&self, lamport: u64) -> treecrdt_core::Result> { + Ok(self.ops.iter().filter(|op| op.meta.lamport > lamport).cloned().collect()) + } + + fn latest_lamport(&self) -> u64 { + self.ops.iter().map(|op| op.meta.lamport).max().unwrap_or_default() + } + + fn scan_after( + &self, + after: Option<(u64, &[u8], u64)>, + visit: &mut dyn FnMut(Operation) -> treecrdt_core::Result<()>, + ) -> treecrdt_core::Result<()> { + let mut ops = self.ops.clone(); + ops.sort_by(treecrdt_core::cmp_ops); + for op in ops { + if let Some((lamport, replica, counter)) = after { + if cmp_op_key( + op.meta.lamport, + op.meta.id.replica.as_bytes(), + op.meta.id.counter, + lamport, + replica, + counter, + ) != std::cmp::Ordering::Greater + { + continue; + } + } + visit(op)?; + } + Ok(()) + } +} + #[test] fn finalize_local_materialization_records_unique_hints_and_extras() { let mut crdt = TreeCrdt::new( @@ -486,3 +533,152 @@ fn materialize_persisted_remote_ops_with_delta_runs_prepare_and_flush_hooks() { vec![NodeId::ROOT, NodeId(10), NodeId(11)] ); } + +#[test] +fn catch_up_materialized_state_restores_checkpoint_and_replays_suffix() { + let replica = ReplicaId::new(b"remote"); + let op1 = Operation::insert(&replica, 1, 1, NodeId::ROOT, NodeId(10), vec![0x10]); + let op2 = Operation::insert(&replica, 2, 2, NodeId(10), NodeId(11), vec![0x20]); + let op3 = Operation::insert(&replica, 3, 3, NodeId::ROOT, NodeId(12), vec![0x30]); + + let mut storage = ScanAfterStorage::default(); + storage.apply(op1.clone()).unwrap(); + storage.apply(op2.clone()).unwrap(); + storage.apply(op3.clone()).unwrap(); + + let cursor = Cursor { + head_lamport: 3, + head_replica: replica.as_bytes().to_vec(), + head_counter: 3, + head_seq: 3, + replay_lamport: Some(2), + replay_replica: Some(replica.as_bytes().to_vec()), + replay_counter: Some(2), + }; + let checkpoint = MaterializationHead { + at: MaterializationKey { + lamport: 1, + replica: replica.as_bytes().to_vec(), + counter: 1, + }, + seq: 1, + }; + + let mut restored = false; + let mut flushed_children_root = Vec::new(); + let mut flushed_child_parent = None; + let head = catch_up_materialized_state( + storage, + PersistedRemoteStores { + replica_id: ReplicaId::new(b"adapter"), + clock: LamportClock::default(), + nodes: MemoryNodeStore::default(), + payloads: MemoryPayloadStore::default(), + index: RecordingIndex::default(), + }, + &cursor, + |frontier| { + assert_eq!( + frontier, + &MaterializationFrontier { + lamport: 2, + replica: replica.as_bytes().to_vec(), + counter: 2, + } + ); + Ok(Some(checkpoint.clone())) + }, + |checkpoint, nodes, payloads, index| { + nodes.reset()?; + payloads.reset()?; + index.reset()?; + if checkpoint.is_some() { + restored = true; + nodes.ensure_node(NodeId(10))?; + nodes.attach(NodeId(10), NodeId::ROOT, vec![0x10])?; + let mut vv = VersionVector::new(); + vv.observe(&replica, 1); + nodes.merge_last_change(NodeId(10), &vv)?; + nodes.merge_last_change(NodeId::ROOT, &vv)?; + index.record(NodeId::ROOT, &op1.meta.id, 1)?; + } + Ok(()) + }, + |nodes| { + flushed_children_root = nodes.children(NodeId::ROOT)?; + flushed_child_parent = nodes.parent(NodeId(11))?; + Ok(()) + }, + |_| Ok(()), + ) + .unwrap() + .expect("head after checkpoint catch-up"); + + assert!(restored); + assert_eq!(flushed_children_root, vec![NodeId(10), NodeId(12)]); + assert_eq!(flushed_child_parent, Some(NodeId(10))); + assert_eq!(head.at.counter, 3); + assert_eq!(head.seq, 3); +} + +#[test] +fn catch_up_materialized_state_is_noop_without_replay_frontier() { + let load_called = std::cell::Cell::new(false); + let restore_called = std::cell::Cell::new(false); + let flush_nodes_called = std::cell::Cell::new(false); + let flush_index_called = std::cell::Cell::new(false); + let replica = ReplicaId::new(b"remote"); + let cursor = Cursor { + head_lamport: 7, + head_replica: replica.as_bytes().to_vec(), + head_counter: 4, + head_seq: 9, + ..Cursor::default() + }; + + let head = catch_up_materialized_state( + ScanAfterStorage::default(), + PersistedRemoteStores { + replica_id: ReplicaId::new(b"adapter"), + clock: LamportClock::default(), + nodes: MemoryNodeStore::default(), + payloads: MemoryPayloadStore::default(), + index: RecordingIndex::default(), + }, + &cursor, + |_| { + load_called.set(true); + Ok(None) + }, + |_, _, _, _| { + restore_called.set(true); + Ok(()) + }, + |_| { + flush_nodes_called.set(true); + Ok(()) + }, + |_| { + flush_index_called.set(true); + Ok(()) + }, + ) + .unwrap() + .expect("current head"); + + assert!(!load_called.get()); + assert!(!restore_called.get()); + assert!(!flush_nodes_called.get()); + assert!(!flush_index_called.get()); + assert_eq!( + head, + MaterializationHead { + at: MaterializationKey { + lamport: 7, + replica: replica.as_bytes().to_vec(), + counter: 4, + }, + seq: 9, + } + ); +} diff --git a/packages/treecrdt-postgres-rs/src/local_ops.rs b/packages/treecrdt-postgres-rs/src/local_ops.rs index f69d70cf..2d8fd696 100644 --- a/packages/treecrdt-postgres-rs/src/local_ops.rs +++ b/packages/treecrdt-postgres-rs/src/local_ops.rs @@ -9,9 +9,9 @@ use treecrdt_core::{ }; use crate::store::{ - ensure_materialized_in_tx, load_tree_meta_for_update, set_tree_meta_replay_frontier, - update_tree_meta_head, PgCtx, PgNodeStore, PgOpStorage, PgParentOpIndex, PgPayloadStore, - TreeMeta, + ensure_materialized_in_tx, load_tree_meta_for_update, persist_materialized_head, + set_tree_meta_replay_frontier, PgCtx, PgNodeStore, PgOpStorage, PgParentOpIndex, + PgPayloadStore, TreeMeta, }; type LocalCrdt = TreeCrdt; @@ -110,7 +110,7 @@ fn finish_local_core_op( seq, }; if post_materialization_ok - && update_tree_meta_head(&session.ctx.client, &session.ctx.doc_id, Some(&head)).is_err() + && persist_materialized_head(&session.ctx.client, &session.ctx.doc_id, Some(&head)).is_err() { post_materialization_ok = false; } diff --git a/packages/treecrdt-postgres-rs/src/schema.rs b/packages/treecrdt-postgres-rs/src/schema.rs index 0f80e605..cdf13154 100644 --- a/packages/treecrdt-postgres-rs/src/schema.rs +++ b/packages/treecrdt-postgres-rs/src/schema.rs @@ -69,6 +69,53 @@ CREATE TABLE IF NOT EXISTS treecrdt_oprefs_children ( CREATE INDEX IF NOT EXISTS idx_treecrdt_oprefs_children_doc_parent_seq ON treecrdt_oprefs_children (doc_id, parent, seq); + +CREATE TABLE IF NOT EXISTS treecrdt_checkpoints ( + doc_id TEXT NOT NULL, + checkpoint_seq BIGINT NOT NULL, + head_lamport BIGINT NOT NULL, + head_replica BYTEA NOT NULL, + head_counter BIGINT NOT NULL, + PRIMARY KEY (doc_id, checkpoint_seq) +); + +CREATE INDEX IF NOT EXISTS idx_treecrdt_checkpoints_doc_head + ON treecrdt_checkpoints (doc_id, head_lamport, head_replica, head_counter); + +CREATE TABLE IF NOT EXISTS treecrdt_checkpoint_nodes ( + doc_id TEXT NOT NULL, + checkpoint_seq BIGINT NOT NULL, + node BYTEA NOT NULL, + parent BYTEA, + order_key BYTEA, + tombstone BOOLEAN NOT NULL DEFAULT FALSE, + last_change BYTEA, + deleted_at BYTEA, + PRIMARY KEY (doc_id, checkpoint_seq, node) +); + +CREATE TABLE IF NOT EXISTS treecrdt_checkpoint_payload ( + doc_id TEXT NOT NULL, + checkpoint_seq BIGINT NOT NULL, + node BYTEA NOT NULL, + payload BYTEA, + last_lamport BIGINT NOT NULL, + last_replica BYTEA NOT NULL, + last_counter BIGINT NOT NULL, + PRIMARY KEY (doc_id, checkpoint_seq, node) +); + +CREATE TABLE IF NOT EXISTS treecrdt_checkpoint_oprefs_children ( + doc_id TEXT NOT NULL, + checkpoint_seq BIGINT NOT NULL, + parent BYTEA NOT NULL, + op_ref BYTEA NOT NULL, + seq BIGINT NOT NULL, + PRIMARY KEY (doc_id, checkpoint_seq, parent, op_ref) +); + +CREATE INDEX IF NOT EXISTS idx_treecrdt_checkpoint_oprefs_children_doc_parent_seq + ON treecrdt_checkpoint_oprefs_children (doc_id, checkpoint_seq, parent, seq); "#; pub fn ensure_schema(client: &mut Client) -> Result<()> { @@ -99,6 +146,30 @@ pub fn reset_doc_for_tests(client: &mut Client, doc_id: &str) -> Result<()> { client .execute("DELETE FROM treecrdt_nodes WHERE doc_id = $1", &[&doc_id]) .map_err(|e| Error::Storage(format!("{e:?}")))?; + client + .execute( + "DELETE FROM treecrdt_checkpoint_oprefs_children WHERE doc_id = $1", + &[&doc_id], + ) + .map_err(|e| Error::Storage(format!("{e:?}")))?; + client + .execute( + "DELETE FROM treecrdt_checkpoint_payload WHERE doc_id = $1", + &[&doc_id], + ) + .map_err(|e| Error::Storage(format!("{e:?}")))?; + client + .execute( + "DELETE FROM treecrdt_checkpoint_nodes WHERE doc_id = $1", + &[&doc_id], + ) + .map_err(|e| Error::Storage(format!("{e:?}")))?; + client + .execute( + "DELETE FROM treecrdt_checkpoints WHERE doc_id = $1", + &[&doc_id], + ) + .map_err(|e| Error::Storage(format!("{e:?}")))?; client .execute("DELETE FROM treecrdt_ops WHERE doc_id = $1", &[&doc_id]) .map_err(|e| Error::Storage(format!("{e:?}")))?; diff --git a/packages/treecrdt-postgres-rs/src/store.rs b/packages/treecrdt-postgres-rs/src/store.rs index b5d830a3..44c77de8 100644 --- a/packages/treecrdt-postgres-rs/src/store.rs +++ b/packages/treecrdt-postgres-rs/src/store.rs @@ -7,10 +7,10 @@ use postgres::{Client, Row, Statement}; use treecrdt_core::{ apply_persisted_remote_ops_with_delta, catch_up_materialized_state, - materialize_persisted_remote_ops_with_delta, Error, Lamport, LamportClock, - MaterializationCursor, MaterializationFrontier, MaterializationHead, MaterializationKey, - MaterializationState, NodeId, Operation, OperationId, OperationKind, PersistedRemoteStores, - ReplicaId, Result, Storage, VersionVector, + materialize_persisted_remote_ops_with_delta, should_checkpoint_materialization, Error, Lamport, + LamportClock, MaterializationCursor, MaterializationFrontier, MaterializationHead, + MaterializationKey, MaterializationState, NodeId, Operation, OperationId, OperationKind, + PersistedRemoteStores, ReplicaId, Result, Storage, VersionVector, }; use crate::opref::{derive_op_ref_v0, OPREF_V0_WIDTH}; @@ -199,6 +199,163 @@ pub(crate) fn update_tree_meta_head>( Ok(()) } +pub(crate) fn persist_materialized_head>( + client: &Rc>, + doc_id: &str, + head: Option<&MaterializationHead>, +) -> Result<()> { + update_tree_meta_head(client, doc_id, head)?; + maybe_save_materialization_checkpoint(client, doc_id, head) +} + +fn exec_doc_sql(c: &mut Client, doc_id: &str, sql: &'static str) -> Result<()> { + c.execute(sql, &[&doc_id]).map_err(storage_debug)?; + Ok(()) +} + +fn exec_doc_checkpoint_sql( + c: &mut Client, + doc_id: &str, + checkpoint_seq: i64, + sql: &'static str, +) -> Result<()> { + c.execute(sql, &[&doc_id, &checkpoint_seq]).map_err(storage_debug)?; + Ok(()) +} + +pub(crate) fn maybe_save_materialization_checkpoint>( + client: &Rc>, + doc_id: &str, + head: Option<&MaterializationHead>, +) -> Result<()> { + let Some(head) = head else { + return Ok(()); + }; + if !should_checkpoint_materialization(head) { + return Ok(()); + } + + let checkpoint_seq = head.seq as i64; + let mut c = client.borrow_mut(); + for sql in [ + "DELETE FROM treecrdt_checkpoint_oprefs_children WHERE doc_id = $1 AND checkpoint_seq = $2", + "DELETE FROM treecrdt_checkpoint_payload WHERE doc_id = $1 AND checkpoint_seq = $2", + "DELETE FROM treecrdt_checkpoint_nodes WHERE doc_id = $1 AND checkpoint_seq = $2", + "DELETE FROM treecrdt_checkpoints WHERE doc_id = $1 AND checkpoint_seq = $2", + ] { + exec_doc_checkpoint_sql(&mut c, doc_id, checkpoint_seq, sql)?; + } + + c.execute( + "INSERT INTO treecrdt_checkpoints(doc_id, checkpoint_seq, head_lamport, head_replica, head_counter) \ + VALUES ($1, $2, $3, $4, $5)", + &[ + &doc_id, + &checkpoint_seq, + &(head.at.lamport as i64), + &head.at.replica.as_ref(), + &(head.at.counter as i64), + ], + ) + .map_err(storage_debug)?; + for sql in [ + "INSERT INTO treecrdt_checkpoint_nodes(doc_id, checkpoint_seq, node, parent, order_key, tombstone, last_change, deleted_at) \ + SELECT doc_id, $2, node, parent, order_key, tombstone, last_change, deleted_at \ + FROM treecrdt_nodes WHERE doc_id = $1", + "INSERT INTO treecrdt_checkpoint_payload(doc_id, checkpoint_seq, node, payload, last_lamport, last_replica, last_counter) \ + SELECT doc_id, $2, node, payload, last_lamport, last_replica, last_counter \ + FROM treecrdt_payload WHERE doc_id = $1", + "INSERT INTO treecrdt_checkpoint_oprefs_children(doc_id, checkpoint_seq, parent, op_ref, seq) \ + SELECT doc_id, $2, parent, op_ref, seq \ + FROM treecrdt_oprefs_children WHERE doc_id = $1", + ] { + exec_doc_checkpoint_sql(&mut c, doc_id, checkpoint_seq, sql)?; + } + Ok(()) +} + +pub(crate) fn load_materialization_checkpoint_before( + client: &Rc>, + doc_id: &str, + frontier: &MaterializationFrontier, +) -> Result> { + let mut c = client.borrow_mut(); + let rows = c + .query( + "SELECT checkpoint_seq, head_lamport, head_replica, head_counter \ + FROM treecrdt_checkpoints \ + WHERE doc_id = $1 \ + AND (head_lamport < $2 \ + OR (head_lamport = $2 AND head_replica < $3) \ + OR (head_lamport = $2 AND head_replica = $3 AND head_counter < $4)) \ + ORDER BY head_lamport DESC, head_replica DESC, head_counter DESC \ + LIMIT 1", + &[ + &doc_id, + &(frontier.lamport as i64), + &frontier.replica, + &(frontier.counter as i64), + ], + ) + .map_err(storage_debug)?; + let Some(row) = rows.first() else { + return Ok(None); + }; + + Ok(Some(MaterializationHead { + at: MaterializationKey { + lamport: row.get::<_, i64>(1).max(0) as Lamport, + replica: row.get::<_, Vec>(2), + counter: row.get::<_, i64>(3).max(0) as u64, + }, + seq: row.get::<_, i64>(0).max(0) as u64, + })) +} + +pub(crate) fn restore_materialization_checkpoint>( + client: &Rc>, + doc_id: &str, + checkpoint: Option<&MaterializationHead>, +) -> Result<()> { + let mut c = client.borrow_mut(); + for sql in [ + "DELETE FROM treecrdt_oprefs_children WHERE doc_id = $1", + "DELETE FROM treecrdt_payload WHERE doc_id = $1", + "DELETE FROM treecrdt_nodes WHERE doc_id = $1", + ] { + exec_doc_sql(&mut c, doc_id, sql)?; + } + + if let Some(checkpoint) = checkpoint { + let checkpoint_seq = checkpoint.seq as i64; + for sql in [ + "INSERT INTO treecrdt_nodes(doc_id, node, parent, order_key, tombstone, last_change, deleted_at) \ + SELECT doc_id, node, parent, order_key, tombstone, last_change, deleted_at \ + FROM treecrdt_checkpoint_nodes \ + WHERE doc_id = $1 AND checkpoint_seq = $2", + "INSERT INTO treecrdt_payload(doc_id, node, payload, last_lamport, last_replica, last_counter) \ + SELECT doc_id, node, payload, last_lamport, last_replica, last_counter \ + FROM treecrdt_checkpoint_payload \ + WHERE doc_id = $1 AND checkpoint_seq = $2", + "INSERT INTO treecrdt_oprefs_children(doc_id, parent, op_ref, seq) \ + SELECT doc_id, parent, op_ref, seq \ + FROM treecrdt_checkpoint_oprefs_children \ + WHERE doc_id = $1 AND checkpoint_seq = $2", + ] { + exec_doc_checkpoint_sql(&mut c, doc_id, checkpoint_seq, sql)?; + } + } else { + let root_bytes = node_to_bytes(NodeId::ROOT); + let empty: &[u8] = &[]; + c.execute( + "INSERT INTO treecrdt_nodes(doc_id, node, parent, order_key, tombstone) VALUES ($1, $2, NULL, $3, FALSE)", + &[&doc_id, &root_bytes.as_slice(), &empty], + ) + .map_err(storage_debug)?; + } + Ok(()) +} + #[derive(Clone)] pub(crate) struct PgCtx { pub(crate) doc_id: String, @@ -1134,6 +1291,55 @@ impl Storage for PgOpStorage { } Ok(()) } + + fn scan_after( + &self, + after: Option<(Lamport, &[u8], u64)>, + visit: &mut dyn FnMut(Operation) -> Result<()>, + ) -> Result<()> { + let ops = { + let mut c = self.ctx.client.borrow_mut(); + let rows = if let Some((lamport, replica, counter)) = after { + let stmt = self.ctx.stmt( + &mut c, + "SELECT lamport, replica, counter, kind, parent, node, new_parent, order_key, payload, known_state \ + FROM treecrdt_ops \ + WHERE doc_id = $1 \ + AND (lamport > $2 \ + OR (lamport = $2 AND replica > $3) \ + OR (lamport = $2 AND replica = $3 AND counter > $4)) \ + ORDER BY lamport, replica, counter", + )?; + c.query( + &stmt, + &[ + &self.ctx.doc_id, + &(lamport as i64), + &replica, + &(counter as i64), + ], + ) + .map_err(storage_debug)? + } else { + let stmt = self.ctx.stmt( + &mut c, + "SELECT lamport, replica, counter, kind, parent, node, new_parent, order_key, payload, known_state \ + FROM treecrdt_ops \ + WHERE doc_id = $1 \ + ORDER BY lamport, replica, counter", + )?; + c.query(&stmt, &[&self.ctx.doc_id]).map_err(storage_debug)? + }; + // Catch-up replays feed these ops back through the node/payload/index stores, so we + // must drop the Postgres client borrow before invoking the callback. + rows.into_iter().map(row_to_op).collect::>>()? + }; + + for op in ops { + visit(op)?; + } + Ok(()) + } } pub(crate) fn row_to_op(row: Row) -> Result { @@ -1640,7 +1846,7 @@ fn append_ops_in_tx( |inserted| materialize_inserted_ops(ctx.clone(), &meta, inserted), |head| { let started_at = Instant::now(); - let result = update_tree_meta_head(&ctx.client, &ctx.doc_id, Some(head)); + let result = persist_materialized_head(&ctx.client, &ctx.doc_id, Some(head)); update_head_ms += started_at.elapsed().as_secs_f64() * 1000.0; result }, @@ -1711,11 +1917,13 @@ pub(crate) fn ensure_materialized_in_tx(client: &Rc>, doc_id: &s index: PgParentOpIndex::new(ctx.clone()), }, &meta, + |frontier| load_materialization_checkpoint_before(client, doc_id, frontier), + |checkpoint, _, _, _| restore_materialization_checkpoint(client, doc_id, checkpoint), |nodes| nodes.flush_last_change(), |index| index.flush(), )?; - update_tree_meta_head(client, doc_id, head.as_ref())?; + persist_materialized_head(client, doc_id, head.as_ref())?; Ok(()) } diff --git a/packages/treecrdt-postgres-rs/tests/postgres_test.rs b/packages/treecrdt-postgres-rs/tests/postgres_test.rs index bc3990e2..71ea63f9 100644 --- a/packages/treecrdt-postgres-rs/tests/postgres_test.rs +++ b/packages/treecrdt-postgres-rs/tests/postgres_test.rs @@ -126,6 +126,71 @@ fn postgres_backend_append_batch_materializes_only_inserted_ops() { assert_eq!(head_seq, 2); } +#[test] +fn postgres_backend_persists_periodic_materialization_checkpoints() { + let Some(client) = connect() else { + return; + }; + ensure_schema_once(&client); + + let doc_id = format!("test-{}", Uuid::new_v4()); + { + let mut c = client.borrow_mut(); + reset_doc_for_tests(&mut c, &doc_id).unwrap(); + } + + let replica = ReplicaId::new(b"ckpt"); + let first = Operation::insert( + &replica, + 1, + 1, + NodeId::ROOT, + node(1), + order_key_from_position(0), + ); + let inserted = append_ops(&client, &doc_id, std::slice::from_ref(&first)).unwrap(); + assert_eq!(inserted, 1); + + let ops: Vec = (1..64u16) + .map(|i| { + Operation::insert( + &replica, + (i as u64) + 1, + (i as u64) + 1, + NodeId::ROOT, + node((i as u128) + 1), + order_key_from_position(i), + ) + }) + .collect(); + + let inserted = append_ops(&client, &doc_id, &ops).unwrap(); + assert_eq!(inserted, 63); + + let mut c = client.borrow_mut(); + let checkpoint_rows = c + .query( + "SELECT checkpoint_seq FROM treecrdt_checkpoints WHERE doc_id = $1 ORDER BY checkpoint_seq", + &[&doc_id], + ) + .unwrap(); + let checkpoints: Vec = checkpoint_rows + .into_iter() + .map(|row| row.get::<_, i64>(0).max(0) as u64) + .collect(); + assert_eq!(checkpoints, vec![1, 64]); + + let checkpoint_node_count = c + .query_one( + "SELECT COUNT(*) FROM treecrdt_checkpoint_nodes WHERE doc_id = $1 AND checkpoint_seq = 64", + &[&doc_id], + ) + .unwrap() + .get::<_, i64>(0) + .max(0) as u64; + assert_eq!(checkpoint_node_count, 65); +} + #[test] fn postgres_backend_append_with_affected_nodes_matches_representative_remote_batch() { let Some(client) = connect() else { diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/local_ops.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/local_ops.rs index 54b5b0cb..ec1aecc3 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/local_ops.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/local_ops.rs @@ -2,6 +2,7 @@ use super::node_store::SqliteNodeStore; use super::op_index::SqliteParentOpIndex; use super::op_storage::SqliteOpStorage; use super::payload_store::SqlitePayloadStore; +use super::schema::persist_materialized_head; use super::util::{ read_blob, read_blob16, read_optional_blob16, read_required_blob, read_text, sqlite_err_from_core, sqlite_result_json, @@ -225,7 +226,7 @@ fn finish_local_core_op( }, seq: next_seq, }; - if post_materialization_ok && update_tree_meta_head(session.db, Some(&head)).is_err() { + if post_materialization_ok && persist_materialized_head(session.db, Some(&head)).is_err() { post_materialization_ok = false; } if !post_materialization_ok { diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/materialize.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/materialize.rs index c4879674..d10c9aa3 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/materialize.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/materialize.rs @@ -2,7 +2,10 @@ use super::append::JsonAppendOp; use super::node_store::SqliteNodeStore; use super::op_index::SqliteParentOpIndex; use super::payload_store::SqlitePayloadStore; -use super::schema::set_tree_meta_replay_frontier; +use super::schema::{ + load_materialization_checkpoint_before, persist_materialized_head, + restore_materialization_checkpoint, set_tree_meta_replay_frontier, +}; use super::util::sqlite_err_from_core; use super::*; use treecrdt_core::MaterializationCursor; @@ -24,6 +27,10 @@ fn parse_optional_node_id(bytes: &Option>) -> Result, c_i } } +fn sqlite_checkpoint_err(rc: c_int, context: &str) -> treecrdt_core::Error { + treecrdt_core::Error::Storage(format!("{context} (rc={rc})")) +} + fn json_append_op_to_operation(op: &JsonAppendOp) -> Result { use treecrdt_core::{Operation, OperationId, OperationKind, OperationMetadata, ReplicaId}; @@ -193,6 +200,14 @@ fn catch_up_materialized_from_frontier(db: *mut sqlite3) -> Result<(), c_int> { index, }, &meta, + |frontier| { + load_materialization_checkpoint_before(db, frontier) + .map_err(|rc| sqlite_checkpoint_err(rc, "load checkpoint failed")) + }, + |checkpoint, _, _, _| { + restore_materialization_checkpoint(db, checkpoint) + .map_err(|rc| sqlite_checkpoint_err(rc, "restore checkpoint failed")) + }, |_| Ok(()), |_| Ok(()), ) { @@ -203,7 +218,7 @@ fn catch_up_materialized_from_frontier(db: *mut sqlite3) -> Result<(), c_int> { } }; - let head_rc = update_tree_meta_head(db, head.as_ref()); + let head_rc = persist_materialized_head(db, head.as_ref()); if head_rc.is_err() { sqlite_exec(db, rollback.as_ptr(), None, null_mut(), null_mut()); return head_rc; @@ -268,7 +283,7 @@ pub(super) fn append_ops_impl( &meta, inserted_ops, |inserted| materialize_inserted_ops(db, doc_id, &meta, &inserted), - |head| update_tree_meta_head(db, Some(head)), + |head| persist_materialized_head(db, Some(head)), |frontier| set_tree_meta_replay_frontier(db, frontier), )?; diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/op_storage.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/op_storage.rs index 5aa9fc29..79c49344 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/op_storage.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/op_storage.rs @@ -1,3 +1,4 @@ +use super::util::{column_blob_vec, column_nonnegative_i64}; use super::*; fn sqlite_rc_error(rc: c_int, context: &str) -> treecrdt_core::Error { @@ -42,6 +43,205 @@ impl SqliteOpStorage { .as_deref() .ok_or_else(|| treecrdt_core::Error::Storage("doc_id not set".into())) } + + fn scan_ops_stmt( + &self, + stmt: *mut sqlite3_stmt, + step_context: &str, + finalize_context: &str, + visit: &mut dyn FnMut(treecrdt_core::Operation) -> treecrdt_core::Result<()>, + ) -> treecrdt_core::Result<()> { + loop { + let step_rc = unsafe { sqlite_step(stmt) }; + if step_rc == SQLITE_ROW as c_int { + let op = match read_operation_row(stmt) { + Ok(op) => op, + Err(err) => { + unsafe { sqlite_finalize(stmt) }; + return Err(err); + } + }; + if let Err(err) = visit(op) { + unsafe { sqlite_finalize(stmt) }; + return Err(err); + } + } else if step_rc == SQLITE_DONE as c_int { + break; + } else { + unsafe { sqlite_finalize(stmt) }; + return Err(sqlite_rc_error(step_rc, step_context)); + } + } + + let finalize_rc = unsafe { sqlite_finalize(stmt) }; + if finalize_rc != SQLITE_OK as c_int { + return Err(sqlite_rc_error(finalize_rc, finalize_context)); + } + Ok(()) + } + + fn scan_since_rows( + &self, + lamport: Lamport, + visit: &mut dyn FnMut(treecrdt_core::Operation) -> treecrdt_core::Result<()>, + ) -> treecrdt_core::Result<()> { + let sql = CString::new( + "SELECT replica,counter,lamport,kind,parent,node,new_parent,order_key,known_state,payload \ + FROM ops \ + WHERE lamport > ?1 \ + ORDER BY lamport, replica, counter", + ) + .expect("ops since sql"); + let mut stmt: *mut sqlite3_stmt = null_mut(); + let rc = sqlite_prepare_v2(self.db, sql.as_ptr(), -1, &mut stmt, null_mut()); + if rc != SQLITE_OK as c_int { + return Err(sqlite_rc_error(rc, "sqlite_prepare_v2 ops since failed")); + } + let bind_rc = unsafe { sqlite_bind_int64(stmt, 1, lamport as i64) }; + if bind_rc != SQLITE_OK as c_int { + unsafe { sqlite_finalize(stmt) }; + return Err(sqlite_rc_error(bind_rc, "bind ops since failed")); + } + + self.scan_ops_stmt( + stmt, + "ops since step failed", + "finalize ops since failed", + visit, + ) + } + + fn scan_after_rows( + &self, + after: Option<(Lamport, &[u8], u64)>, + visit: &mut dyn FnMut(treecrdt_core::Operation) -> treecrdt_core::Result<()>, + ) -> treecrdt_core::Result<()> { + let sql = match after { + Some(_) => CString::new( + "SELECT replica,counter,lamport,kind,parent,node,new_parent,order_key,known_state,payload \ + FROM ops \ + WHERE lamport > ?1 \ + OR (lamport = ?1 AND replica > ?2) \ + OR (lamport = ?1 AND replica = ?2 AND counter > ?3) \ + ORDER BY lamport, replica, counter", + ) + .expect("ops after sql"), + None => CString::new( + "SELECT replica,counter,lamport,kind,parent,node,new_parent,order_key,known_state,payload \ + FROM ops \ + ORDER BY lamport, replica, counter", + ) + .expect("ops scan all sql"), + }; + let mut stmt: *mut sqlite3_stmt = null_mut(); + let rc = sqlite_prepare_v2(self.db, sql.as_ptr(), -1, &mut stmt, null_mut()); + if rc != SQLITE_OK as c_int { + return Err(sqlite_rc_error(rc, "sqlite_prepare_v2 ops after failed")); + } + if let Some((lamport, replica, counter)) = after { + let mut bind_err = false; + unsafe { + bind_err |= sqlite_bind_int64(stmt, 1, lamport as i64) != SQLITE_OK as c_int; + bind_err |= sqlite_bind_blob( + stmt, + 2, + replica.as_ptr() as *const c_void, + replica.len() as c_int, + None, + ) != SQLITE_OK as c_int; + bind_err |= sqlite_bind_int64(stmt, 3, counter as i64) != SQLITE_OK as c_int; + } + if bind_err { + unsafe { sqlite_finalize(stmt) }; + return Err(sqlite_rc_error( + SQLITE_ERROR as c_int, + "bind ops after failed", + )); + } + } + + self.scan_ops_stmt( + stmt, + "ops after step failed", + "finalize ops after failed", + visit, + ) + } +} + +fn read_operation_row(stmt: *mut sqlite3_stmt) -> treecrdt_core::Result { + let replica = column_blob_vec(stmt, 0).unwrap_or_default(); + let counter = column_nonnegative_i64(stmt, 1) as u64; + let lamport_val = column_nonnegative_i64(stmt, 2) as Lamport; + + let kind_ptr = unsafe { sqlite_column_text(stmt, 3) } as *const u8; + let kind_len = unsafe { sqlite_column_bytes(stmt, 3) } as usize; + let kind = if kind_ptr.is_null() { + "" + } else { + std::str::from_utf8(unsafe { slice::from_raw_parts(kind_ptr, kind_len) }).unwrap_or("") + }; + + let parent = unsafe { column_blob16(stmt, 4) } + .map_err(|rc| sqlite_rc_error(rc, "read parent failed"))?; + let node = unsafe { column_blob16(stmt, 5) } + .map_err(|rc| sqlite_rc_error(rc, "read node failed"))? + .ok_or_else(|| sqlite_rc_error(SQLITE_ERROR as c_int, "node missing"))?; + let new_parent = unsafe { column_blob16(stmt, 6) } + .map_err(|rc| sqlite_rc_error(rc, "read new_parent failed"))?; + let order_key = column_blob_vec(stmt, 7).unwrap_or_default(); + let known_state = column_blob_vec(stmt, 8) + .filter(|bytes| !bytes.is_empty()) + .map(|bytes| vv_from_bytes(&bytes)) + .transpose()?; + let payload = column_blob_vec(stmt, 9); + + let op_kind = match kind { + "insert" => { + let parent = parent + .ok_or_else(|| sqlite_rc_error(SQLITE_ERROR as c_int, "insert missing parent"))?; + treecrdt_core::OperationKind::Insert { + parent: sqlite_bytes_to_node_id(parent), + node: sqlite_bytes_to_node_id(node), + order_key, + payload, + } + } + "move" => { + let new_parent = new_parent + .ok_or_else(|| sqlite_rc_error(SQLITE_ERROR as c_int, "move missing new_parent"))?; + treecrdt_core::OperationKind::Move { + node: sqlite_bytes_to_node_id(node), + new_parent: sqlite_bytes_to_node_id(new_parent), + order_key, + } + } + "delete" => treecrdt_core::OperationKind::Delete { + node: sqlite_bytes_to_node_id(node), + }, + "tombstone" => treecrdt_core::OperationKind::Tombstone { + node: sqlite_bytes_to_node_id(node), + }, + "payload" => treecrdt_core::OperationKind::Payload { + node: sqlite_bytes_to_node_id(node), + payload, + }, + _ => { + return Err(sqlite_rc_error(SQLITE_ERROR as c_int, "unknown op kind")); + } + }; + + Ok(treecrdt_core::Operation { + meta: treecrdt_core::OperationMetadata { + id: treecrdt_core::OperationId { + replica: treecrdt_core::ReplicaId(replica), + counter, + }, + lamport: lamport_val, + known_state, + }, + kind: op_kind, + }) } impl treecrdt_core::Storage for SqliteOpStorage { @@ -234,151 +434,11 @@ impl treecrdt_core::Storage for SqliteOpStorage { } fn load_since(&self, lamport: Lamport) -> treecrdt_core::Result> { - let sql = CString::new( - "SELECT replica,counter,lamport,kind,parent,node,new_parent,order_key,known_state,payload \ - FROM ops \ - WHERE lamport > ?1 \ - ORDER BY lamport, replica, counter", - ) - .expect("ops since sql"); - let mut stmt: *mut sqlite3_stmt = null_mut(); - let rc = sqlite_prepare_v2(self.db, sql.as_ptr(), -1, &mut stmt, null_mut()); - if rc != SQLITE_OK as c_int { - return Err(sqlite_rc_error(rc, "sqlite_prepare_v2 ops since failed")); - } - let bind_rc = unsafe { sqlite_bind_int64(stmt, 1, lamport as i64) }; - if bind_rc != SQLITE_OK as c_int { - unsafe { sqlite_finalize(stmt) }; - return Err(sqlite_rc_error(bind_rc, "bind ops since failed")); - } - let mut out: Vec = Vec::new(); - loop { - let step_rc = unsafe { sqlite_step(stmt) }; - if step_rc == SQLITE_ROW as c_int { - let replica_ptr = unsafe { sqlite_column_blob(stmt, 0) } as *const u8; - let replica_len = unsafe { sqlite_column_bytes(stmt, 0) } as usize; - if replica_ptr.is_null() { - continue; - } - let replica = unsafe { slice::from_raw_parts(replica_ptr, replica_len) }.to_vec(); - let counter = unsafe { sqlite_column_int64(stmt, 1).max(0) as u64 }; - let lamport_val = unsafe { sqlite_column_int64(stmt, 2).max(0) as Lamport }; - - let kind_ptr = unsafe { sqlite_column_text(stmt, 3) } as *const u8; - let kind_len = unsafe { sqlite_column_bytes(stmt, 3) } as usize; - let kind = if kind_ptr.is_null() { - "" - } else { - std::str::from_utf8(unsafe { slice::from_raw_parts(kind_ptr, kind_len) }) - .unwrap_or("") - }; - - let parent = unsafe { column_blob16(stmt, 4) } - .map_err(|rc| sqlite_rc_error(rc, "read parent failed"))?; - let node = unsafe { column_blob16(stmt, 5) } - .map_err(|rc| sqlite_rc_error(rc, "read node failed"))? - .ok_or_else(|| sqlite_rc_error(SQLITE_ERROR as c_int, "node missing"))?; - let new_parent = unsafe { column_blob16(stmt, 6) } - .map_err(|rc| sqlite_rc_error(rc, "read new_parent failed"))?; - let order_key = if unsafe { sqlite_column_type(stmt, 7) } == SQLITE_NULL as c_int { - Vec::new() - } else { - let ptr = unsafe { sqlite_column_blob(stmt, 7) } as *const u8; - let len = unsafe { sqlite_column_bytes(stmt, 7) } as usize; - if ptr.is_null() { - Vec::new() - } else { - unsafe { slice::from_raw_parts(ptr, len) }.to_vec() - } - }; - - let known_state = if unsafe { sqlite_column_type(stmt, 8) } == SQLITE_NULL as c_int - { - None - } else { - let ptr = unsafe { sqlite_column_blob(stmt, 8) } as *const u8; - let len = unsafe { sqlite_column_bytes(stmt, 8) } as usize; - if ptr.is_null() || len == 0 { - None - } else { - Some(vv_from_bytes(unsafe { slice::from_raw_parts(ptr, len) })?) - } - }; - - let payload = if unsafe { sqlite_column_type(stmt, 9) } == SQLITE_NULL as c_int { - None - } else { - let ptr = unsafe { sqlite_column_blob(stmt, 9) } as *const u8; - let len = unsafe { sqlite_column_bytes(stmt, 9) } as usize; - if ptr.is_null() { - None - } else { - Some(unsafe { slice::from_raw_parts(ptr, len) }.to_vec()) - } - }; - - let op_kind = match kind { - "insert" => { - let parent = parent.ok_or_else(|| { - sqlite_rc_error(SQLITE_ERROR as c_int, "insert missing parent") - })?; - treecrdt_core::OperationKind::Insert { - parent: sqlite_bytes_to_node_id(parent), - node: sqlite_bytes_to_node_id(node), - order_key, - payload, - } - } - "move" => { - let new_parent = new_parent.ok_or_else(|| { - sqlite_rc_error(SQLITE_ERROR as c_int, "move missing new_parent") - })?; - treecrdt_core::OperationKind::Move { - node: sqlite_bytes_to_node_id(node), - new_parent: sqlite_bytes_to_node_id(new_parent), - order_key, - } - } - "delete" => treecrdt_core::OperationKind::Delete { - node: sqlite_bytes_to_node_id(node), - }, - "tombstone" => treecrdt_core::OperationKind::Tombstone { - node: sqlite_bytes_to_node_id(node), - }, - "payload" => treecrdt_core::OperationKind::Payload { - node: sqlite_bytes_to_node_id(node), - payload, - }, - _ => { - unsafe { sqlite_finalize(stmt) }; - return Err(sqlite_rc_error(SQLITE_ERROR as c_int, "unknown op kind")); - } - }; - - out.push(treecrdt_core::Operation { - meta: treecrdt_core::OperationMetadata { - id: treecrdt_core::OperationId { - replica: treecrdt_core::ReplicaId(replica), - counter, - }, - lamport: lamport_val, - known_state, - }, - kind: op_kind, - }); - } else if step_rc == SQLITE_DONE as c_int { - break; - } else { - unsafe { sqlite_finalize(stmt) }; - return Err(sqlite_rc_error(step_rc, "ops since step failed")); - } - } - - let finalize_rc = unsafe { sqlite_finalize(stmt) }; - if finalize_rc != SQLITE_OK as c_int { - return Err(sqlite_rc_error(finalize_rc, "finalize ops since failed")); - } + self.scan_since_rows(lamport, &mut |op| { + out.push(op); + Ok(()) + })?; Ok(out) } @@ -387,156 +447,15 @@ impl treecrdt_core::Storage for SqliteOpStorage { lamport: Lamport, visit: &mut dyn FnMut(treecrdt_core::Operation) -> treecrdt_core::Result<()>, ) -> treecrdt_core::Result<()> { - let sql = CString::new( - "SELECT replica,counter,lamport,kind,parent,node,new_parent,order_key,known_state,payload \ - FROM ops \ - WHERE lamport > ?1 \ - ORDER BY lamport, replica, counter", - ) - .expect("ops since sql"); - let mut stmt: *mut sqlite3_stmt = null_mut(); - let rc = sqlite_prepare_v2(self.db, sql.as_ptr(), -1, &mut stmt, null_mut()); - if rc != SQLITE_OK as c_int { - return Err(sqlite_rc_error(rc, "sqlite_prepare_v2 ops since failed")); - } - let bind_rc = unsafe { sqlite_bind_int64(stmt, 1, lamport as i64) }; - if bind_rc != SQLITE_OK as c_int { - unsafe { sqlite_finalize(stmt) }; - return Err(sqlite_rc_error(bind_rc, "bind ops since failed")); - } - - loop { - let step_rc = unsafe { sqlite_step(stmt) }; - if step_rc == SQLITE_ROW as c_int { - let replica_ptr = unsafe { sqlite_column_blob(stmt, 0) } as *const u8; - let replica_len = unsafe { sqlite_column_bytes(stmt, 0) } as usize; - if replica_ptr.is_null() { - continue; - } - let replica = unsafe { slice::from_raw_parts(replica_ptr, replica_len) }.to_vec(); - let counter = unsafe { sqlite_column_int64(stmt, 1).max(0) as u64 }; - let lamport_val = unsafe { sqlite_column_int64(stmt, 2).max(0) as Lamport }; - - let kind_ptr = unsafe { sqlite_column_text(stmt, 3) } as *const u8; - let kind_len = unsafe { sqlite_column_bytes(stmt, 3) } as usize; - let kind = if kind_ptr.is_null() { - "" - } else { - std::str::from_utf8(unsafe { slice::from_raw_parts(kind_ptr, kind_len) }) - .unwrap_or("") - }; - - let parent = unsafe { column_blob16(stmt, 4) } - .map_err(|rc| sqlite_rc_error(rc, "read parent failed"))?; - let node = unsafe { column_blob16(stmt, 5) } - .map_err(|rc| sqlite_rc_error(rc, "read node failed"))? - .ok_or_else(|| sqlite_rc_error(SQLITE_ERROR as c_int, "node missing"))?; - let new_parent = unsafe { column_blob16(stmt, 6) } - .map_err(|rc| sqlite_rc_error(rc, "read new_parent failed"))?; - let order_key = if unsafe { sqlite_column_type(stmt, 7) } == SQLITE_NULL as c_int { - Vec::new() - } else { - let ptr = unsafe { sqlite_column_blob(stmt, 7) } as *const u8; - let len = unsafe { sqlite_column_bytes(stmt, 7) } as usize; - if ptr.is_null() { - Vec::new() - } else { - unsafe { slice::from_raw_parts(ptr, len) }.to_vec() - } - }; - - let known_state = if unsafe { sqlite_column_type(stmt, 8) } == SQLITE_NULL as c_int - { - None - } else { - let ptr = unsafe { sqlite_column_blob(stmt, 8) } as *const u8; - let len = unsafe { sqlite_column_bytes(stmt, 8) } as usize; - if ptr.is_null() || len == 0 { - None - } else { - Some(vv_from_bytes(unsafe { slice::from_raw_parts(ptr, len) })?) - } - }; - - let payload = if unsafe { sqlite_column_type(stmt, 9) } == SQLITE_NULL as c_int { - None - } else { - let ptr = unsafe { sqlite_column_blob(stmt, 9) } as *const u8; - let len = unsafe { sqlite_column_bytes(stmt, 9) } as usize; - if ptr.is_null() { - None - } else { - Some(unsafe { slice::from_raw_parts(ptr, len) }.to_vec()) - } - }; - - let op_kind = match kind { - "insert" => { - let parent = parent.ok_or_else(|| { - sqlite_rc_error(SQLITE_ERROR as c_int, "insert missing parent") - })?; - treecrdt_core::OperationKind::Insert { - parent: sqlite_bytes_to_node_id(parent), - node: sqlite_bytes_to_node_id(node), - order_key, - payload, - } - } - "move" => { - let new_parent = new_parent.ok_or_else(|| { - sqlite_rc_error(SQLITE_ERROR as c_int, "move missing new_parent") - })?; - treecrdt_core::OperationKind::Move { - node: sqlite_bytes_to_node_id(node), - new_parent: sqlite_bytes_to_node_id(new_parent), - order_key, - } - } - "delete" => treecrdt_core::OperationKind::Delete { - node: sqlite_bytes_to_node_id(node), - }, - "tombstone" => treecrdt_core::OperationKind::Tombstone { - node: sqlite_bytes_to_node_id(node), - }, - "payload" => treecrdt_core::OperationKind::Payload { - node: sqlite_bytes_to_node_id(node), - payload, - }, - _ => { - unsafe { sqlite_finalize(stmt) }; - return Err(sqlite_rc_error(SQLITE_ERROR as c_int, "unknown op kind")); - } - }; - - let op = treecrdt_core::Operation { - meta: treecrdt_core::OperationMetadata { - id: treecrdt_core::OperationId { - replica: treecrdt_core::ReplicaId(replica), - counter, - }, - lamport: lamport_val, - known_state, - }, - kind: op_kind, - }; - - if let Err(err) = visit(op) { - unsafe { sqlite_finalize(stmt) }; - return Err(err); - } - } else if step_rc == SQLITE_DONE as c_int { - break; - } else { - unsafe { sqlite_finalize(stmt) }; - return Err(sqlite_rc_error(step_rc, "ops since step failed")); - } - } + self.scan_since_rows(lamport, visit) + } - let finalize_rc = unsafe { sqlite_finalize(stmt) }; - if finalize_rc != SQLITE_OK as c_int { - return Err(sqlite_rc_error(finalize_rc, "finalize ops since failed")); - } - Ok(()) + fn scan_after( + &self, + after: Option<(Lamport, &[u8], u64)>, + visit: &mut dyn FnMut(treecrdt_core::Operation) -> treecrdt_core::Result<()>, + ) -> treecrdt_core::Result<()> { + self.scan_after_rows(after, visit) } fn latest_lamport(&self) -> Lamport { @@ -549,7 +468,7 @@ impl treecrdt_core::Storage for SqliteOpStorage { } let step_rc = unsafe { sqlite_step(stmt) }; let val = if step_rc == SQLITE_ROW as c_int { - unsafe { sqlite_column_int64(stmt, 0).max(0) as Lamport } + column_nonnegative_i64(stmt, 0) as Lamport } else { 0 }; @@ -585,7 +504,7 @@ impl treecrdt_core::Storage for SqliteOpStorage { unsafe { sqlite_finalize(stmt) }; return Err(sqlite_rc_error(step_rc, "max counter step failed")); } - let val = unsafe { sqlite_column_int64(stmt, 0).max(0) as u64 }; + let val = column_nonnegative_i64(stmt, 0) as u64; let finalize_rc = unsafe { sqlite_finalize(stmt) }; if finalize_rc != SQLITE_OK as c_int { diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/oprefs.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/oprefs.rs index 05359559..3c123e76 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/oprefs.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/oprefs.rs @@ -1,4 +1,4 @@ -use super::util::sqlite_result_json; +use super::util::{column_blob_vec, sqlite_result_json}; use super::*; pub(super) unsafe extern "C" fn treecrdt_oprefs_all( @@ -30,9 +30,16 @@ pub(super) unsafe extern "C" fn treecrdt_oprefs_all( loop { let step_rc = unsafe { sqlite_step(stmt) }; if step_rc == SQLITE_ROW as c_int { - let ptr = unsafe { sqlite_column_blob(stmt, 0) } as *const u8; - let len = unsafe { sqlite_column_bytes(stmt, 0) } as usize; - if ptr.is_null() || len != OPREF_V0_WIDTH { + let Some(op_ref) = column_blob_vec(stmt, 0) else { + unsafe { sqlite_finalize(stmt) }; + sqlite_result_error( + ctx, + b"treecrdt_oprefs_all: invalid op_ref (call treecrdt_set_doc_id)\0".as_ptr() + as *const c_char, + ); + return; + }; + if op_ref.len() != OPREF_V0_WIDTH { unsafe { sqlite_finalize(stmt) }; sqlite_result_error( ctx, @@ -41,7 +48,7 @@ pub(super) unsafe extern "C" fn treecrdt_oprefs_all( ); return; } - refs.push(unsafe { slice::from_raw_parts(ptr, len) }.to_vec()); + refs.push(op_ref); } else if step_rc == SQLITE_DONE as c_int { break; } else { @@ -120,14 +127,17 @@ pub(super) unsafe extern "C" fn treecrdt_oprefs_children( loop { let step_rc = unsafe { sqlite_step(stmt) }; if step_rc == SQLITE_ROW as c_int { - let ptr = unsafe { sqlite_column_blob(stmt, 0) } as *const u8; - let len = unsafe { sqlite_column_bytes(stmt, 0) } as usize; - if ptr.is_null() || len != OPREF_V0_WIDTH { + let Some(op_ref) = column_blob_vec(stmt, 0) else { + unsafe { sqlite_finalize(stmt) }; + sqlite_result_error_code(ctx, SQLITE_ERROR as c_int); + return; + }; + if op_ref.len() != OPREF_V0_WIDTH { unsafe { sqlite_finalize(stmt) }; sqlite_result_error_code(ctx, SQLITE_ERROR as c_int); return; } - refs.push(unsafe { slice::from_raw_parts(ptr, len) }.to_vec()); + refs.push(op_ref); } else if step_rc == SQLITE_DONE as c_int { break; } else { diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/ops.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/ops.rs index 81a1a434..4dc4832e 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/ops.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/ops.rs @@ -1,4 +1,4 @@ -use super::util::sqlite_result_json; +use super::util::{column_blob_vec, column_nonnegative_i64, sqlite_result_json}; use super::*; pub(super) unsafe extern "C" fn treecrdt_ops_by_oprefs( @@ -219,15 +219,12 @@ pub(super) unsafe extern "C" fn treecrdt_ops_since( fn read_row(stmt: *mut sqlite3_stmt) -> Result { unsafe { - let replica_ptr = sqlite_column_blob(stmt, 0); - let replica_len = sqlite_column_bytes(stmt, 0); - let counter = sqlite_column_int64(stmt, 1) as u64; - let lamport = sqlite_column_int64(stmt, 2) as Lamport; + let counter = column_nonnegative_i64(stmt, 1) as u64; + let lamport = column_nonnegative_i64(stmt, 2) as Lamport; let kind_ptr = sqlite_column_text(stmt, 3); let kind_len = sqlite_column_bytes(stmt, 3); - let replica = - std::slice::from_raw_parts(replica_ptr as *const u8, replica_len as usize).to_vec(); + let replica = column_blob_vec(stmt, 0).unwrap_or_default(); let kind = std::str::from_utf8(std::slice::from_raw_parts( kind_ptr as *const u8, kind_len as usize, diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/payload_store.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/payload_store.rs index bdd0d111..bf1bb665 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/payload_store.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/payload_store.rs @@ -1,3 +1,4 @@ +use super::util::{column_blob_vec, column_nonnegative_i64}; use super::*; fn sqlite_node_id_bytes(node: NodeId) -> [u8; 16] { @@ -134,15 +135,9 @@ impl treecrdt_core::PayloadStore for SqlitePayloadStore { let step_rc = sqlite_step(self.select); let writer = if step_rc == SQLITE_ROW as c_int { - let lamport = sqlite_column_int64(self.select, 1).max(0) as Lamport; - let rep_ptr = sqlite_column_blob(self.select, 2) as *const u8; - let rep_len = sqlite_column_bytes(self.select, 2) as usize; - let replica = if rep_ptr.is_null() || rep_len == 0 { - Vec::new() - } else { - slice::from_raw_parts(rep_ptr, rep_len).to_vec() - }; - let counter = sqlite_column_int64(self.select, 3).max(0) as u64; + let lamport = column_nonnegative_i64(self.select, 1) as Lamport; + let replica = column_blob_vec(self.select, 2).unwrap_or_default(); + let counter = column_nonnegative_i64(self.select, 3) as u64; Some(( lamport, treecrdt_core::OperationId { diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/schema.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/schema.rs index d7ea904f..ac54ca95 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/schema.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/schema.rs @@ -1,4 +1,5 @@ use super::sqlite_api::*; +use super::util::{column_blob_vec, column_nonnegative_i64}; use std::ffi::CString; use std::os::raw::{c_int, c_void}; @@ -6,194 +7,247 @@ use std::ptr::null_mut; use std::slice; use treecrdt_core::{ - Lamport, MaterializationCursor, MaterializationHead, MaterializationKey, MaterializationState, + should_checkpoint_materialization, Lamport, MaterializationCursor, MaterializationHead, + MaterializationKey, MaterializationState, }; pub(super) const ROOT_NODE_ID: [u8; 16] = [0u8; 16]; -#[derive(Clone, Debug)] -pub(super) struct TreeMeta(MaterializationState); - -impl MaterializationCursor for TreeMeta { - fn state(&self) -> MaterializationState<&[u8]> { - self.0.as_borrowed() +fn exec_sql_text(db: *mut sqlite3, sql: &str) -> Result<(), c_int> { + let sql = CString::new(sql).expect("sqlite exec sql"); + let rc = sqlite_exec(db, sql.as_ptr(), None, null_mut(), null_mut()); + if rc != SQLITE_OK as c_int { + return Err(rc); } + Ok(()) } -pub(super) fn load_doc_id(db: *mut sqlite3) -> Result>, c_int> { - let sql = - CString::new("SELECT value FROM meta WHERE key = 'doc_id' LIMIT 1").expect("doc id sql"); +fn with_stmt( + db: *mut sqlite3, + sql: &str, + run: impl FnOnce(*mut sqlite3_stmt) -> Result, +) -> Result { + let sql = CString::new(sql).expect("sqlite prepared sql"); let mut stmt: *mut sqlite3_stmt = null_mut(); let rc = sqlite_prepare_v2(db, sql.as_ptr(), -1, &mut stmt, null_mut()); if rc != SQLITE_OK as c_int { return Err(rc); } - let step_rc = unsafe { sqlite_step(stmt) }; - if step_rc == SQLITE_ROW as c_int { - let ptr = unsafe { sqlite_column_text(stmt, 0) } as *const u8; - let len = unsafe { sqlite_column_bytes(stmt, 0) } as usize; - let value = if ptr.is_null() || len == 0 { - Vec::new() - } else { - unsafe { slice::from_raw_parts(ptr, len) }.to_vec() - }; - let finalize_rc = unsafe { sqlite_finalize(stmt) }; - if finalize_rc != SQLITE_OK as c_int { - return Err(finalize_rc); - } - Ok(Some(value)) - } else if step_rc == SQLITE_DONE as c_int { - let finalize_rc = unsafe { sqlite_finalize(stmt) }; - if finalize_rc != SQLITE_OK as c_int { - return Err(finalize_rc); + let result = run(stmt); + let finalize_rc = unsafe { sqlite_finalize(stmt) }; + match result { + Ok(value) => { + if finalize_rc != SQLITE_OK as c_int { + return Err(finalize_rc); + } + Ok(value) } - Ok(None) - } else { - unsafe { sqlite_finalize(stmt) }; - Err(step_rc) + Err(err) => Err(err), } } -pub(super) fn load_tree_meta(db: *mut sqlite3) -> Result { - let sql = CString::new( - "SELECT head_lamport, head_replica, head_counter, head_seq, \ - replay_lamport, replay_replica, replay_counter \ - FROM tree_meta WHERE id = 1 LIMIT 1", - ) - .expect("tree meta sql"); - let mut stmt: *mut sqlite3_stmt = null_mut(); - let rc = sqlite_prepare_v2(db, sql.as_ptr(), -1, &mut stmt, null_mut()); +fn bind_i64_param(stmt: *mut sqlite3_stmt, idx: c_int, value: i64) -> Result<(), c_int> { + let rc = unsafe { sqlite_bind_int64(stmt, idx, value) }; + if rc != SQLITE_OK as c_int { + return Err(rc); + } + Ok(()) +} + +fn bind_blob_param(stmt: *mut sqlite3_stmt, idx: c_int, value: &[u8]) -> Result<(), c_int> { + let rc = unsafe { + sqlite_bind_blob( + stmt, + idx, + value.as_ptr() as *const c_void, + value.len() as c_int, + None, + ) + }; if rc != SQLITE_OK as c_int { return Err(rc); } + Ok(()) +} - let step_rc = unsafe { sqlite_step(stmt) }; - if step_rc != SQLITE_ROW as c_int { - unsafe { sqlite_finalize(stmt) }; - return Err(SQLITE_ERROR as c_int); +fn step_expect_done(stmt: *mut sqlite3_stmt) -> Result<(), c_int> { + let rc = unsafe { sqlite_step(stmt) }; + if rc != SQLITE_DONE as c_int { + return Err(rc); } + Ok(()) +} - let head_lamport = unsafe { sqlite_column_int64(stmt, 0) } as Lamport; - let rep_ptr = unsafe { sqlite_column_blob(stmt, 1) } as *const u8; - let rep_len = unsafe { sqlite_column_bytes(stmt, 1) } as usize; - let head_replica = if rep_ptr.is_null() || rep_len == 0 { - Vec::new() - } else { - unsafe { slice::from_raw_parts(rep_ptr, rep_len) }.to_vec() - }; - let head_counter = unsafe { sqlite_column_int64(stmt, 2) } as u64; - let head_seq = unsafe { sqlite_column_int64(stmt, 3) } as u64; - let replay_lamport = if unsafe { sqlite_column_type(stmt, 4) } == SQLITE_NULL as c_int { - None - } else { - Some(unsafe { sqlite_column_int64(stmt, 4).max(0) as Lamport }) - }; - let replay_replica = if unsafe { sqlite_column_type(stmt, 5) } == SQLITE_NULL as c_int { - None - } else { - let ptr = unsafe { sqlite_column_blob(stmt, 5) } as *const u8; - let len = unsafe { sqlite_column_bytes(stmt, 5) } as usize; - Some(if ptr.is_null() || len == 0 { - Vec::new() - } else { - unsafe { slice::from_raw_parts(ptr, len) }.to_vec() - }) - }; - let replay_counter = if unsafe { sqlite_column_type(stmt, 6) } == SQLITE_NULL as c_int { +fn exec_stmt_done( + db: *mut sqlite3, + sql: &str, + bind: impl FnOnce(*mut sqlite3_stmt) -> Result<(), c_int>, +) -> Result<(), c_int> { + with_stmt(db, sql, |stmt| { + bind(stmt)?; + step_expect_done(stmt) + }) +} + +fn exec_stmt_i64(db: *mut sqlite3, sql: &str, value: i64) -> Result<(), c_int> { + exec_stmt_done(db, sql, |stmt| bind_i64_param(stmt, 1, value)) +} + +fn column_optional_nonnegative_i64(stmt: *mut sqlite3_stmt, idx: c_int) -> Option { + if unsafe { sqlite_column_type(stmt, idx) } == SQLITE_NULL as c_int { None } else { - Some(unsafe { sqlite_column_int64(stmt, 6).max(0) as u64 }) - }; + Some(column_nonnegative_i64(stmt, idx)) + } +} - let finalize_rc = unsafe { sqlite_finalize(stmt) }; - if finalize_rc != SQLITE_OK as c_int { - return Err(finalize_rc); +fn column_materialization_key(stmt: *mut sqlite3_stmt, idx: c_int) -> MaterializationKey { + MaterializationKey { + lamport: column_nonnegative_i64(stmt, idx) as Lamport, + replica: column_blob_vec(stmt, idx + 1).unwrap_or_default(), + counter: column_nonnegative_i64(stmt, idx + 2) as u64, } +} - let head = if head_seq == 0 && head_lamport == 0 && head_replica.is_empty() && head_counter == 0 - { - None - } else { - Some(MaterializationHead { - at: MaterializationKey { - lamport: head_lamport, - replica: head_replica, - counter: head_counter, - }, - seq: head_seq, - }) - }; - let replay_from = match (replay_lamport, replay_replica, replay_counter) { +fn column_optional_materialization_key( + stmt: *mut sqlite3_stmt, + idx: c_int, +) -> Option { + match ( + column_optional_nonnegative_i64(stmt, idx), + column_blob_vec(stmt, idx + 1), + column_optional_nonnegative_i64(stmt, idx + 2), + ) { (Some(lamport), Some(replica), Some(counter)) => Some(MaterializationKey { - lamport, + lamport: lamport as Lamport, replica, - counter, + counter: counter as u64, }), _ => None, - }; + } +} + +fn bind_materialization_key>( + stmt: *mut sqlite3_stmt, + idx: c_int, + key: &MaterializationKey, +) -> Result<(), c_int> { + bind_i64_param(stmt, idx, key.lamport as i64)?; + bind_blob_param(stmt, idx + 1, key.replica.as_ref())?; + bind_i64_param(stmt, idx + 2, key.counter as i64) +} + +fn bind_materialization_head>( + stmt: *mut sqlite3_stmt, + idx: c_int, + head: &MaterializationHead, +) -> Result<(), c_int> { + bind_materialization_key(stmt, idx, &head.at)?; + bind_i64_param(stmt, idx + 3, head.seq as i64) +} + +fn bind_optional_materialization_head>( + stmt: *mut sqlite3_stmt, + idx: c_int, + head: Option<&MaterializationHead>, +) -> Result<(), c_int> { + match head { + Some(head) => bind_materialization_head(stmt, idx, head), + None => { + bind_i64_param(stmt, idx, 0)?; + bind_blob_param(stmt, idx + 1, &[])?; + bind_i64_param(stmt, idx + 2, 0)?; + bind_i64_param(stmt, idx + 3, 0) + } + } +} - Ok(TreeMeta(MaterializationState { head, replay_from })) +#[derive(Clone, Debug)] +pub(super) struct TreeMeta(MaterializationState); + +impl MaterializationCursor for TreeMeta { + fn state(&self) -> MaterializationState<&[u8]> { + self.0.as_borrowed() + } +} + +pub(super) fn load_doc_id(db: *mut sqlite3) -> Result>, c_int> { + with_stmt( + db, + "SELECT value FROM meta WHERE key = 'doc_id' LIMIT 1", + |stmt| { + let step_rc = unsafe { sqlite_step(stmt) }; + if step_rc == SQLITE_ROW as c_int { + let ptr = unsafe { sqlite_column_text(stmt, 0) } as *const u8; + let len = unsafe { sqlite_column_bytes(stmt, 0) } as usize; + let value = if ptr.is_null() || len == 0 { + Vec::new() + } else { + unsafe { slice::from_raw_parts(ptr, len) }.to_vec() + }; + Ok(Some(value)) + } else if step_rc == SQLITE_DONE as c_int { + Ok(None) + } else { + Err(step_rc) + } + }, + ) +} + +pub(super) fn load_tree_meta(db: *mut sqlite3) -> Result { + with_stmt( + db, + "SELECT head_lamport, head_replica, head_counter, head_seq, \ + replay_lamport, replay_replica, replay_counter \ + FROM tree_meta WHERE id = 1 LIMIT 1", + |stmt| { + let step_rc = unsafe { sqlite_step(stmt) }; + if step_rc != SQLITE_ROW as c_int { + return Err(SQLITE_ERROR as c_int); + } + + let head_seq = column_nonnegative_i64(stmt, 3) as u64; + let head_at = column_materialization_key(stmt, 0); + let head = if head_seq == 0 + && head_at.lamport == 0 + && head_at.replica.is_empty() + && head_at.counter == 0 + { + None + } else { + Some(MaterializationHead { + at: head_at, + seq: head_seq, + }) + }; + let replay_from = column_optional_materialization_key(stmt, 4); + + Ok(TreeMeta(MaterializationState { head, replay_from })) + }, + ) } pub(super) fn set_tree_meta_replay_frontier( db: *mut sqlite3, frontier: &treecrdt_core::MaterializationFrontier, ) -> Result<(), c_int> { - let sql = CString::new( + exec_stmt_done( + db, "UPDATE tree_meta \ SET replay_lamport = ?1, replay_replica = ?2, replay_counter = ?3 \ WHERE id = 1", + |stmt| bind_materialization_key(stmt, 1, frontier), ) - .expect("tree meta replay sql"); - let mut stmt: *mut sqlite3_stmt = null_mut(); - let rc = sqlite_prepare_v2(db, sql.as_ptr(), -1, &mut stmt, null_mut()); - if rc != SQLITE_OK as c_int { - return Err(rc); - } - - let mut bind_err = false; - unsafe { - bind_err |= sqlite_bind_int64(stmt, 1, frontier.lamport as i64) != SQLITE_OK as c_int; - bind_err |= sqlite_bind_blob( - stmt, - 2, - frontier.replica.as_ptr() as *const c_void, - frontier.replica.len() as c_int, - None, - ) != SQLITE_OK as c_int; - bind_err |= sqlite_bind_int64(stmt, 3, frontier.counter as i64) != SQLITE_OK as c_int; - } - if bind_err { - unsafe { sqlite_finalize(stmt) }; - return Err(SQLITE_ERROR as c_int); - } - - let step_rc = unsafe { sqlite_step(stmt) }; - let finalize_rc = unsafe { sqlite_finalize(stmt) }; - if step_rc != SQLITE_DONE as c_int { - return Err(step_rc); - } - if finalize_rc != SQLITE_OK as c_int { - return Err(finalize_rc); - } - Ok(()) } pub(super) fn update_tree_meta_head>( db: *mut sqlite3, head: Option<&MaterializationHead>, ) -> Result<(), c_int> { - let (lamport, replica, counter, seq): (Lamport, &[u8], u64, u64) = match head { - Some(head) => ( - head.at.lamport, - head.at.replica.as_ref(), - head.at.counter, - head.seq, - ), - None => (0, &[], 0, 0), - }; - let sql = CString::new( + exec_stmt_done( + db, "UPDATE tree_meta \ SET head_lamport = ?1, \ head_replica = ?2, \ @@ -203,40 +257,133 @@ pub(super) fn update_tree_meta_head>( replay_replica = NULL, \ replay_counter = NULL \ WHERE id = 1", + |stmt| bind_optional_materialization_head(stmt, 1, head), ) - .expect("tree meta head sql"); - let mut stmt: *mut sqlite3_stmt = null_mut(); - let rc = sqlite_prepare_v2(db, sql.as_ptr(), -1, &mut stmt, null_mut()); - if rc != SQLITE_OK as c_int { - return Err(rc); +} + +pub(super) fn persist_materialized_head>( + db: *mut sqlite3, + head: Option<&MaterializationHead>, +) -> Result<(), c_int> { + update_tree_meta_head(db, head)?; + maybe_save_materialization_checkpoint(db, head) +} + +pub(super) fn maybe_save_materialization_checkpoint>( + db: *mut sqlite3, + head: Option<&MaterializationHead>, +) -> Result<(), c_int> { + let Some(head) = head else { + return Ok(()); + }; + if !should_checkpoint_materialization(head) { + return Ok(()); } - let mut bind_err = false; - unsafe { - bind_err |= sqlite_bind_int64(stmt, 1, lamport as i64) != SQLITE_OK as c_int; - bind_err |= sqlite_bind_blob( - stmt, - 2, - replica.as_ptr() as *const c_void, - replica.len() as c_int, - None, - ) != SQLITE_OK as c_int; - bind_err |= sqlite_bind_int64(stmt, 3, counter as i64) != SQLITE_OK as c_int; - bind_err |= sqlite_bind_int64(stmt, 4, seq as i64) != SQLITE_OK as c_int; + let checkpoint_seq = head.seq as i64; + for sql in [ + "DELETE FROM checkpoint_oprefs_children WHERE checkpoint_seq = ?1", + "DELETE FROM checkpoint_payload WHERE checkpoint_seq = ?1", + "DELETE FROM checkpoint_nodes WHERE checkpoint_seq = ?1", + "DELETE FROM materialization_checkpoints WHERE checkpoint_seq = ?1", + ] { + exec_stmt_i64(db, sql, checkpoint_seq)?; } - if bind_err { - unsafe { sqlite_finalize(stmt) }; - return Err(SQLITE_ERROR as c_int); + + exec_stmt_done( + db, + "INSERT INTO materialization_checkpoints(checkpoint_seq, head_lamport, head_replica, head_counter) \ + VALUES (?1, ?2, ?3, ?4)", + |stmt| { + bind_i64_param(stmt, 1, checkpoint_seq)?; + bind_materialization_key(stmt, 2, &head.at) + }, + )?; + + for sql in [ + "INSERT INTO checkpoint_nodes(checkpoint_seq, node, parent, order_key, tombstone, last_change, deleted_at) \ + SELECT ?1, node, parent, order_key, tombstone, last_change, deleted_at FROM tree_nodes", + "INSERT INTO checkpoint_payload(checkpoint_seq, node, payload, last_lamport, last_replica, last_counter) \ + SELECT ?1, node, payload, last_lamport, last_replica, last_counter FROM tree_payload", + "INSERT INTO checkpoint_oprefs_children(checkpoint_seq, parent, op_ref, seq) \ + SELECT ?1, parent, op_ref, seq FROM oprefs_children", + ] { + exec_stmt_i64(db, sql, checkpoint_seq)?; } - let step_rc = unsafe { sqlite_step(stmt) }; - let finalize_rc = unsafe { sqlite_finalize(stmt) }; - if step_rc != SQLITE_DONE as c_int { - return Err(step_rc); + Ok(()) +} + +pub(super) fn load_materialization_checkpoint_before( + db: *mut sqlite3, + frontier: &treecrdt_core::MaterializationFrontier, +) -> Result, c_int> { + with_stmt( + db, + "SELECT checkpoint_seq, head_lamport, head_replica, head_counter \ + FROM materialization_checkpoints \ + WHERE head_lamport < ?1 \ + OR (head_lamport = ?1 AND head_replica < ?2) \ + OR (head_lamport = ?1 AND head_replica = ?2 AND head_counter < ?3) \ + ORDER BY head_lamport DESC, head_replica DESC, head_counter DESC \ + LIMIT 1", + |stmt| { + bind_materialization_key(stmt, 1, frontier)?; + + let step_rc = unsafe { sqlite_step(stmt) }; + if step_rc == SQLITE_DONE as c_int { + return Ok(None); + } + if step_rc != SQLITE_ROW as c_int { + return Err(step_rc); + } + + let checkpoint_seq = column_nonnegative_i64(stmt, 0) as u64; + let head_at = column_materialization_key(stmt, 1); + + Ok(Some(treecrdt_core::MaterializationHead { + at: head_at, + seq: checkpoint_seq, + })) + }, + ) +} + +pub(super) fn restore_materialization_checkpoint>( + db: *mut sqlite3, + checkpoint: Option<&MaterializationHead>, +) -> Result<(), c_int> { + for sql in [ + "DELETE FROM oprefs_children", + "DELETE FROM tree_payload", + "DELETE FROM tree_nodes", + ] { + exec_sql_text(db, sql)?; } - if finalize_rc != SQLITE_OK as c_int { - return Err(finalize_rc); + + if let Some(checkpoint) = checkpoint { + let checkpoint_seq = checkpoint.seq as i64; + for sql in [ + "INSERT INTO tree_nodes(node, parent, order_key, tombstone, last_change, deleted_at) \ + SELECT node, parent, order_key, tombstone, last_change, deleted_at \ + FROM checkpoint_nodes WHERE checkpoint_seq = ?1", + "INSERT INTO tree_payload(node, payload, last_lamport, last_replica, last_counter) \ + SELECT node, payload, last_lamport, last_replica, last_counter \ + FROM checkpoint_payload WHERE checkpoint_seq = ?1", + "INSERT INTO oprefs_children(parent, op_ref, seq) \ + SELECT parent, op_ref, seq \ + FROM checkpoint_oprefs_children WHERE checkpoint_seq = ?1", + ] { + exec_stmt_i64(db, sql, checkpoint_seq)?; + } + } else { + exec_stmt_done( + db, + "INSERT INTO tree_nodes(node,parent,order_key,tombstone) VALUES (?1,NULL,X'',0)", + |stmt| bind_blob_param(stmt, 1, &ROOT_NODE_ID), + )?; } + Ok(()) } @@ -306,50 +453,53 @@ CREATE TABLE IF NOT EXISTS tree_payload ( last_replica BLOB NOT NULL, last_counter INTEGER NOT NULL ); -"#; - let rc_meta = { - let sql = CString::new(META).expect("meta schema"); - sqlite_exec(db, sql.as_ptr(), None, null_mut(), null_mut()) - }; - if rc_meta != SQLITE_OK as c_int { - return Err(rc_meta); - } - let rc_ops = { - let sql = CString::new(OPS).expect("ops schema"); - sqlite_exec(db, sql.as_ptr(), None, null_mut(), null_mut()) - }; - if rc_ops != SQLITE_OK as c_int { - return Err(rc_ops); - } - let rc_tree_meta = { - let sql = CString::new(TREE_META).expect("tree_meta schema"); - sqlite_exec(db, sql.as_ptr(), None, null_mut(), null_mut()) - }; - if rc_tree_meta != SQLITE_OK as c_int { - return Err(rc_tree_meta); - } - let rc_nodes = { - let sql = CString::new(TREE_NODES).expect("tree_nodes schema"); - sqlite_exec(db, sql.as_ptr(), None, null_mut(), null_mut()) - }; - if rc_nodes != SQLITE_OK as c_int { - return Err(rc_nodes); - } - let rc_oprefs = { - let sql = CString::new(OPREFS_CHILDREN).expect("oprefs_children schema"); - sqlite_exec(db, sql.as_ptr(), None, null_mut(), null_mut()) - }; - if rc_oprefs != SQLITE_OK as c_int { - return Err(rc_oprefs); - } +CREATE TABLE IF NOT EXISTS materialization_checkpoints ( + checkpoint_seq INTEGER PRIMARY KEY, + head_lamport INTEGER NOT NULL, + head_replica BLOB NOT NULL, + head_counter INTEGER NOT NULL +); - let rc_tree_payload = { - let sql = CString::new(TREE_PAYLOAD).expect("tree_payload schema"); - sqlite_exec(db, sql.as_ptr(), None, null_mut(), null_mut()) - }; - if rc_tree_payload != SQLITE_OK as c_int { - return Err(rc_tree_payload); +CREATE TABLE IF NOT EXISTS checkpoint_nodes ( + checkpoint_seq INTEGER NOT NULL, + node BLOB NOT NULL, + parent BLOB, + order_key BLOB, + tombstone INTEGER NOT NULL DEFAULT 0, + last_change BLOB, + deleted_at BLOB, + PRIMARY KEY (checkpoint_seq, node) +); + +CREATE TABLE IF NOT EXISTS checkpoint_payload ( + checkpoint_seq INTEGER NOT NULL, + node BLOB NOT NULL, + payload BLOB, + last_lamport INTEGER NOT NULL, + last_replica BLOB NOT NULL, + last_counter INTEGER NOT NULL, + PRIMARY KEY (checkpoint_seq, node) +); + +CREATE TABLE IF NOT EXISTS checkpoint_oprefs_children ( + checkpoint_seq INTEGER NOT NULL, + parent BLOB NOT NULL, + op_ref BLOB NOT NULL, + seq INTEGER NOT NULL, + PRIMARY KEY (checkpoint_seq, parent, op_ref) +); +"#; + + for sql in [ + META, + OPS, + TREE_META, + TREE_NODES, + OPREFS_CHILDREN, + TREE_PAYLOAD, + ] { + exec_sql_text(db, sql)?; } const INDEXES: &str = r#" @@ -358,55 +508,32 @@ CREATE INDEX IF NOT EXISTS idx_ops_op_ref ON ops(op_ref); CREATE INDEX IF NOT EXISTS idx_tree_nodes_parent_order_key_node ON tree_nodes(parent, order_key, node); CREATE INDEX IF NOT EXISTS idx_tree_nodes_parent_tombstone_order_key_node ON tree_nodes(parent, tombstone, order_key, node); CREATE INDEX IF NOT EXISTS idx_oprefs_children_parent_seq ON oprefs_children(parent, seq); +CREATE INDEX IF NOT EXISTS idx_materialization_checkpoints_head + ON materialization_checkpoints(head_lamport, head_replica, head_counter); +CREATE INDEX IF NOT EXISTS idx_checkpoint_oprefs_children_parent_seq + ON checkpoint_oprefs_children(checkpoint_seq, parent, seq); "#; - let rc_idx = { - let sql = CString::new(INDEXES).expect("index schema"); - sqlite_exec(db, sql.as_ptr(), None, null_mut(), null_mut()) - }; - if rc_idx != SQLITE_OK as c_int { - return Err(rc_idx); - } + exec_sql_text(db, INDEXES)?; // If this is a fresh database with no ops yet, seed the materialized root so appends can // maintain state incrementally without a full catch-up pass. - let mut ops_count: i64 = 0; - { - let sql = CString::new("SELECT COUNT(*) FROM ops").expect("count ops sql"); - let mut stmt: *mut sqlite3_stmt = null_mut(); - let rc = sqlite_prepare_v2(db, sql.as_ptr(), -1, &mut stmt, null_mut()); - if rc == SQLITE_OK as c_int { - let step_rc = unsafe { sqlite_step(stmt) }; - if step_rc == SQLITE_ROW as c_int { - ops_count = unsafe { sqlite_column_int64(stmt, 0) }; - } - unsafe { sqlite_finalize(stmt) }; + let ops_count = with_stmt(db, "SELECT COUNT(*) FROM ops", |stmt| { + let step_rc = unsafe { sqlite_step(stmt) }; + if step_rc == SQLITE_ROW as c_int { + Ok(column_nonnegative_i64(stmt, 0)) + } else if step_rc == SQLITE_DONE as c_int { + Ok(0) + } else { + Err(step_rc) } - } + })?; if ops_count == 0 { // Ensure ROOT exists even before first catch-up. - let _ = { - let sql = CString::new( - "INSERT OR IGNORE INTO tree_nodes(node,parent,order_key,tombstone) VALUES (?1,NULL,X'',0)", - ) - .expect("root insert sql"); - let mut stmt: *mut sqlite3_stmt = null_mut(); - let rc = sqlite_prepare_v2(db, sql.as_ptr(), -1, &mut stmt, null_mut()); - if rc != SQLITE_OK as c_int { - rc - } else { - unsafe { - sqlite_bind_blob( - stmt, - 1, - ROOT_NODE_ID.as_ptr() as *const c_void, - ROOT_NODE_ID.len() as c_int, - None, - ); - sqlite_step(stmt); - sqlite_finalize(stmt) - } - } - }; + let _ = exec_stmt_done( + db, + "INSERT OR IGNORE INTO tree_nodes(node,parent,order_key,tombstone) VALUES (?1,NULL,X'',0)", + |stmt| bind_blob_param(stmt, 1, &ROOT_NODE_ID), + ); } Ok(()) diff --git a/packages/treecrdt-sqlite-ext/src/extension/functions/util.rs b/packages/treecrdt-sqlite-ext/src/extension/functions/util.rs index 36a55405..e9614142 100644 --- a/packages/treecrdt-sqlite-ext/src/extension/functions/util.rs +++ b/packages/treecrdt-sqlite-ext/src/extension/functions/util.rs @@ -87,6 +87,25 @@ pub(super) fn read_text(val: *mut sqlite3_value) -> String { } } +pub(super) fn column_blob_vec(stmt: *mut sqlite3_stmt, idx: c_int) -> Option> { + unsafe { + if sqlite_column_type(stmt, idx) == SQLITE_NULL as c_int { + return None; + } + let ptr = sqlite_column_blob(stmt, idx) as *const u8; + let len = sqlite_column_bytes(stmt, idx) as usize; + Some(if ptr.is_null() || len == 0 { + Vec::new() + } else { + slice::from_raw_parts(ptr, len).to_vec() + }) + } +} + +pub(super) fn column_nonnegative_i64(stmt: *mut sqlite3_stmt, idx: c_int) -> i64 { + unsafe { sqlite_column_int64(stmt, idx).max(0) } +} + pub(super) fn sqlite_err_from_core(_: treecrdt_core::Error) -> c_int { SQLITE_ERROR as c_int } diff --git a/packages/treecrdt-sqlite-ext/tests/extension_roundtrip.rs b/packages/treecrdt-sqlite-ext/tests/extension_roundtrip.rs index c9594b27..ef9ae60a 100644 --- a/packages/treecrdt-sqlite-ext/tests/extension_roundtrip.rs +++ b/packages/treecrdt-sqlite-ext/tests/extension_roundtrip.rs @@ -38,6 +38,14 @@ fn read_replay_frontier(conn: &Connection) -> (Option, Option>, Opt .unwrap() } +fn read_checkpoint_seqs(conn: &Connection) -> Vec { + let mut stmt = conn + .prepare("SELECT checkpoint_seq FROM materialization_checkpoints ORDER BY checkpoint_seq") + .unwrap(); + let rows = stmt.query_map([], |row| row.get::<_, i64>(0)).unwrap(); + rows.map(|row| row.unwrap()).collect() +} + fn append_ops_json(conn: &Connection, ops: &[JsonOp]) -> (Vec>, i64) { let json = serde_json::to_string(ops).unwrap(); let affected_json: String = conn @@ -352,6 +360,56 @@ fn remote_append_materializes_only_inserted_ops() { assert_eq!(head_seq, 2); } +#[test] +fn remote_append_persists_periodic_materialization_checkpoints() { + let conn = setup_conn(); + + let replica = b"ckpt"; + let root = node_bytes(0); + let first = JsonOp { + replica: replica.to_vec(), + counter: 1, + lamport: 1, + kind: "insert".into(), + parent: Some(<[u8; 16]>::try_from(root.as_slice()).unwrap()), + node: <[u8; 16]>::try_from(node_bytes(1).as_slice()).unwrap(), + new_parent: None, + order_key: Some((1u16).to_be_bytes().to_vec()), + known_state: None, + payload: None, + }; + let (_affected, count) = append_ops_json(&conn, std::slice::from_ref(&first)); + assert_eq!(count, 1); + + let ops: Vec = (1..64u16) + .map(|i| JsonOp { + replica: replica.to_vec(), + counter: (i as u64) + 1, + lamport: (i as u64) + 1, + kind: "insert".into(), + parent: Some(<[u8; 16]>::try_from(root.as_slice()).unwrap()), + node: <[u8; 16]>::try_from(node_bytes((i as u128) + 1).as_slice()).unwrap(), + new_parent: None, + order_key: Some(((i + 1).to_be_bytes()).to_vec()), + known_state: None, + payload: None, + }) + .collect(); + + let (_affected, count) = append_ops_json(&conn, &ops); + assert_eq!(count, 64); + assert_eq!(read_checkpoint_seqs(&conn), vec![1, 64]); + + let checkpoint_node_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM checkpoint_nodes WHERE checkpoint_seq = 64", + [], + |row| row.get(0), + ) + .unwrap(); + assert_eq!(checkpoint_node_count, 65); +} + #[test] fn remote_append_representative_batch_matches_postgres_shape() { let conn = setup_conn();