Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,11 @@ export class ContainerRuntime
reSubmitBatch: this.reSubmitBatch.bind(this),
isActiveConnection: () => this.innerDeltaManager.active,
isAttached: () => this.attachState !== AttachState.Detached,
setIdAllocationBatchId: (batchId: string) => {
if (this.batchIdTrackingEnabled) {
this.outbox.setIdAllocationBatchId(batchId);
}
},
},
pendingRuntimeState?.pending,
this.baseLogger,
Expand Down
24 changes: 20 additions & 4 deletions packages/runtime/container-runtime/src/opLifecycle/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,17 @@ export class Outbox {
private readonly maxMismatchedOpsToReport = 3;
private mismatchedOpsReported = 0;

/**
* Derived batchId to stamp on the next ID allocation batch flush (during resubmit).
* Set by PendingStateManager's pre-scan phase before replay, consumed (one-shot) by flushAll.
*/
private idAllocationResubmitBatchId: string | undefined;

/** Set the derived batchId for the ID allocation batch before replay flushes it. */
public setIdAllocationBatchId(batchId: string): void {
this.idAllocationResubmitBatchId = batchId;
}

constructor(private readonly params: IOutboxParameters) {
this.logger = createChildLogger({ logger: params.logger, namespace: "Outbox" });

Expand Down Expand Up @@ -397,12 +408,17 @@ export class Outbox {
return;
}

// Don't use resubmittingBatchId for idAllocationBatch.
// ID Allocation messages are not directly resubmitted so don't pass the resubmitInfo
// Use the derived batchId for the ID allocation batch if one was set by PSM's pre-scan.
// This enables fork detection via DuplicateBatchDetector for ID allocation batches.
const idAllocResubmitInfo: BatchResubmitInfo | undefined =
this.idAllocationResubmitBatchId === undefined
? undefined
: // Note: For now, we will never stage ID Allocation messages.
{ batchId: this.idAllocationResubmitBatchId, staged: false };
this.idAllocationResubmitBatchId = undefined; // Consume (one-shot)
this.flushInternal({
batchManager: this.idAllocationBatch,
// Note: For now, we will never stage ID Allocation messages.
// They won't contain personal info and no harm in extra allocations in case of discarding the staged changes
resubmitInfo: idAllocResubmitInfo,
});
this.flushInternal({
batchManager: this.blobAttachBatch,
Expand Down
2 changes: 2 additions & 0 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ export interface IRuntimeStateHandler {
): void;
isActiveConnection: () => boolean;
isAttached: () => boolean;
/** Set the derived batchId for the ID allocation batch before replay flushes it. */
setIdAllocationBatchId(batchId: string): void;
}

function isEmptyBatchPendingMessage(message: IPendingMessageFromStash): boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,85 @@ describe("Outbox", () => {
);
});

it("ID Allocation batch gets derived batchId when setIdAllocationBatchId is called before flush", () => {
const outbox = getOutbox({
context: getMockContext(),
opGroupingConfig: {
groupedBatchingEnabled: false,
},
});
// Set a derived batchId for the ID allocation batch (simulates PSM pre-scan)
outbox.setIdAllocationBatchId("idAlloc_[clientId_[2]]");
outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0"));
outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "1"));
outbox.flush({ batchId: "batchId-A", staged: false });

assert.deepEqual(
state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)),
[
["idAlloc_[clientId_[2]]"], // ID Allocation batch now has the derived batchId
["batchId-A"], // Main batch
],
"ID Allocation batch should have the derived batchId",
);

assert.deepEqual(
state.pendingOpContents.map(({ opMetadata }) => asBatchMetadata(opMetadata)?.batchId),
[
"idAlloc_[clientId_[2]]", // ID Allocation batch has derived batchId
"batchId-A", // Main batch
],
"Pending messages should reflect the derived batchId for ID allocation",
);
});

it("ID Allocation batch has no batchId when setIdAllocationBatchId is not called (non-resubmit)", () => {
const outbox = getOutbox({
context: getMockContext(),
opGroupingConfig: {
groupedBatchingEnabled: false,
},
});
// Do NOT call setIdAllocationBatchId — simulates normal (non-resubmit) flush
outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0"));
outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "1"));
outbox.flush();

assert.deepEqual(
state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)),
[
[undefined], // ID Allocation batch — no batchId (not resubmit)
[undefined], // Main batch — no batchId (not resubmit)
],
"Neither batch should have a batchId on non-resubmit flush",
);
});

it("setIdAllocationBatchId is consumed (one-shot) — only first flush uses it", () => {
const outbox = getOutbox({
context: getMockContext(),
opGroupingConfig: {
groupedBatchingEnabled: false,
},
});
outbox.setIdAllocationBatchId("idAlloc_[derived]");
outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0"));
outbox.flush({ batchId: "batchId-A", staged: false });

// Second flush — the derived batchId should have been consumed
outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "1"));
outbox.flush();

assert.deepEqual(
state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)),
[
["idAlloc_[derived]"], // First flush — has derived batchId
[undefined], // Second flush — consumed, no batchId
],
"Derived batchId should only be used for the first flush",
);
});

it("Will send messages only when allowed, but will store them in the pending state", () => {
const outbox = getOutbox({ context: getMockContext() });
const messages = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: sandbox.stub(),
isActiveConnection: sandbox.stub(),
isAttached: sandbox.stub(),
setIdAllocationBatchId: sandbox.stub(),
};
stubs.applyStashedOp.resolves(undefined);
stubs.clientId.returns("clientId");
Expand Down Expand Up @@ -231,6 +232,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
undefined /* initialLocalState */,
logger,
Expand Down Expand Up @@ -720,6 +722,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
pendingStates ? { pendingStates } : undefined,
logger,
Expand Down Expand Up @@ -791,6 +794,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
{ pendingStates: messages },
logger,
Expand All @@ -809,6 +813,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
undefined /* initialLocalState */,
logger,
Expand All @@ -826,6 +831,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
localStateWithEmptyBatch,
logger,
Expand Down Expand Up @@ -867,6 +873,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
pendingStates ? { pendingStates } : undefined,
logger,
Expand Down Expand Up @@ -1013,6 +1020,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
pendingStates ? { pendingStates } : undefined /* initialLocalState */,
logger,
Expand Down Expand Up @@ -1087,6 +1095,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
{ pendingStates: initialMessages },
logger,
Expand Down
Loading