Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,11 @@ export class ContainerRuntime
reSubmitBatch: this.reSubmitBatch.bind(this),
isActiveConnection: () => this.innerDeltaManager.active,
isAttached: () => this.attachState !== AttachState.Detached,
setIdAllocationBatchId: (batchId: string) => {
if (this.batchIdTrackingEnabled) {
this.outbox.setIdAllocationBatchId(batchId);
}
},
},
pendingRuntimeState?.pending,
this.baseLogger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ export interface IBatchManagerOptions {
* If true, the outbox is allowed to rebase the batch during flushing.
*/
readonly canRebase: boolean;

/**
* If true, don't compare batchID of incoming batches to this. e.g. ID Allocation Batch IDs should be ignored
*/
readonly ignoreBatchId?: boolean;
}

export interface BatchSequenceNumbers {
Expand Down
25 changes: 19 additions & 6 deletions packages/runtime/container-runtime/src/opLifecycle/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,20 @@ export class Outbox {
this.blobAttachBatch = new BatchManager({ canRebase: true });
this.idAllocationBatch = new BatchManager({
canRebase: false,
ignoreBatchId: true,
});
}

/**
* Derived batchId to stamp on the next ID allocation batch flush (during resubmit).
* Set by PendingStateManager's pre-scan phase before replay, consumed (one-shot) by flushAll.
*/
private idAllocationResubmitBatchId: string | undefined;

/** Set the derived batchId for the ID allocation batch before replay flushes it. */
public setIdAllocationBatchId(batchId: string): void {
this.idAllocationResubmitBatchId = batchId;
}

public get messageCount(): number {
return this.mainBatch.length + this.blobAttachBatch.length + this.idAllocationBatch.length;
}
Expand Down Expand Up @@ -397,12 +407,16 @@ export class Outbox {
return;
}

// Don't use resubmittingBatchId for idAllocationBatch.
// ID Allocation messages are not directly resubmitted so don't pass the resubmitInfo
// Use the derived batchId for the ID allocation batch if one was set by PSM's pre-scan.
// This enables fork detection via DuplicateBatchDetector for ID allocation batches.
const idAllocResubmitInfo: BatchResubmitInfo | undefined =
this.idAllocationResubmitBatchId === undefined
? undefined
: { batchId: this.idAllocationResubmitBatchId, staged: false };
this.idAllocationResubmitBatchId = undefined; // Consume (one-shot)
this.flushInternal({
batchManager: this.idAllocationBatch,
// Note: For now, we will never stage ID Allocation messages.
// They won't contain personal info and no harm in extra allocations in case of discarding the staged changes
resubmitInfo: idAllocResubmitInfo,
});
this.flushInternal({
batchManager: this.blobAttachBatch,
Expand Down Expand Up @@ -499,7 +513,6 @@ export class Outbox {
rawBatch.messages,
clientSequenceNumber,
staged,
batchManager.options.ignoreBatchId,
);
}

Expand Down
207 changes: 174 additions & 33 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Deque from "double-ended-queue";
import { v4 as uuid } from "uuid";

import { isContainerMessageDirtyable } from "./containerRuntime.js";
import { ContainerMessageType } from "./messageTypes.js";
import type {
InboundContainerRuntimeMessage,
InboundSequencedContainerRuntimeMessage,
Expand Down Expand Up @@ -84,10 +85,6 @@ export interface IPendingMessage {
* length of the batch (how many runtime messages here)
*/
length: number;
/**
* If true, don't compare batchID of incoming batches to this. e.g. ID Allocation Batch IDs should be ignored
*/
ignoreBatchId?: boolean;
/**
* If true, this batch is staged and should not actually be submitted on replayPendingStates.
*/
Expand Down Expand Up @@ -144,6 +141,8 @@ export interface IRuntimeStateHandler {
): void;
isActiveConnection: () => boolean;
isAttached: () => boolean;
/** Set the derived batchId for the ID allocation batch before replay flushes it. */
setIdAllocationBatchId(batchId: string): void;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's a way we can pass the batchId inherently through the existing call patterns, that will be much more robust. With this separate function, it's possible the batchId set here and the stuff being replayed could get out of sync

}

function isEmptyBatchPendingMessage(message: IPendingMessageFromStash): boolean {
Expand Down Expand Up @@ -403,13 +402,11 @@ export class PendingStateManager implements IDisposable {
* @param clientSequenceNumber - The CSN of the first message in the batch,
* or undefined if the batch was not yet sent (e.g. by the time we flushed we lost the connection)
* @param staged - Indicates whether batch is staged (not to be submitted while runtime is in Staging Mode)
* @param ignoreBatchId - Whether to ignore the batchId in the batchStartInfo
*/
public onFlushBatch(
batch: LocalBatchMessage[] | [LocalEmptyBatchPlaceholder],
clientSequenceNumber: number | undefined,
staged: boolean,
ignoreBatchId?: boolean,
): void {
// clientId and batchStartCsn are used for generating the batchId so we can detect container forks
// where this batch was submitted by two different clients rehydrating from the same local state.
Expand Down Expand Up @@ -443,7 +440,7 @@ export class PendingStateManager implements IDisposable {
localOpMetadata,
opMetadata,
// Note: We only will read this off the first message, but put it on all for simplicity
batchInfo: { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged },
batchInfo: { clientId, batchStartCsn, length: batch.length, staged },
};
this.pendingMessages.push(pendingMessage);
}
Expand Down Expand Up @@ -502,33 +499,125 @@ export class PendingStateManager implements IDisposable {
}
}

/**
* Scan pending messages to derive a deterministic batchId for any ID Allocation batch,
* and set it on the Outbox so it will be used when the ID allocation batch is flushed
* during replay. This enables fork detection via DuplicateBatchDetector for ID allocation
* batches that get resubmitted after rehydration.
*
* @param messageCount - Number of pending messages to scan. Defaults to all.
* @param skipNonStaged - If true, skip non-staged messages (for committingStagedBatches mode).
*/
private deriveIdAllocationBatchId(messageCount?: number, skipNonStaged?: boolean): void {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This duplicates a lot of logic with replayPendingStates. I talked with Claude about it and like the solution we came up with, would be a nice pre-requisite refactor (I want to move PSM to be more natively batch-centric anyway):

Plan: Consolidate batch traversal into a single structured pass

Context

deriveIdAllocationBatchId and replayPendingStates both traverse the pending message queue, detect batch boundaries, classify batches (ID alloc vs data vs empty), skip non-staged messages, and extract batch IDs — using different mechanisms. This duplication means a change to batch structure conventions must be mirrored in multiple places, risking drift.

Approach

Introduce a pre-scan phase that produces a BatchDescriptor[], then have both derivation and replay consume that list.

New type: BatchDescriptor

interface BatchDescriptor {
  /** Index of the first message in this batch within pendingMessages */
  startIndex: number;
  /** Number of messages in this batch */
  length: number;
  /** Classified batch kind */
  kind: "idAllocation" | "data" | "empty";
  /** Effective batchId (from metadata or derived from clientId+csn) */
  batchId: string;
  /** Whether this batch is staged */
  staged: boolean;
}

New function: scanPendingBatches

A private method (or free function) that does a single forward pass over pendingMessages[0..count):

  1. Start at index 0
  2. Peek the first message to determine batch boundaries (using asBatchMetadata(...).batch flag — same logic currently in nextBatchIndex)
  3. Classify the batch:
    • hasTypicalRuntimeOp(msg) && msg.runtimeOp.type === ContainerMessageType.IdAllocation"idAllocation"
    • asEmptyBatchLocalOpMetadata(msg.localOpMetadata)?.emptyBatch"empty"
    • otherwise → "data"
  4. Compute getEffectiveBatchId(msg)
  5. Read msg.batchInfo.staged
  6. Push a BatchDescriptor, advance index past the batch
  7. Repeat until index >= count

Changes to deriveIdAllocationBatchId

Replace the manual scan with:

const batches = this.scanPendingBatches(messageCount, skipNonStaged);
const idAllocBatch = batches.find(b => b.kind === "idAllocation");
if (!idAllocBatch) return;

const firstDataBatch = batches.find(b => b.kind !== "idAllocation");
const derivedId = firstDataBatch
  ? `idAlloc_[${firstDataBatch.batchId}]`
  : `idAlloc_[${...fallback from idAllocBatch's batchInfo...}]`;

this.stateHandler.setIdAllocationBatchId(derivedId);

Changes to replayPendingStates

Replace the inline traversal with iteration over BatchDescriptor[]:

const batches = this.scanPendingBatches(initialPendingMessagesCount);
// Phase 1: derive ID alloc batch ID (already done via batches list)
// Phase 2: iterate batches
for (const batch of batches) {
  // shift batch.length messages from the queue
  // skip if committingStagedBatches && !batch.staged (re-queue them)
  // skip if batch.kind === "idAllocation"
  // resubmit empty / single / multi-message batches using batch.batchId
}

The inner shift + collect loop stays (we still need the actual messages for resubmission), but all classification and boundary detection is driven by the descriptor, not re-derived inline.

nextBatchIndex and remoteBatchMatchesPendingBatch

  • nextBatchIndex stays as-is — it's used by scanPendingBatches internally and by remoteBatchMatchesPendingBatch (which isn't part of replay).
  • remoteBatchMatchesPendingBatch could optionally consume BatchDescriptor[] too, but it only checks 1-2 batches so the benefit is marginal. Leave it for now.

Files to modify

  • pendingStateManager.ts — add BatchDescriptor type, scanPendingBatches method; refactor deriveIdAllocationBatchId and replayPendingStates

Verification

  • Existing unit tests in pendingStateManager.spec.ts cover replay, fork detection, staged batches, and ID allocation derivation. All should pass unchanged.
  • Run: npm run test in packages/runtime/container-runtime

const count = messageCount ?? this.pendingMessages.length;
let scanIndex = 0;
const dataBatchIds: string[] = [];
let hasIdAllocBatch = false;
let idAllocBatchInfo: { clientId: string; batchStartCsn: number } | undefined;

while (scanIndex < count) {
const msg = this.pendingMessages.get(scanIndex);
assert(msg !== undefined, 0xc03 /* expected pending message at scan index */);

Comment on lines +519 to +521
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New asserts added here use numeric/hex error codes (e.g. 0xc03). Per repo guideline, newly added assert calls should use a string literal message instead of an assert code. Please switch this assert to a descriptive string message.

Copilot generated this review using guidance from repository custom instructions.
// In committingStagedBatches mode, skip non-staged messages (they won't be replayed)
if (skipNonStaged === true && !msg.batchInfo.staged) {
scanIndex++;
continue;
}

const isIdAlloc =
hasTypicalRuntimeOp(msg) && msg.runtimeOp.type === ContainerMessageType.IdAllocation;

if (isIdAlloc) {
hasIdAllocBatch = true;
idAllocBatchInfo = {
clientId: msg.batchInfo.clientId,
batchStartCsn: msg.batchInfo.batchStartCsn,
};
} else {
// Data batch (including empty batches) — collect its batchId
dataBatchIds.push(getEffectiveBatchId(msg));
}

// Advance past the current batch (could be multi-message)
scanIndex = nextBatchIndex(this.pendingMessages, scanIndex, count);
}

if (!hasIdAllocBatch) {
return;
}

let derivedIdAllocBatchId: string;
if (dataBatchIds.length > 0) {
// Derive from the first data batch's ID (deterministic — both forks see same stashed state)
derivedIdAllocBatchId = `idAlloc_[${dataBatchIds[0]}]`;
} else {
// Edge case: Only ID alloc ops exist (no data batches).
// Derive from the ID alloc pending message's own batchInfo.
assert(
idAllocBatchInfo !== undefined,
0xc04 /* idAllocBatchInfo must be set when hasIdAllocBatch is true */,
);
Comment on lines +557 to +560
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New assert added for the ID-allocation-only edge case uses a numeric/hex error code (0xc04). Repo guideline for newly added assert calls is to use a string literal message instead of an assert code; please update this assert accordingly.

Copilot generated this review using guidance from repository custom instructions.
derivedIdAllocBatchId = `idAlloc_[${idAllocBatchInfo.clientId}_${idAllocBatchInfo.batchStartCsn}]`;
}

// Set the derived batchId on the Outbox for when the ID allocation batch is flushed during replay.
this.stateHandler.setIdAllocationBatchId(derivedIdAllocBatchId);
}

/**
* Compares the batch ID of the incoming batch with the pending batch ID for this client.
* They should not match, as that would indicate a forked container.
* @param remoteBatchStart - BatchStartInfo for an incoming batch *NOT* submitted by this client
* @returns whether the batch IDs match
*/
private remoteBatchMatchesPendingBatch(remoteBatchStart: BatchStartInfo): boolean {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that ID Allocation Batches have batchID can't this be as simple as comparing the incoming Batch ID to the pending Batch ID, normalizing off the ID Allocation prefix?

// Find the first pending message that uses Batch ID, to compare to the incoming remote batch.
// If there is no such message, then the incoming remote batch doesn't have a match here and we can return.
const firstIndexUsingBatchId = Array.from({
length: this.pendingMessages.length,
}).findIndex((_, i) => this.pendingMessages.get(i)?.batchInfo.ignoreBatchId !== true);
const pendingMessageUsingBatchId =
firstIndexUsingBatchId === -1
? undefined
: this.pendingMessages.get(firstIndexUsingBatchId);

if (pendingMessageUsingBatchId === undefined) {
return false;
}

// We must compare the effective batch IDs, since one of these ops
// may have been the original, not resubmitted, so wouldn't have its batch ID stamped yet.
const pendingBatchId = getEffectiveBatchId(pendingMessageUsingBatchId);
const inboundBatchId = getEffectiveBatchId(remoteBatchStart);

return pendingBatchId === inboundBatchId;
// Check the first pending batch, and if it's an ID allocation batch that doesn't
// match, also check the next batch. We look beyond the first because the pending
// queue may start with a stashed ID allocation batch whose effective batchId differs
// from the inbound batch, while the following data batch does match.
// ID alloc is always flushed first by the Outbox, so it can only be at position 0.
let scanIndex = 0;
// maxBatches: 1 normally, 2 if the first batch is an unmatched ID alloc
const maxBatches = 2;
for (
let batchesSeen = 0;
batchesSeen < maxBatches && scanIndex < this.pendingMessages.length;
batchesSeen++
) {
const msg = this.pendingMessages.get(scanIndex);
if (msg === undefined) {
break;
}

const pendingBatchId = getEffectiveBatchId(msg);
if (pendingBatchId === inboundBatchId) {
return true;
}

// Only continue scanning past the first batch if it was an ID allocation batch.
// Any other batch type at position 0 means there's no ID alloc to skip over.
if (
batchesSeen === 0 &&
hasTypicalRuntimeOp(msg) &&
msg.runtimeOp.type === ContainerMessageType.IdAllocation
) {
// Advance past this ID alloc batch (could be multi-message) and check one more
scanIndex = nextBatchIndex(
this.pendingMessages,
scanIndex,
this.pendingMessages.length,
);
continue;
}

// First batch wasn't an ID alloc — no need to check further
break;
}

return false;
}

/**
Expand Down Expand Up @@ -776,11 +865,20 @@ export class PendingStateManager implements IDisposable {
const initialPendingMessagesCount = this.pendingMessages.length;
let remainingPendingMessagesCount = this.pendingMessages.length;

// === Phase 1: Pre-scan ===
// Scan pending messages to derive a deterministic batchId for any ID Allocation batch.
// This must happen before replay because the ID alloc batch will be flushed during replay
// and needs its batchId set on the Outbox beforehand.
this.deriveIdAllocationBatchId(initialPendingMessagesCount, committingStagedBatches);

let seenStagedBatch = false;

// Process exactly `pendingMessagesCount` items in the queue as it represents the number of messages that were
// pending when we connected. This is important because the `reSubmitFn` might add more items in the queue
// which must not be replayed.
// === Phase 2: Dequeue and replay ===
// Process exactly `initialPendingMessagesCount` items in the queue as it represents the
// number of messages that were pending when we connected. This is important because the
// `reSubmitFn` might add more items in the queue which must not be replayed.
// ID Allocation batches are skipped here — fresh ID alloc ops were already submitted
// by submitIdAllocationOpIfNeeded before replay started.
while (remainingPendingMessagesCount > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
let pendingMessage = this.pendingMessages.shift()!;
Expand All @@ -801,12 +899,26 @@ export class PendingStateManager implements IDisposable {
const batchMetadataFlag = asBatchMetadata(pendingMessage.opMetadata)?.batch;
assert(batchMetadataFlag !== false, 0x41b /* We cannot process batches in chunks */);

// The next message starts a batch (possibly single-message), and we'll need its batchId.
const batchId =
pendingMessage.batchInfo.ignoreBatchId === true
? undefined
: getEffectiveBatchId(pendingMessage);
// Skip ID Allocation batches (handled by submitIdAllocationOpIfNeeded before replay).
if (
hasTypicalRuntimeOp(pendingMessage) &&
pendingMessage.runtimeOp.type === ContainerMessageType.IdAllocation
) {
// If it's a multi-message ID alloc batch, consume the rest of it
if (batchMetadataFlag === true) {
while (remainingPendingMessagesCount > 0) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const innerMsg = this.pendingMessages.shift()!;
remainingPendingMessagesCount--;
if (innerMsg.opMetadata?.batch === false) {
break;
}
}
}
continue;
}

const batchId = getEffectiveBatchId(pendingMessage);
const staged = pendingMessage.batchInfo.staged;

if (asEmptyBatchLocalOpMetadata(pendingMessage.localOpMetadata)?.emptyBatch === true) {
Expand Down Expand Up @@ -945,3 +1057,32 @@ function hasTypicalRuntimeOp(
): message is IPendingMessage & { runtimeOp: LocalContainerRuntimeMessage } {
return message.runtimeOp !== undefined && message.runtimeOp.type !== "groupedBatch";
}

/**
* Given the first message of a batch at `startIndex` in the deque, return the index of
* the first message of the *next* batch. Handles both single-message batches (no batch
* metadata flag, or flag === undefined) and multi-message batches (flag === true on the
* first message, flag === false on the last).
*/
/**
 * Given the first message of a batch at `startIndex` in the deque, return the index of
 * the first message of the *next* batch. A single-message batch has no batch flag (or
 * flag === undefined); a multi-message batch carries flag === true on its first message
 * and flag === false on its last.
 */
function nextBatchIndex(
	messages: Deque<IPendingMessage>,
	startIndex: number,
	limit: number,
): number {
	const first = messages.get(startIndex);
	if (asBatchMetadata(first?.opMetadata)?.batch !== true) {
		// Single-message batch (or no batch metadata) — next batch starts immediately after
		return startIndex + 1;
	}
	// Multi-message batch: walk forward until just past the batch-end marker (batch === false)
	for (let cursor = startIndex + 1; cursor < limit; cursor++) {
		if (messages.get(cursor)?.opMetadata?.batch === false) {
			return cursor + 1;
		}
	}
	// No end marker found before the limit — the batch runs to the end of the scanned range
	return limit;
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,36 @@ describe("DuplicateBatchDetector", () => {
);
});

it("ID Allocation batches with derived batchIds are detected as duplicates", () => {
// Simulates two forked containers submitting ID allocation batches with the same derived batchId
const derivedIdAllocBatchId = "idAlloc_[clientId_[2]]";

const inboundBatch1 = makeBatch({
sequenceNumber: seqNum++,
minimumSequenceNumber: 0,
batchId: derivedIdAllocBatchId,
});
const inboundBatch2 = makeBatch({
sequenceNumber: seqNum++,
minimumSequenceNumber: 0,
batchId: derivedIdAllocBatchId,
});

const result1 = detector.processInboundBatch(inboundBatch1);
assert.deepEqual(
result1,
{ duplicate: false },
"First ID alloc batch should not be a duplicate",
);

const result2 = detector.processInboundBatch(inboundBatch2);
assert.deepEqual(
result2,
{ duplicate: true, otherSequenceNumber: 1 },
"Second ID alloc batch with same derived batchId should be detected as a duplicate",
);
});

describe("getStateForSummary", () => {
it("If empty, return undefined", () => {
assert.equal(
Expand Down
Loading
Loading