Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,11 @@ export class ContainerRuntime
reSubmitBatch: this.reSubmitBatch.bind(this),
isActiveConnection: () => this.innerDeltaManager.active,
isAttached: () => this.attachState !== AttachState.Detached,
setIdAllocationBatchId: (batchId: string) => {
if (this.batchIdTrackingEnabled) {
this.outbox.setIdAllocationBatchId(batchId);
}
},
},
pendingRuntimeState?.pending,
this.baseLogger,
Expand Down
23 changes: 19 additions & 4 deletions packages/runtime/container-runtime/src/opLifecycle/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,17 @@ export class Outbox {
private readonly maxMismatchedOpsToReport = 3;
private mismatchedOpsReported = 0;

/**
* Derived batchId to stamp on the next ID allocation batch flush (during resubmit).
* Set by PendingStateManager's pre-scan phase before replay, consumed (one-shot) by flushAll.
*/
private idAllocationResubmitBatchId: string | undefined;

/** Set the derived batchId for the ID allocation batch before replay flushes it. */
public setIdAllocationBatchId(batchId: string): void {
this.idAllocationResubmitBatchId = batchId;
}

constructor(private readonly params: IOutboxParameters) {
this.logger = createChildLogger({ logger: params.logger, namespace: "Outbox" });

Expand Down Expand Up @@ -397,12 +408,16 @@ export class Outbox {
return;
}

// Don't use resubmittingBatchId for idAllocationBatch.
// ID Allocation messages are not directly resubmitted so don't pass the resubmitInfo
// Use the derived batchId for the ID allocation batch if one was set by PSM's pre-scan.
// This enables fork detection via DuplicateBatchDetector for ID allocation batches.
const idAllocResubmitInfo: BatchResubmitInfo | undefined =
this.idAllocationResubmitBatchId === undefined
? undefined
: { batchId: this.idAllocationResubmitBatchId, staged: false };
this.idAllocationResubmitBatchId = undefined; // Consume (one-shot)
this.flushInternal({
batchManager: this.idAllocationBatch,
// Note: For now, we will never stage ID Allocation messages.
// They won't contain personal info and no harm in extra allocations in case of discarding the staged changes
resubmitInfo: idAllocResubmitInfo,
});
this.flushInternal({
batchManager: this.blobAttachBatch,
Expand Down
2 changes: 2 additions & 0 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ export interface IRuntimeStateHandler {
): void;
isActiveConnection: () => boolean;
isAttached: () => boolean;
/** Set the derived batchId for the ID allocation batch before replay flushes it. */
setIdAllocationBatchId(batchId: string): void;
}

function isEmptyBatchPendingMessage(message: IPendingMessageFromStash): boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,85 @@ describe("Outbox", () => {
);
});

it("ID Allocation batch gets derived batchId when setIdAllocationBatchId is called before flush", () => {
const outbox = getOutbox({
context: getMockContext(),
opGroupingConfig: {
groupedBatchingEnabled: false,
},
});
// Set a derived batchId for the ID allocation batch (simulates PSM pre-scan)
outbox.setIdAllocationBatchId("idAlloc_[clientId_[2]]");
outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0"));
outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "1"));
outbox.flush({ batchId: "batchId-A", staged: false });

assert.deepEqual(
state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)),
[
["idAlloc_[clientId_[2]]"], // ID Allocation batch now has the derived batchId
["batchId-A"], // Main batch
],
"ID Allocation batch should have the derived batchId",
);

assert.deepEqual(
state.pendingOpContents.map(({ opMetadata }) => asBatchMetadata(opMetadata)?.batchId),
[
"idAlloc_[clientId_[2]]", // ID Allocation batch has derived batchId
"batchId-A", // Main batch
],
"Pending messages should reflect the derived batchId for ID allocation",
);
});

it("ID Allocation batch has no batchId when setIdAllocationBatchId is not called (non-resubmit)", () => {
const outbox = getOutbox({
context: getMockContext(),
opGroupingConfig: {
groupedBatchingEnabled: false,
},
});
// Do NOT call setIdAllocationBatchId — simulates normal (non-resubmit) flush
outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0"));
outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "1"));
outbox.flush();

assert.deepEqual(
state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)),
[
[undefined], // ID Allocation batch — no batchId (not resubmit)
[undefined], // Main batch — no batchId (not resubmit)
],
"Neither batch should have a batchId on non-resubmit flush",
);
});

it("setIdAllocationBatchId is consumed (one-shot) — only first flush uses it", () => {
const outbox = getOutbox({
context: getMockContext(),
opGroupingConfig: {
groupedBatchingEnabled: false,
},
});
outbox.setIdAllocationBatchId("idAlloc_[derived]");
outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0"));
outbox.flush({ batchId: "batchId-A", staged: false });

// Second flush — the derived batchId should have been consumed
outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "1"));
outbox.flush();

assert.deepEqual(
state.batchesSubmitted.map((x) => x.messages.map((m) => m.metadata?.batchId)),
[
["idAlloc_[derived]"], // First flush — has derived batchId
[undefined], // Second flush — consumed, no batchId
],
"Derived batchId should only be used for the first flush",
);
});

it("Will send messages only when allowed, but will store them in the pending state", () => {
const outbox = getOutbox({ context: getMockContext() });
const messages = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: sandbox.stub(),
isActiveConnection: sandbox.stub(),
isAttached: sandbox.stub(),
setIdAllocationBatchId: sandbox.stub(),
};
stubs.applyStashedOp.resolves(undefined);
stubs.clientId.returns("clientId");
Expand Down Expand Up @@ -231,6 +232,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
undefined /* initialLocalState */,
logger,
Expand Down Expand Up @@ -720,6 +722,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
pendingStates ? { pendingStates } : undefined,
logger,
Expand Down Expand Up @@ -791,6 +794,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
{ pendingStates: messages },
logger,
Expand All @@ -809,6 +813,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
undefined /* initialLocalState */,
logger,
Expand All @@ -826,6 +831,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
localStateWithEmptyBatch,
logger,
Expand Down Expand Up @@ -867,6 +873,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
pendingStates ? { pendingStates } : undefined,
logger,
Expand Down Expand Up @@ -1013,6 +1020,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
pendingStates ? { pendingStates } : undefined /* initialLocalState */,
logger,
Expand Down Expand Up @@ -1087,6 +1095,7 @@ describe("Pending State Manager", () => {
reSubmitBatch: () => {},
isActiveConnection: () => false,
isAttached: () => true,
setIdAllocationBatchId: () => {},
},
{ pendingStates: initialMessages },
logger,
Expand Down
Loading