Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions examples/playground/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ export default function App() {
persistSyncSettings(syncServerUrl, syncTransportMode);
}, [syncServerUrl, syncTransportMode]);

const counterRef = useRef(0);
const lamportRef = useRef(0);
const initEpochRef = useRef(0);
const disposedRef = useRef(false);
Expand Down Expand Up @@ -483,18 +482,14 @@ export default function App() {
const active = nextClient ?? clientRef.current ?? client;
if (!active) return;
try {
const [lamport, counter] = await Promise.all([
active.meta.headLamport(),
replica ? active.meta.replicaMaxCounter(replica) : Promise.resolve(0),
]);
const lamport = await active.meta.headLamport();
lamportRef.current = Math.max(lamportRef.current, lamport);
setHeadLamport(lamportRef.current);
counterRef.current = Math.max(counterRef.current, counter);
} catch (err) {
console.error("Failed to refresh meta", err);
}
},
[client, replica]
[client]
);

const refreshParentsScheduledRef = useRef(false);
Expand Down Expand Up @@ -587,6 +582,7 @@ export default function App() {

const {
peers,
syncTargetCount,
remoteSyncStatus,
syncBusy,
liveBusy,
Expand Down Expand Up @@ -624,7 +620,6 @@ export default function App() {
revocationCutoverTokenId,
revocationCutoverCounter,
treeStateRef,
refreshMeta,
refreshParents,
refreshNodeCount,
getLocalIdentityChain,
Expand Down Expand Up @@ -831,7 +826,6 @@ export default function App() {
setPayloadVersion((v) => v + 1);
knownOpsRef.current = new Set();
setCollapse({ defaultCollapsed: true, overrides: new Set([ROOT_ID]) });
counterRef.current = 0;
lamportRef.current = 0;
setHeadLamport(0);
setTotalNodes(null);
Expand Down Expand Up @@ -887,7 +881,6 @@ export default function App() {
await verifyLocalOps([op]);

lamportRef.current = Math.max(lamportRef.current, op.meta.lamport);
counterRef.current = Math.max(counterRef.current, op.meta.id.counter);
setHeadLamport(lamportRef.current);

notifyLocalUpdate([op]);
Expand Down Expand Up @@ -916,7 +909,6 @@ export default function App() {
scheduleRefreshParents(parentsAffectedByOps(stateBefore, [op]));
scheduleRefreshNodeCount();
lamportRef.current = Math.max(lamportRef.current, op.meta.lamport);
counterRef.current = Math.max(counterRef.current, op.meta.id.counter);
setHeadLamport(lamportRef.current);
} catch (err) {
console.error("Failed to append move op", err);
Expand Down Expand Up @@ -1014,7 +1006,6 @@ export default function App() {

for (const op of ops) {
lamportRef.current = Math.max(lamportRef.current, op.meta.lamport);
counterRef.current = Math.max(counterRef.current, op.meta.id.counter);
}
setHeadLamport(lamportRef.current);

Expand Down Expand Up @@ -1067,7 +1058,6 @@ export default function App() {
await ensureChildrenLoaded(parentId, { force: true });
}
lamportRef.current = Math.max(lamportRef.current, op.meta.lamport);
counterRef.current = Math.max(counterRef.current, op.meta.id.counter);
setHeadLamport(lamportRef.current);
setCollapse((prev) => {
const overrides = new Set(prev.overrides);
Expand Down Expand Up @@ -1294,7 +1284,7 @@ export default function App() {
busy={busy}
syncBusy={syncBusy}
liveBusy={liveBusy}
peerCount={peers.length}
peerCount={syncTargetCount}
authCanSyncAll={authCanSyncAll}
onSync={() => void (authCanSyncAll ? handleSync({ all: {} }) : handleScopedSync())}
liveAllEnabled={liveAllEnabled}
Expand Down Expand Up @@ -1417,7 +1407,7 @@ export default function App() {
onSync={() => {
void (authCanSyncAll ? handleSync({ all: {} }) : handleScopedSync());
}}
canSync={status === "ready" && !busy && !syncBusy && peers.length > 0 && online}
canSync={status === "ready" && !busy && !syncBusy && syncTargetCount > 0 && online}
onDetails={() => setShowAuthPanel(true)}
/>
</div>
Expand Down
35 changes: 34 additions & 1 deletion examples/playground/src/playground/hooks/usePlaygroundAuth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,38 @@ const ALLOWED_GRANT_ACTIONS = new Set([
"read_payload",
]);

const SYNC_AUTH_PREFLIGHT_RETRIES = 12;
const SYNC_AUTH_PREFLIGHT_RETRY_DELAY_MS = 250;

function delay(ms: number): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}

async function waitForSyncAuthPreflight(
auth: SyncAuth<Operation>,
docId: string,
opts: { attempts?: number; delayMs?: number } = {},
): Promise<void> {
const attempts = Math.max(1, opts.attempts ?? SYNC_AUTH_PREFLIGHT_RETRIES);
const delayMs = Math.max(0, opts.delayMs ?? SYNC_AUTH_PREFLIGHT_RETRY_DELAY_MS);
let lastErr: unknown = null;

for (let attempt = 1; attempt <= attempts; attempt += 1) {
try {
await auth.helloCapabilities?.({ docId });
return;
} catch (err) {
lastErr = err;
if (attempt === attempts) break;
await delay(delayMs);
}
}

throw lastErr ?? new Error("sync auth preflight failed");
}

function normalizeGrantActions(input: string[]): string[] {
const out: string[] = [];
for (const raw of input) {
Expand Down Expand Up @@ -633,8 +665,9 @@ export function usePlaygroundAuth(opts: UsePlaygroundAuthOptions): PlaygroundAut

void (async () => {
try {
await preparedAuth.helloCapabilities?.({ docId });
await waitForSyncAuthPreflight(preparedAuth, docId);
if (cancelled) return;
setAuthError(null);
setSyncAuth(preparedAuth);
} catch (err) {
if (cancelled) return;
Expand Down
35 changes: 18 additions & 17 deletions examples/playground/src/playground/hooks/usePlaygroundSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ function formatRemoteErrorDetail(
}
export type PlaygroundSyncApi = {
peers: PeerInfo[];
syncTargetCount: number;
remoteSyncStatus: RemoteSyncStatus;
syncBusy: boolean;
liveBusy: boolean;
Expand Down Expand Up @@ -195,9 +196,8 @@ export type UsePlaygroundSyncOptions = {
revocationCutoverTokenId: string;
revocationCutoverCounter: string;
treeStateRef: React.MutableRefObject<TreeState>;
refreshMeta: () => Promise<void>;
refreshParents: (parentIds: string[]) => Promise<void>;
refreshNodeCount: () => Promise<void>;
refreshParents: (parentIds: Iterable<string>) => Promise<void> | void;
refreshNodeCount: () => Promise<void> | void;
getLocalIdentityChain: () => Promise<TreecrdtIdentityChainV1 | null>;
onPeerIdentityChain: (chain: {
identityPublicKey: Uint8Array;
Expand Down Expand Up @@ -227,7 +227,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
authCanSyncAll,
viewRootId,
treeStateRef,
refreshMeta,
refreshParents,
refreshNodeCount,
onAuthGrantMessage,
Expand All @@ -238,6 +237,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
const [liveBusy, setLiveBusy] = useState(false);
const [syncError, setSyncError] = useState<string | null>(null);
const [peers, setPeers] = useState<PeerInfo[]>([]);
const [syncTargetCount, setSyncTargetCount] = useState(0);
const [remoteSyncStatus, setRemoteSyncStatus] = useState<RemoteSyncStatus>({
state: 'disabled',
detail: 'Remote server transport is disabled in local tabs mode.',
Expand Down Expand Up @@ -278,6 +278,12 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
const meshPeersRef = useRef<PeerInfo[]>([]);
const remotePeerRef = useRef<PeerInfo | null>(null);

const publishSyncTargetCount = (
connections: Map<string, { transport: DuplexTransport<any>; detach: () => void }> = syncConnRef.current,
) => {
setSyncTargetCount(connections.size);
};

const publishPeers = () => {
const merged: PeerInfo[] = [...meshPeersRef.current];
if (remotePeerRef.current) merged.push(remotePeerRef.current);
Expand Down Expand Up @@ -568,6 +574,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
// ignore
}
connections.delete(peerId);
publishSyncTargetCount(connections);
stopLiveAllForPeer(peerId);
stopLiveChildrenForPeer(peerId);

Expand Down Expand Up @@ -629,7 +636,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
if (lastErr) throw lastErr;
throw new Error('No peers responded to sync.');
}
await refreshMeta();
await refreshParents(Object.keys(treeStateRef.current.childrenByParent));
await refreshNodeCount();
} catch (err) {
Expand Down Expand Up @@ -702,7 +708,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
if (lastErr) throw lastErr;
throw new Error('No peers responded to sync.');
}
await refreshMeta();
await refreshParents(Object.keys(treeStateRef.current.childrenByParent));
await refreshNodeCount();
} catch (err) {
Expand Down Expand Up @@ -787,12 +792,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
);
}

await refreshMeta();
const parentIds = new Set(Object.keys(treeStateRef.current.childrenByParent));
parentIds.add(viewRootId);
await refreshParents(Array.from(parentIds));
await refreshNodeCount();

autoSyncDoneRef.current = true;
if (typeof window !== 'undefined') {
const url = new URL(window.location.href);
Expand All @@ -819,9 +818,6 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
authMaterial.localTokensB64.length,
autoSyncJoinTick,
joinMode,
refreshMeta,
refreshNodeCount,
refreshParents,
syncBusy,
viewRootId,
]);
Expand Down Expand Up @@ -981,8 +977,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
if (debugSync && ops.length > 0) {
console.debug(`[sync:${selfPeerId}] applyOps(${ops.length})`);
}
const affected =
ops.length > 0 ? ((await client.ops.appendMany(ops)) as unknown as string[]) : [];
const affected = ops.length > 0 ? await client.ops.appendMany(ops) : [];
await onRemoteOpsApplied(ops, affected);
},
};
Expand All @@ -1002,6 +997,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn

const connections = new Map<string, { transport: DuplexTransport<any>; detach: () => void }>();
syncConnRef.current = connections;
publishSyncTargetCount(connections);

const maybeStartLiveForPeer = (peerId: string) => {
if (!isRemotePeerId(peerId)) {
Expand Down Expand Up @@ -1034,6 +1030,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
onPeerTransport: (peerId, transport) => {
const detach = sharedPeer.attach(transport);
connections.set(peerId, { transport, detach });
publishSyncTargetCount(connections);
maybeStartLiveForPeer(peerId);
if (autoSyncJoinInitial && joinMode && !autoSyncDoneRef.current) {
autoSyncPeerIdRef.current = peerId;
Expand All @@ -1043,6 +1040,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
},
onPeerDisconnected: (peerId) => {
connections.delete(peerId);
publishSyncTargetCount(connections);
stopLiveAllForPeer(peerId);
stopLiveChildrenForPeer(peerId);
meshPeersRef.current = meshPeersRef.current.filter((p) => p.id !== peerId);
Expand Down Expand Up @@ -1127,6 +1125,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
);
const detach = sharedPeer.attach(transport);
syncConnRef.current.set(remotePeerId, { transport, detach });
publishSyncTargetCount();
remotePeerRef.current = { id: remotePeerId, lastSeen: Date.now() };
publishPeers();
maybeStartLiveForPeer(remotePeerId);
Expand Down Expand Up @@ -1220,6 +1219,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn
liveBusyCountRef.current = 0;
setLiveBusy(false);
connections.clear();
publishSyncTargetCount(connections);
meshPeersRef.current = [];
remotePeerRef.current = null;
publishPeers();
Expand All @@ -1243,6 +1243,7 @@ export function usePlaygroundSync(opts: UsePlaygroundSyncOptions): PlaygroundSyn

return {
peers,
syncTargetCount,
remoteSyncStatus,
syncBusy,
liveBusy,
Expand Down
5 changes: 3 additions & 2 deletions packages/treecrdt-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ pub use ids::{Lamport, NodeId, OperationId, ReplicaId};
pub use materialization::{
apply_incremental_ops_with_delta, apply_persisted_remote_ops_with_delta,
catch_up_materialized_state, materialize_persisted_remote_ops_with_delta,
IncrementalApplyResult, MaterializationCursor, MaterializationFrontier, MaterializationHead,
MaterializationKey, MaterializationState, PersistedRemoteApplyResult, PersistedRemoteStores,
should_checkpoint_materialization, IncrementalApplyResult, MaterializationCursor,
MaterializationFrontier, MaterializationHead, MaterializationKey, MaterializationState,
PersistedRemoteApplyResult, PersistedRemoteStores, MATERIALIZATION_CHECKPOINT_INTERVAL,
};
pub use ops::{cmp_op_key, cmp_ops, Operation, OperationKind, OperationMetadata};
pub use traits::{
Expand Down
Loading
Loading