diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index f56287ea9668..03358611ae60 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1793,6 +1793,11 @@ export class ContainerRuntime reSubmitBatch: this.reSubmitBatch.bind(this), isActiveConnection: () => this.innerDeltaManager.active, isAttached: () => this.attachState !== AttachState.Detached, + setIdAllocationBatchId: (batchId: string) => { + if (this.batchIdTrackingEnabled) { + this.outbox.setIdAllocationBatchId(batchId); + } + }, }, pendingRuntimeState?.pending, this.baseLogger, diff --git a/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts b/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts index de7790506585..40e27adb7859 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/batchManager.ts @@ -26,11 +26,6 @@ export interface IBatchManagerOptions { * If true, the outbox is allowed to rebase the batch during flushing. */ readonly canRebase: boolean; - - /** - * If true, don't compare batchID of incoming batches to this. e.g. ID Allocation Batch IDs should be ignored - */ - readonly ignoreBatchId?: boolean; } export interface BatchSequenceNumbers { diff --git a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts index 102580b2baa1..0a63d7c192b1 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts @@ -215,10 +215,20 @@ export class Outbox { this.blobAttachBatch = new BatchManager({ canRebase: true }); this.idAllocationBatch = new BatchManager({ canRebase: false, - ignoreBatchId: true, }); } + /** + * Derived batchId to stamp on the next ID allocation batch flush (during resubmit). + * Set by PendingStateManager's pre-scan phase before replay, consumed (one-shot) by flushAll. + */ + private idAllocationResubmitBatchId: string | undefined; + + /** Set the derived batchId for the ID allocation batch before replay flushes it. */ + public setIdAllocationBatchId(batchId: string): void { + this.idAllocationResubmitBatchId = batchId; + } + public get messageCount(): number { return this.mainBatch.length + this.blobAttachBatch.length + this.idAllocationBatch.length; } @@ -397,12 +407,16 @@ export class Outbox { return; } - // Don't use resubmittingBatchId for idAllocationBatch. - // ID Allocation messages are not directly resubmitted so don't pass the resubmitInfo + // Use the derived batchId for the ID allocation batch if one was set by PSM's pre-scan. + // This enables fork detection via DuplicateBatchDetector for ID allocation batches. + const idAllocResubmitInfo: BatchResubmitInfo | undefined = + this.idAllocationResubmitBatchId === undefined + ? undefined + : { batchId: this.idAllocationResubmitBatchId, staged: false }; + this.idAllocationResubmitBatchId = undefined; // Consume (one-shot) this.flushInternal({ batchManager: this.idAllocationBatch, - // Note: For now, we will never stage ID Allocation messages. - // They won't contain personal info and no harm in extra allocations in case of discarding the staged changes + resubmitInfo: idAllocResubmitInfo, }); this.flushInternal({ batchManager: this.blobAttachBatch, @@ -499,7 +513,6 @@ export class Outbox { rawBatch.messages, clientSequenceNumber, staged, - batchManager.options.ignoreBatchId, ); } diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index f1edd45d9284..8e16947c0fcc 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -16,6 +16,7 @@ import Deque from "double-ended-queue"; import { v4 as uuid } from "uuid"; import { isContainerMessageDirtyable } from "./containerRuntime.js"; +import { ContainerMessageType } from "./messageTypes.js"; import type { InboundContainerRuntimeMessage, InboundSequencedContainerRuntimeMessage, @@ -84,10 +85,6 @@ export interface IPendingMessage { * length of the batch (how many runtime messages here) */ length: number; - /** - * If true, don't compare batchID of incoming batches to this. e.g. ID Allocation Batch IDs should be ignored - */ - ignoreBatchId?: boolean; /** * If true, this batch is staged and should not actually be submitted on replayPendingStates. */ @@ -144,6 +141,8 @@ export interface IRuntimeStateHandler { ): void; isActiveConnection: () => boolean; isAttached: () => boolean; + /** Set the derived batchId for the ID allocation batch before replay flushes it. */ + setIdAllocationBatchId(batchId: string): void; } function isEmptyBatchPendingMessage(message: IPendingMessageFromStash): boolean { @@ -403,13 +402,11 @@ export class PendingStateManager implements IDisposable { * @param clientSequenceNumber - The CSN of the first message in the batch, * or undefined if the batch was not yet sent (e.g. by the time we flushed we lost the connection) * @param staged - Indicates whether batch is staged (not to be submitted while runtime is in Staging Mode) - * @param ignoreBatchId - Whether to ignore the batchId in the batchStartInfo */ public onFlushBatch( batch: LocalBatchMessage[] | [LocalEmptyBatchPlaceholder], clientSequenceNumber: number | undefined, staged: boolean, - ignoreBatchId?: boolean, ): void { // clientId and batchStartCsn are used for generating the batchId so we can detect container forks // where this batch was submitted by two different clients rehydrating from the same local state. @@ -443,7 +440,7 @@ export class PendingStateManager implements IDisposable { localOpMetadata, opMetadata, // Note: We only will read this off the first message, but put it on all for simplicity - batchInfo: { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged }, + batchInfo: { clientId, batchStartCsn, length: batch.length, staged }, }; this.pendingMessages.push(pendingMessage); } @@ -502,6 +499,72 @@ export class PendingStateManager implements IDisposable { } } + /** + * Scan pending messages to derive a deterministic batchId for any ID Allocation batch, + * and set it on the Outbox so it will be used when the ID allocation batch is flushed + * during replay. This enables fork detection via DuplicateBatchDetector for ID allocation + * batches that get resubmitted after rehydration. + * + * @param messageCount - Number of pending messages to scan. Defaults to all. + * @param skipNonStaged - If true, skip non-staged messages (for committingStagedBatches mode). + */ + private deriveIdAllocationBatchId(messageCount?: number, skipNonStaged?: boolean): void { + const count = messageCount ?? this.pendingMessages.length; + let scanIndex = 0; + const dataBatchIds: string[] = []; + let hasIdAllocBatch = false; + let idAllocBatchInfo: { clientId: string; batchStartCsn: number } | undefined; + + while (scanIndex < count) { + const msg = this.pendingMessages.get(scanIndex); + assert(msg !== undefined, 0xc03 /* expected pending message at scan index */); + + // In committingStagedBatches mode, skip non-staged messages (they won't be replayed) + if (skipNonStaged === true && !msg.batchInfo.staged) { + scanIndex++; + continue; + } + + const isIdAlloc = + hasTypicalRuntimeOp(msg) && msg.runtimeOp.type === ContainerMessageType.IdAllocation; + + if (isIdAlloc) { + hasIdAllocBatch = true; + idAllocBatchInfo = { + clientId: msg.batchInfo.clientId, + batchStartCsn: msg.batchInfo.batchStartCsn, + }; + } else { + // Data batch (including empty batches) — collect its batchId + dataBatchIds.push(getEffectiveBatchId(msg)); + } + + // Advance past the current batch (could be multi-message) + scanIndex = nextBatchIndex(this.pendingMessages, scanIndex, count); + } + + if (!hasIdAllocBatch) { + return; + } + + let derivedIdAllocBatchId: string; + if (dataBatchIds.length > 0) { + // Derive from the first data batch's ID (deterministic — both forks see same stashed state) + derivedIdAllocBatchId = `idAlloc_[${dataBatchIds[0]}]`; + } else { + // Edge case: Only ID alloc ops exist (no data batches). + // Derive from the ID alloc pending message's own batchInfo. + assert( + idAllocBatchInfo !== undefined, + 0xc04 /* idAllocBatchInfo must be set when hasIdAllocBatch is true */, + ); + derivedIdAllocBatchId = `idAlloc_[${idAllocBatchInfo.clientId}_${idAllocBatchInfo.batchStartCsn}]`; + } + + // Set the derived batchId on the Outbox for when the ID allocation batch is flushed during replay. + this.stateHandler.setIdAllocationBatchId(derivedIdAllocBatchId); + } + /** * Compares the batch ID of the incoming batch with the pending batch ID for this client. * They should not match, as that would indicate a forked container. @@ -509,26 +572,52 @@ export class PendingStateManager implements IDisposable { * @returns whether the batch IDs match */ private remoteBatchMatchesPendingBatch(remoteBatchStart: BatchStartInfo): boolean { - // Find the first pending message that uses Batch ID, to compare to the incoming remote batch. - // If there is no such message, then the incoming remote batch doesn't have a match here and we can return. - const firstIndexUsingBatchId = Array.from({ - length: this.pendingMessages.length, - }).findIndex((_, i) => this.pendingMessages.get(i)?.batchInfo.ignoreBatchId !== true); - const pendingMessageUsingBatchId = - firstIndexUsingBatchId === -1 - ? undefined - : this.pendingMessages.get(firstIndexUsingBatchId); - - if (pendingMessageUsingBatchId === undefined) { - return false; - } - - // We must compare the effective batch IDs, since one of these ops - // may have been the original, not resubmitted, so wouldn't have its batch ID stamped yet. - const pendingBatchId = getEffectiveBatchId(pendingMessageUsingBatchId); const inboundBatchId = getEffectiveBatchId(remoteBatchStart); - return pendingBatchId === inboundBatchId; + // Check the first pending batch, and if it's an ID allocation batch that doesn't + // match, also check the next batch. We look beyond the first because the pending + // queue may start with a stashed ID allocation batch whose effective batchId differs + // from the inbound batch, while the following data batch does match. + // ID alloc is always flushed first by the Outbox, so it can only be at position 0. + let scanIndex = 0; + // maxBatches: 1 normally, 2 if the first batch is an unmatched ID alloc + const maxBatches = 2; + for ( + let batchesSeen = 0; + batchesSeen < maxBatches && scanIndex < this.pendingMessages.length; + batchesSeen++ + ) { + const msg = this.pendingMessages.get(scanIndex); + if (msg === undefined) { + break; + } + + const pendingBatchId = getEffectiveBatchId(msg); + if (pendingBatchId === inboundBatchId) { + return true; + } + + // Only continue scanning past the first batch if it was an ID allocation batch. + // Any other batch type at position 0 means there's no ID alloc to skip over. + if ( + batchesSeen === 0 && + hasTypicalRuntimeOp(msg) && + msg.runtimeOp.type === ContainerMessageType.IdAllocation + ) { + // Advance past this ID alloc batch (could be multi-message) and check one more + scanIndex = nextBatchIndex( + this.pendingMessages, + scanIndex, + this.pendingMessages.length, + ); + continue; + } + + // First batch wasn't an ID alloc — no need to check further + break; + } + + return false; } /** @@ -776,11 +865,20 @@ export class PendingStateManager implements IDisposable { const initialPendingMessagesCount = this.pendingMessages.length; let remainingPendingMessagesCount = this.pendingMessages.length; + // === Phase 1: Pre-scan === + // Scan pending messages to derive a deterministic batchId for any ID Allocation batch. + // This must happen before replay because the ID alloc batch will be flushed during replay + // and needs its batchId set on the Outbox beforehand. + this.deriveIdAllocationBatchId(initialPendingMessagesCount, committingStagedBatches); + let seenStagedBatch = false; - // Process exactly `pendingMessagesCount` items in the queue as it represents the number of messages that were - // pending when we connected. This is important because the `reSubmitFn` might add more items in the queue - // which must not be replayed. + // === Phase 2: Dequeue and replay === + // Process exactly `initialPendingMessagesCount` items in the queue as it represents the + // number of messages that were pending when we connected. This is important because the + // `reSubmitFn` might add more items in the queue which must not be replayed. + // ID Allocation batches are skipped here — fresh ID alloc ops were already submitted + // by submitIdAllocationOpIfNeeded before replay started. while (remainingPendingMessagesCount > 0) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion let pendingMessage = this.pendingMessages.shift()!; @@ -801,12 +899,26 @@ export class PendingStateManager implements IDisposable { const batchMetadataFlag = asBatchMetadata(pendingMessage.opMetadata)?.batch; assert(batchMetadataFlag !== false, 0x41b /* We cannot process batches in chunks */); - // The next message starts a batch (possibly single-message), and we'll need its batchId. - const batchId = - pendingMessage.batchInfo.ignoreBatchId === true - ? undefined - : getEffectiveBatchId(pendingMessage); + // Skip ID Allocation batches (handled by submitIdAllocationOpIfNeeded before replay). + if ( + hasTypicalRuntimeOp(pendingMessage) && + pendingMessage.runtimeOp.type === ContainerMessageType.IdAllocation + ) { + // If it's a multi-message ID alloc batch, consume the rest of it + if (batchMetadataFlag === true) { + while (remainingPendingMessagesCount > 0) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const innerMsg = this.pendingMessages.shift()!; + remainingPendingMessagesCount--; + if (innerMsg.opMetadata?.batch === false) { + break; + } + } + } + continue; + } + const batchId = getEffectiveBatchId(pendingMessage); const staged = pendingMessage.batchInfo.staged; if (asEmptyBatchLocalOpMetadata(pendingMessage.localOpMetadata)?.emptyBatch === true) { @@ -945,3 +1057,32 @@ function hasTypicalRuntimeOp( ): message is IPendingMessage & { runtimeOp: LocalContainerRuntimeMessage } { return message.runtimeOp !== undefined && message.runtimeOp.type !== "groupedBatch"; } + +/** + * Given the first message of a batch at `startIndex` in the deque, return the index of + * the first message of the *next* batch. Handles both single-message batches (no batch + * metadata flag, or flag === undefined) and multi-message batches (flag === true on the + * first message, flag === false on the last). + */ +function nextBatchIndex( + messages: Deque, + startIndex: number, + limit: number, +): number { + const msg = messages.get(startIndex); + const batchFlag = asBatchMetadata(msg?.opMetadata)?.batch; + if (batchFlag !== true) { + // Single-message batch (or no batch metadata) + return startIndex + 1; + } + // Multi-message batch — scan forward to find the batch-end marker + let idx = startIndex + 1; + while (idx < limit) { + const inner = messages.get(idx); + idx++; + if (inner?.opMetadata?.batch === false) { + break; + } + } + return idx; +} diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/duplicateBatchDetector.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/duplicateBatchDetector.spec.ts index fe23ce479133..982d8afb25da 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/duplicateBatchDetector.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/duplicateBatchDetector.spec.ts @@ -193,6 +193,36 @@ describe("DuplicateBatchDetector", () => { ); }); + it("ID Allocation batches with derived batchIds are detected as duplicates", () => { + // Simulates two forked containers submitting ID allocation batches with the same derived batchId + const derivedIdAllocBatchId = "idAlloc_[clientId_[2]]"; + + const inboundBatch1 = makeBatch({ + sequenceNumber: seqNum++, + minimumSequenceNumber: 0, + batchId: derivedIdAllocBatchId, + }); + const inboundBatch2 = makeBatch({ + sequenceNumber: seqNum++, + minimumSequenceNumber: 0, + batchId: derivedIdAllocBatchId, + }); + + const result1 = detector.processInboundBatch(inboundBatch1); + assert.deepEqual( + result1, + { duplicate: false }, + "First ID alloc batch should not be a duplicate", + ); + + const result2 = detector.processInboundBatch(inboundBatch2); + assert.deepEqual( + result2, + { duplicate: true, otherSequenceNumber: 1 }, + "Second ID alloc batch with same derived batchId should be detected as a duplicate", + ); + }); + describe("getStateForSummary", () => { it("If empty, return undefined", () => { assert.equal( diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts index 9209d504209b..088f4b87f304 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts @@ -478,6 +478,85 @@ describe("Outbox", () => { ); }); + it("ID Allocation batch gets derived batchId when setIdAllocationBatchId is called before flush", () => { + const outbox = getOutbox({ + context: getMockContext(), + opGroupingConfig: { + groupedBatchingEnabled: false, + }, + }); + // Set a derived batchId for the ID allocation batch (simulates PSM pre-scan) + outbox.setIdAllocationBatchId("idAlloc_[clientId_[2]]"); + outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0")); + outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "1")); + outbox.flush({ batchId: "batchId-A", staged: false }); + + assert.deepEqual( + state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)), + [ + ["idAlloc_[clientId_[2]]"], // ID Allocation batch now has the derived batchId + ["batchId-A"], // Main batch + ], + "ID Allocation batch should have the derived batchId", + ); + + assert.deepEqual( + state.pendingOpContents.map(({ opMetadata }) => asBatchMetadata(opMetadata)?.batchId), + [ + "idAlloc_[clientId_[2]]", // ID Allocation batch has derived batchId + "batchId-A", // Main batch + ], + "Pending messages should reflect the derived batchId for ID allocation", + ); + }); + + it("ID Allocation batch has no batchId when setIdAllocationBatchId is not called (non-resubmit)", () => { + const outbox = getOutbox({ + context: getMockContext(), + opGroupingConfig: { + groupedBatchingEnabled: false, + }, + }); + // Do NOT call setIdAllocationBatchId — simulates normal (non-resubmit) flush + outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0")); + outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "1")); + outbox.flush(); + + assert.deepEqual( + state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)), + [ + [undefined], // ID Allocation batch — no batchId (not resubmit) + [undefined], // Main batch — no batchId (not resubmit) + ], + "Neither batch should have a batchId on non-resubmit flush", + ); + }); + + it("setIdAllocationBatchId is consumed (one-shot) — only first flush uses it", () => { + const outbox = getOutbox({ + context: getMockContext(), + opGroupingConfig: { + groupedBatchingEnabled: false, + }, + }); + outbox.setIdAllocationBatchId("idAlloc_[derived]"); + outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0")); + outbox.flush({ batchId: "batchId-A", staged: false }); + + // Second flush — the derived batchId should have been consumed + outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "1")); + outbox.flush(); + + assert.deepEqual( + state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)), + [ + ["idAlloc_[derived]"], // First flush — has derived batchId + [undefined], // Second flush — consumed, no batchId + ], + "Derived batchId should only be used for the first flush", + ); + }); + it("Will send messages only when allowed, but will store them in the pending state", () => { const outbox = getOutbox({ context: getMockContext() }); const messages = [ diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index 2c3b3e4df6df..a0937c6ed85b 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -96,6 +96,7 @@ describe("Pending State Manager", () => { reSubmitBatch: sandbox.stub(), isActiveConnection: sandbox.stub(), isAttached: sandbox.stub(), + setIdAllocationBatchId: sandbox.stub(), }; stubs.applyStashedOp.resolves(undefined); stubs.clientId.returns("clientId"); @@ -231,6 +232,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, undefined /* initialLocalState */, logger, @@ -720,6 +722,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, pendingStates ? { pendingStates } : undefined, logger, @@ -791,6 +794,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, { pendingStates: messages }, logger, @@ -809,6 +813,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, undefined /* initialLocalState */, logger, @@ -826,6 +831,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, localStateWithEmptyBatch, logger, @@ -867,6 +873,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, pendingStates ? { pendingStates } : undefined, logger, @@ -1013,6 +1020,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, pendingStates ? { pendingStates } : undefined /* initialLocalState */, logger, @@ -1087,6 +1095,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, { pendingStates: initialMessages }, logger, @@ -1461,6 +1470,208 @@ describe("Pending State Manager", () => { "Squash flag should be set to true", ); }); + + it("should skip ID Allocation pending messages during replay", () => { + const stubs = getStateHandlerStub(); + const pendingStateManager = newPendingStateManager(stubs); + + // Simulate the resubmit behavior: each reSubmitBatch call re-flushes back into PSM + stubs.reSubmitBatch.callsFake((batch, metadata) => { + pendingStateManager.onFlushBatch( + batch.map(({ runtimeOp, opMetadata, localOpMetadata }) => ({ + runtimeOp, + referenceSequenceNumber: 15, + metadata: opMetadata, + localOpMetadata, + })), + /* clientSequenceNumber: */ 1, + /* staged: */ metadata.staged, + ); + }); + + // Enqueue an ID Allocation pending message + pendingStateManager.onFlushBatch( + [ + { + runtimeOp: { + type: ContainerMessageType.IdAllocation, + contents: { first: 0, count: 5 }, + } as unknown as LocalContainerRuntimeMessage, + referenceSequenceNumber: 10, + metadata: undefined, + localOpMetadata: "ID_ALLOC", + }, + ], + /* clientSequenceNumber: */ 1, + /* staged: */ false, + ); + // Enqueue a data batch + pendingStateManager.onFlushBatch( + [ + { + runtimeOp: { + type: ContainerMessageType.FluidDataStoreOp, + contents: testAddressedDataStoreMessage, + }, + referenceSequenceNumber: 10, + metadata: undefined, + localOpMetadata: "DATA_OP", + }, + ], + /* clientSequenceNumber: */ 2, + /* staged: */ false, + ); + + pendingStateManager.replayPendingStates(); + + // reSubmitBatch should only be called once (for the data batch, not the ID alloc batch) + assert.strictEqual( + stubs.reSubmitBatch.callCount, + 1, + "reSubmitBatch should only be called for data batches", + ); + const [resubmittedBatch] = stubs.reSubmitBatch.firstCall.args; + assert.strictEqual(resubmittedBatch.length, 1, "Data batch should have 1 message"); + assert.strictEqual( + resubmittedBatch[0].runtimeOp.type, + ContainerMessageType.FluidDataStoreOp, + "Should be a data store op", + ); + }); + + it("should derive batchId from first data batch for ID allocation", () => { + const stubs = getStateHandlerStub(); + const pendingStateManager = newPendingStateManager(stubs); + + // Enqueue an ID Allocation pending message + pendingStateManager.onFlushBatch( + [ + { + runtimeOp: { + type: ContainerMessageType.IdAllocation, + contents: { first: 0, count: 5 }, + } as unknown as LocalContainerRuntimeMessage, + referenceSequenceNumber: 10, + metadata: undefined, + localOpMetadata: "ID_ALLOC", + }, + ], + /* clientSequenceNumber: */ 1, + /* staged: */ false, + ); + // Enqueue a data batch + pendingStateManager.onFlushBatch( + [ + { + runtimeOp: { + type: ContainerMessageType.FluidDataStoreOp, + contents: testAddressedDataStoreMessage, + }, + referenceSequenceNumber: 10, + metadata: undefined, + localOpMetadata: "DATA_OP", + }, + ], + /* clientSequenceNumber: */ 2, + /* staged: */ false, + ); + + pendingStateManager.replayPendingStates(); + + // setIdAllocationBatchId should be called with derived batchId from the data batch + assert.strictEqual( + stubs.setIdAllocationBatchId.callCount, + 1, + "setIdAllocationBatchId should be called once", + ); + const derivedBatchId = stubs.setIdAllocationBatchId.firstCall.args[0]; + assert.strictEqual( + derivedBatchId, + `idAlloc_[${clientId}_[2]]`, + "Derived batchId should be based on first data batch's batchId", + ); + }); + + it("should derive batchId from ID alloc batchInfo when only ID alloc ops exist (edge case)", () => { + const stubs = getStateHandlerStub(); + const pendingStateManager = newPendingStateManager(stubs); + + // Enqueue only an ID Allocation pending message (no data batches) + pendingStateManager.onFlushBatch( + [ + { + runtimeOp: { + type: ContainerMessageType.IdAllocation, + contents: { first: 0, count: 5 }, + } as unknown as LocalContainerRuntimeMessage, + referenceSequenceNumber: 10, + metadata: undefined, + localOpMetadata: "ID_ALLOC", + }, + ], + /* clientSequenceNumber: */ 3, + /* staged: */ false, + ); + + pendingStateManager.replayPendingStates(); + + // setIdAllocationBatchId should be called with batchId derived from the ID alloc's own batchInfo + assert.strictEqual( + stubs.setIdAllocationBatchId.callCount, + 1, + "setIdAllocationBatchId should be called once", + ); + const derivedBatchId = stubs.setIdAllocationBatchId.firstCall.args[0]; + assert.strictEqual( + derivedBatchId, + `idAlloc_[${clientId}_3]`, + "Derived batchId should be based on ID alloc's own batchInfo", + ); + }); + + it("should not call setIdAllocationBatchId when there are no ID alloc ops", () => { + const stubs = getStateHandlerStub(); + const pendingStateManager = newPendingStateManager(stubs); + + // Simulate the resubmit behavior + stubs.reSubmitBatch.callsFake((batch, metadata) => { + pendingStateManager.onFlushBatch( + batch.map(({ runtimeOp, opMetadata, localOpMetadata }) => ({ + runtimeOp, + referenceSequenceNumber: 15, + metadata: opMetadata, + localOpMetadata, + })), + /* clientSequenceNumber: */ 1, + /* staged: */ metadata.staged, + ); + }); + + // Enqueue only a data batch (no ID alloc) + pendingStateManager.onFlushBatch( + [ + { + runtimeOp: { + type: ContainerMessageType.FluidDataStoreOp, + contents: testAddressedDataStoreMessage, + }, + referenceSequenceNumber: 10, + metadata: undefined, + localOpMetadata: "DATA_OP", + }, + ], + /* clientSequenceNumber: */ 1, + /* staged: */ false, + ); + + pendingStateManager.replayPendingStates(); + + assert.strictEqual( + stubs.setIdAllocationBatchId.callCount, + 0, + "setIdAllocationBatchId should not be called when no ID alloc ops exist", + ); + }); }); describe("popStagedBatches", () => { diff --git a/packages/test/test-end-to-end-tests/src/test/offline/stashedOps.spec.ts b/packages/test/test-end-to-end-tests/src/test/offline/stashedOps.spec.ts index c7407aa9a571..71f3585040d0 100644 --- a/packages/test/test-end-to-end-tests/src/test/offline/stashedOps.spec.ts +++ b/packages/test/test-end-to-end-tests/src/test/offline/stashedOps.spec.ts @@ -2262,8 +2262,7 @@ describeCompat( async (c, d) => { const counter = await d.getSharedObject(counterId); // Include an ID Allocation op to get coverage of the special logic around these ops as well - // AB#26984: Actually don't, because the ID Compressor is hitting "Ranges finalized out of order" for this test - // getIdCompressor(counter)?.generateCompressedId(); + getIdCompressor(counter)?.generateCompressedId(); counter.increment(incrementValue); }, );