Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,9 @@ export class ContainerRuntime
reSubmitBatch: this.reSubmitBatch.bind(this),
isActiveConnection: () => this.innerDeltaManager.active,
isAttached: () => this.attachState !== AttachState.Detached,
setIdAllocationBatchId: (batchId: string) => {
this.outbox.setIdAllocationBatchId(batchId);
},
},
pendingRuntimeState?.pending,
this.baseLogger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ export interface IBatchManagerOptions {
* If true, the outbox is allowed to rebase the batch during flushing.
*/
readonly canRebase: boolean;

/**
* If true, don't compare batchID of incoming batches to this. e.g. ID Allocation Batch IDs should be ignored
*/
readonly ignoreBatchId?: boolean;
}

export interface BatchSequenceNumbers {
Expand Down
25 changes: 19 additions & 6 deletions packages/runtime/container-runtime/src/opLifecycle/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,20 @@ export class Outbox {
this.blobAttachBatch = new BatchManager({ canRebase: true });
this.idAllocationBatch = new BatchManager({
canRebase: false,
ignoreBatchId: true,
});
}

/**
* Derived batchId to stamp on the next ID allocation batch flush (during resubmit).
* Set by PendingStateManager's pre-scan phase before replay, consumed (one-shot) by flushAll.
*/
private idAllocationResubmitBatchId: string | undefined;

/** Set the derived batchId for the ID allocation batch before replay flushes it. */
public setIdAllocationBatchId(batchId: string): void {
this.idAllocationResubmitBatchId = batchId;
}

public get messageCount(): number {
return this.mainBatch.length + this.blobAttachBatch.length + this.idAllocationBatch.length;
}
Expand Down Expand Up @@ -397,12 +407,16 @@ export class Outbox {
return;
}

// Don't use resubmittingBatchId for idAllocationBatch.
// ID Allocation messages are not directly resubmitted so don't pass the resubmitInfo
// Use the derived batchId for the ID allocation batch if one was set by PSM's pre-scan.
// This enables fork detection via DuplicateBatchDetector for ID allocation batches.
const idAllocResubmitInfo: BatchResubmitInfo | undefined =
this.idAllocationResubmitBatchId === undefined
? undefined
: { batchId: this.idAllocationResubmitBatchId, staged: false };
this.idAllocationResubmitBatchId = undefined; // Consume (one-shot)
this.flushInternal({
batchManager: this.idAllocationBatch,
// Note: For now, we will never stage ID Allocation messages.
// They won't contain personal info and no harm in extra allocations in case of discarding the staged changes
resubmitInfo: idAllocResubmitInfo,
});
this.flushInternal({
batchManager: this.blobAttachBatch,
Expand Down Expand Up @@ -499,7 +513,6 @@ export class Outbox {
rawBatch.messages,
clientSequenceNumber,
staged,
batchManager.options.ignoreBatchId,
);
}

Expand Down
179 changes: 146 additions & 33 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Deque from "double-ended-queue";
import { v4 as uuid } from "uuid";

import { isContainerMessageDirtyable } from "./containerRuntime.js";
import { ContainerMessageType } from "./messageTypes.js";
import type {
InboundContainerRuntimeMessage,
InboundSequencedContainerRuntimeMessage,
Expand Down Expand Up @@ -84,10 +85,6 @@ export interface IPendingMessage {
* length of the batch (how many runtime messages here)
*/
length: number;
/**
* If true, don't compare batchID of incoming batches to this. e.g. ID Allocation Batch IDs should be ignored
*/
ignoreBatchId?: boolean;
/**
* If true, this batch is staged and should not actually be submitted on replayPendingStates.
*/
Expand Down Expand Up @@ -144,6 +141,8 @@ export interface IRuntimeStateHandler {
): void;
isActiveConnection: () => boolean;
isAttached: () => boolean;
/** Set the derived batchId for the ID allocation batch before replay flushes it. */
setIdAllocationBatchId(batchId: string): void;
}

function isEmptyBatchPendingMessage(message: IPendingMessageFromStash): boolean {
Expand Down Expand Up @@ -403,13 +402,11 @@ export class PendingStateManager implements IDisposable {
* @param clientSequenceNumber - The CSN of the first message in the batch,
* or undefined if the batch was not yet sent (e.g. by the time we flushed we lost the connection)
* @param staged - Indicates whether batch is staged (not to be submitted while runtime is in Staging Mode)
* @param ignoreBatchId - Whether to ignore the batchId in the batchStartInfo
*/
public onFlushBatch(
batch: LocalBatchMessage[] | [LocalEmptyBatchPlaceholder],
clientSequenceNumber: number | undefined,
staged: boolean,
ignoreBatchId?: boolean,
): void {
// clientId and batchStartCsn are used for generating the batchId so we can detect container forks
// where this batch was submitted by two different clients rehydrating from the same local state.
Expand Down Expand Up @@ -443,7 +440,7 @@ export class PendingStateManager implements IDisposable {
localOpMetadata,
opMetadata,
// Note: We only will read this off the first message, but put it on all for simplicity
batchInfo: { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged },
batchInfo: { clientId, batchStartCsn, length: batch.length, staged },
};
this.pendingMessages.push(pendingMessage);
}
Expand Down Expand Up @@ -502,33 +499,128 @@ export class PendingStateManager implements IDisposable {
}
}

/**
* Scan pending messages to derive a deterministic batchId for any ID Allocation batch,
* and set it on the Outbox so it will be used when the ID allocation batch is flushed
* during replay. This enables fork detection via DuplicateBatchDetector for ID allocation
* batches that get resubmitted after rehydration.
*
* @param messageCount - Number of pending messages to scan. Defaults to all.
* @param skipNonStaged - If true, skip non-staged messages (for committingStagedBatches mode).
*/
private deriveIdAllocationBatchId(messageCount?: number, skipNonStaged?: boolean): void {
const count = messageCount ?? this.pendingMessages.length;
let scanIndex = 0;
const dataBatchIds: string[] = [];
let hasIdAllocBatch = false;
let idAllocBatchInfo: { clientId: string; batchStartCsn: number } | undefined;

while (scanIndex < count) {
const msg = this.pendingMessages.get(scanIndex);
assert(msg !== undefined, 0xc03 /* expected pending message at scan index */);

Comment on lines +519 to +521
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New asserts added here use numeric/hex error codes (e.g. 0xc03). Per repo guideline, newly added assert calls should use a string literal message instead of an assert code. Please switch this assert to a descriptive string message.

Copilot generated this review using guidance from repository custom instructions.
// In committingStagedBatches mode, skip non-staged messages (they won't be replayed)
if (skipNonStaged === true && !msg.batchInfo.staged) {
scanIndex++;
continue;
}

const isIdAlloc =
hasTypicalRuntimeOp(msg) && msg.runtimeOp.type === ContainerMessageType.IdAllocation;

if (isIdAlloc) {
hasIdAllocBatch = true;
idAllocBatchInfo = {
clientId: msg.batchInfo.clientId,
batchStartCsn: msg.batchInfo.batchStartCsn,
};
} else {
// Data batch (including empty batches) — collect its batchId
dataBatchIds.push(getEffectiveBatchId(msg));
}

// Advance past the current batch (could be multi-message)
const batchFlag = asBatchMetadata(msg.opMetadata)?.batch;
if (batchFlag === true) {
// Multi-message batch — find the end
scanIndex++;
while (scanIndex < count) {
const innerMsg = this.pendingMessages.get(scanIndex);
scanIndex++;
if (innerMsg?.opMetadata?.batch === false) {
break;
}
}
} else {
scanIndex++;
}
}

if (!hasIdAllocBatch) {
return;
}

let derivedIdAllocBatchId: string;
if (dataBatchIds.length > 0) {
// Derive from the first data batch's ID (deterministic — both forks see same stashed state)
derivedIdAllocBatchId = `idAlloc_[${dataBatchIds[0]}]`;
} else {
// Edge case: Only ID alloc ops exist (no data batches).
// Derive from the ID alloc pending message's own batchInfo.
assert(
idAllocBatchInfo !== undefined,
0xc04 /* idAllocBatchInfo must be set when hasIdAllocBatch is true */,
);
Comment on lines +557 to +560
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New assert added for the ID-allocation-only edge case uses a numeric/hex error code (0xc04). Repo guideline for newly added assert calls is to use a string literal message instead of an assert code; please update this assert accordingly.

Copilot generated this review using guidance from repository custom instructions.
derivedIdAllocBatchId = `idAlloc_[${idAllocBatchInfo.clientId}_${idAllocBatchInfo.batchStartCsn}]`;
}

// Set the derived batchId on the Outbox for when the ID allocation batch is flushed during replay.
this.stateHandler.setIdAllocationBatchId(derivedIdAllocBatchId);
}

/**
* Compares the batch ID of the incoming batch with the pending batch ID for this client.
* They should not match, as that would indicate a forked container.
* @param remoteBatchStart - BatchStartInfo for an incoming batch *NOT* submitted by this client
* @returns whether the batch IDs match
*/
private remoteBatchMatchesPendingBatch(remoteBatchStart: BatchStartInfo): boolean {
// Find the first pending message that uses Batch ID, to compare to the incoming remote batch.
// If there is no such message, then the incoming remote batch doesn't have a match here and we can return.
const firstIndexUsingBatchId = Array.from({
length: this.pendingMessages.length,
}).findIndex((_, i) => this.pendingMessages.get(i)?.batchInfo.ignoreBatchId !== true);
const pendingMessageUsingBatchId =
firstIndexUsingBatchId === -1
? undefined
: this.pendingMessages.get(firstIndexUsingBatchId);

if (pendingMessageUsingBatchId === undefined) {
return false;
}

// We must compare the effective batch IDs, since one of these ops
// may have been the original, not resubmitted, so wouldn't have its batch ID stamped yet.
const pendingBatchId = getEffectiveBatchId(pendingMessageUsingBatchId);
const inboundBatchId = getEffectiveBatchId(remoteBatchStart);

return pendingBatchId === inboundBatchId;
// Scan all pending batches (at batch boundaries) and check if any match the
// incoming remote batch. We scan beyond the first because the pending queue may
// start with an ID allocation batch (from stashed state) whose effective batchId
// differs from the inbound batch, while a later data batch does match.
let scanIndex = 0;
while (scanIndex < this.pendingMessages.length) {
const msg = this.pendingMessages.get(scanIndex);
if (msg === undefined) {
break;
}

const pendingBatchId = getEffectiveBatchId(msg);
if (pendingBatchId === inboundBatchId) {
return true;
}

// Advance past the current batch (could be multi-message)
const batchFlag = asBatchMetadata(msg.opMetadata)?.batch;
if (batchFlag === true) {
// Multi-message batch — skip to end
scanIndex++;
while (scanIndex < this.pendingMessages.length) {
const innerMsg = this.pendingMessages.get(scanIndex);
scanIndex++;
if (innerMsg?.opMetadata?.batch === false) {
break;
}
}
} else {
scanIndex++;
}
}

return false;
}

/**
Expand Down Expand Up @@ -776,11 +868,16 @@ export class PendingStateManager implements IDisposable {
const initialPendingMessagesCount = this.pendingMessages.length;
let remainingPendingMessagesCount = this.pendingMessages.length;

// Derive the batchId for any ID Allocation batch and set it on the Outbox,
// so the ID allocation batch will be flushed with this batchId during replay.
this.deriveIdAllocationBatchId(initialPendingMessagesCount, committingStagedBatches);

let seenStagedBatch = false;

// Process exactly `pendingMessagesCount` items in the queue as it represents the number of messages that were
// pending when we connected. This is important because the `reSubmitFn` might add more items in the queue
// which must not be replayed.
// === Phase 3: Dequeue and replay, skipping ID Allocation batches ===
// Process exactly `initialPendingMessagesCount` items in the queue as it represents the
// number of messages that were pending when we connected. This is important because the
// `reSubmitFn` might add more items in the queue which must not be replayed.
while (remainingPendingMessagesCount > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
let pendingMessage = this.pendingMessages.shift()!;
Expand All @@ -801,12 +898,28 @@ export class PendingStateManager implements IDisposable {
const batchMetadataFlag = asBatchMetadata(pendingMessage.opMetadata)?.batch;
assert(batchMetadataFlag !== false, 0x41b /* We cannot process batches in chunks */);

// The next message starts a batch (possibly single-message), and we'll need its batchId.
const batchId =
pendingMessage.batchInfo.ignoreBatchId === true
? undefined
: getEffectiveBatchId(pendingMessage);
// Skip ID Allocation batches during replay. Fresh ID alloc ops were already submitted
// by submitIdAllocationOpIfNeeded before replay started, and the resubmit handler for
// IdAllocation is a no-op anyway (see ContainerRuntime.reSubmitCore).
if (
hasTypicalRuntimeOp(pendingMessage) &&
pendingMessage.runtimeOp.type === ContainerMessageType.IdAllocation
) {
// If it's a multi-message ID alloc batch, consume the rest of it
if (batchMetadataFlag === true) {
while (remainingPendingMessagesCount > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const innerMsg = this.pendingMessages.shift()!;
remainingPendingMessagesCount--;
if (innerMsg.opMetadata?.batch === false) {
break;
}
}
}
continue;
}

const batchId = getEffectiveBatchId(pendingMessage);
const staged = pendingMessage.batchInfo.staged;

if (asEmptyBatchLocalOpMetadata(pendingMessage.localOpMetadata)?.emptyBatch === true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,36 @@ describe("DuplicateBatchDetector", () => {
);
});

it("ID Allocation batches with derived batchIds are detected as duplicates", () => {
// Simulates two forked containers submitting ID allocation batches with the same derived batchId
const derivedIdAllocBatchId = "idAlloc_[clientId_[2]]";

const inboundBatch1 = makeBatch({
sequenceNumber: seqNum++,
minimumSequenceNumber: 0,
batchId: derivedIdAllocBatchId,
});
const inboundBatch2 = makeBatch({
sequenceNumber: seqNum++,
minimumSequenceNumber: 0,
batchId: derivedIdAllocBatchId,
});

const result1 = detector.processInboundBatch(inboundBatch1);
assert.deepEqual(
result1,
{ duplicate: false },
"First ID alloc batch should not be a duplicate",
);

const result2 = detector.processInboundBatch(inboundBatch2);
assert.deepEqual(
result2,
{ duplicate: true, otherSequenceNumber: 1 },
"Second ID alloc batch with same derived batchId should be detected as a duplicate",
);
});

describe("getStateForSummary", () => {
it("If empty, return undefined", () => {
assert.equal(
Expand Down
Loading
Loading