diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index f56287ea9668..03358611ae60 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1793,6 +1793,11 @@ export class ContainerRuntime reSubmitBatch: this.reSubmitBatch.bind(this), isActiveConnection: () => this.innerDeltaManager.active, isAttached: () => this.attachState !== AttachState.Detached, + setIdAllocationBatchId: (batchId: string) => { + if (this.batchIdTrackingEnabled) { + this.outbox.setIdAllocationBatchId(batchId); + } + }, }, pendingRuntimeState?.pending, this.baseLogger, diff --git a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts index 102580b2baa1..341574cd4db2 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts @@ -208,6 +208,17 @@ export class Outbox { private readonly maxMismatchedOpsToReport = 3; private mismatchedOpsReported = 0; + /** + * Derived batchId to stamp on the next ID allocation batch flush (during resubmit). + * Set by PendingStateManager's pre-scan phase before replay, consumed (one-shot) by flushAll. + */ + private idAllocationResubmitBatchId: string | undefined; + + /** Set the derived batchId for the ID allocation batch before replay flushes it. */ + public setIdAllocationBatchId(batchId: string): void { + this.idAllocationResubmitBatchId = batchId; + } + constructor(private readonly params: IOutboxParameters) { this.logger = createChildLogger({ logger: params.logger, namespace: "Outbox" }); @@ -397,12 +408,17 @@ export class Outbox { return; } - // Don't use resubmittingBatchId for idAllocationBatch. - // ID Allocation messages are not directly resubmitted so don't pass the resubmitInfo + // Use the derived batchId for the ID allocation batch if one was set by PSM's pre-scan. + // This enables fork detection via DuplicateBatchDetector for ID allocation batches. + const idAllocResubmitInfo: BatchResubmitInfo | undefined = + this.idAllocationResubmitBatchId === undefined + ? undefined + : // Note: For now, we will never stage ID Allocation messages. + { batchId: this.idAllocationResubmitBatchId, staged: false }; + this.idAllocationResubmitBatchId = undefined; // Consume (one-shot) this.flushInternal({ batchManager: this.idAllocationBatch, - // Note: For now, we will never stage ID Allocation messages. - // They won't contain personal info and no harm in extra allocations in case of discarding the staged changes + resubmitInfo: idAllocResubmitInfo, }); this.flushInternal({ batchManager: this.blobAttachBatch, diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index f1edd45d9284..01dc86aca0fe 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -144,6 +144,8 @@ export interface IRuntimeStateHandler { ): void; isActiveConnection: () => boolean; isAttached: () => boolean; + /** Set the derived batchId for the ID allocation batch before replay flushes it. */ + setIdAllocationBatchId(batchId: string): void; } function isEmptyBatchPendingMessage(message: IPendingMessageFromStash): boolean { diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts index 9209d504209b..088f4b87f304 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts @@ -478,6 +478,85 @@ describe("Outbox", () => { ); }); + it("ID Allocation batch gets derived batchId when setIdAllocationBatchId is called before flush", () => { + const outbox = getOutbox({ + context: getMockContext(), + opGroupingConfig: { + groupedBatchingEnabled: false, + }, + }); + // Set a derived batchId for the ID allocation batch (simulates PSM pre-scan) + outbox.setIdAllocationBatchId("idAlloc_[clientId_[2]]"); + outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0")); + outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "1")); + outbox.flush({ batchId: "batchId-A", staged: false }); + + assert.deepEqual( + state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)), + [ + ["idAlloc_[clientId_[2]]"], // ID Allocation batch now has the derived batchId + ["batchId-A"], // Main batch + ], + "ID Allocation batch should have the derived batchId", + ); + + assert.deepEqual( + state.pendingOpContents.map(({ opMetadata }) => asBatchMetadata(opMetadata)?.batchId), + [ + "idAlloc_[clientId_[2]]", // ID Allocation batch has derived batchId + "batchId-A", // Main batch + ], + "Pending messages should reflect the derived batchId for ID allocation", + ); + }); + + it("ID Allocation batch has no batchId when setIdAllocationBatchId is not called (non-resubmit)", () => { + const outbox = getOutbox({ + context: getMockContext(), + opGroupingConfig: { + groupedBatchingEnabled: false, + }, + }); + // Do NOT call setIdAllocationBatchId — simulates normal (non-resubmit) flush + outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0")); + outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "1")); + outbox.flush(); + + assert.deepEqual( + state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)), + [ + [undefined], // ID Allocation batch — no batchId (not resubmit) + [undefined], // Main batch — no batchId (not resubmit) + ], + "Neither batch should have a batchId on non-resubmit flush", + ); + }); + + it("setIdAllocationBatchId is consumed (one-shot) — only first flush uses it", () => { + const outbox = getOutbox({ + context: getMockContext(), + opGroupingConfig: { + groupedBatchingEnabled: false, + }, + }); + outbox.setIdAllocationBatchId("idAlloc_[derived]"); + outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0")); + outbox.flush({ batchId: "batchId-A", staged: false }); + + // Second flush — the derived batchId should have been consumed + outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "1")); + outbox.flush(); + + assert.deepEqual( + state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)), + [ + ["idAlloc_[derived]"], // First flush — has derived batchId + [undefined], // Second flush — consumed, no batchId + ], + "Derived batchId should only be used for the first flush", + ); + }); + it("Will send messages only when allowed, but will store them in the pending state", () => { const outbox = getOutbox({ context: getMockContext() }); const messages = [ diff --git a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts index 2c3b3e4df6df..c7d0e7eefb02 100644 --- a/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts +++ b/packages/runtime/container-runtime/src/test/pendingStateManager.spec.ts @@ -96,6 +96,7 @@ describe("Pending State Manager", () => { reSubmitBatch: sandbox.stub(), isActiveConnection: sandbox.stub(), isAttached: sandbox.stub(), + setIdAllocationBatchId: sandbox.stub(), }; stubs.applyStashedOp.resolves(undefined); stubs.clientId.returns("clientId"); @@ -231,6 +232,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, undefined /* initialLocalState */, logger, @@ -720,6 +722,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, pendingStates ? { pendingStates } : undefined, logger, @@ -791,6 +794,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, { pendingStates: messages }, logger, @@ -809,6 +813,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, undefined /* initialLocalState */, logger, @@ -826,6 +831,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, localStateWithEmptyBatch, logger, @@ -867,6 +873,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, pendingStates ? { pendingStates } : undefined, logger, @@ -1013,6 +1020,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, pendingStates ? { pendingStates } : undefined /* initialLocalState */, logger, @@ -1087,6 +1095,7 @@ describe("Pending State Manager", () => { reSubmitBatch: () => {}, isActiveConnection: () => false, isAttached: () => true, + setIdAllocationBatchId: () => {}, }, { pendingStates: initialMessages }, logger,