diff --git a/op-supernode/supernode/activity/interop/algo.go b/op-supernode/supernode/activity/interop/algo.go index f724c021ef5..f94386970e0 100644 --- a/op-supernode/supernode/activity/interop/algo.go +++ b/op-supernode/supernode/activity/interop/algo.go @@ -59,7 +59,7 @@ func (i *Interop) l1Inclusion(ts uint64, blocksAtTimestamp blockPerChain) (eth.B // - Verify the initiating message exists in the source chain's logsDB // - Verify the initiating message timestamp <= executing message timestamp // - Verify the initiating message hasn't expired (within message expiry window) -func (i *Interop) verifyInteropMessages(ts uint64, blocksAtTimestamp blockPerChain) (Result, error) { +func (i *Interop) verifyInteropMessages(ts uint64, blocksAtTimestamp blockPerChain, view *frontierVerificationView) (Result, error) { result := Result{ Timestamp: ts, L2Heads: make(blockPerChain), @@ -78,7 +78,7 @@ func (i *Interop) verifyInteropMessages(ts uint64, blocksAtTimestamp blockPerCha execMsgs map[uint32]*types.ExecutingMessage err error ) - if frontierBlock, ok := i.frontierView.block(chainID); ok { + if frontierBlock, ok := view.block(chainID); ok { blockRef = frontierBlock.ref execMsgs = frontierBlock.execMsgs } else { @@ -141,7 +141,7 @@ func (i *Interop) verifyInteropMessages(ts uint64, blocksAtTimestamp blockPerCha // Verify each executing message blockValid := true for logIdx, execMsg := range execMsgs { - err := i.verifyExecutingMessage(chainID, blockRef.Time, logIdx, execMsg) + err := i.verifyExecutingMessage(chainID, blockRef.Time, logIdx, execMsg, view) if err != nil { i.log.Warn("invalid executing message", "chain", chainID, @@ -172,7 +172,7 @@ func (i *Interop) verifyInteropMessages(ts uint64, blocksAtTimestamp blockPerCha // 1. The initiating message exists in the source chain's database // 2. The initiating message's timestamp is not greater than the executing block's timestamp // 3. The initiating message hasn't expired (timestamp + messageExpiryWindow >= executing timestamp) -func (i *Interop) verifyExecutingMessage(executingChain eth.ChainID, executingTimestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage) error { +func (i *Interop) verifyExecutingMessage(executingChain eth.ChainID, executingTimestamp uint64, logIdx uint32, execMsg *types.ExecutingMessage, view *frontierVerificationView) error { // Get the source chain's logsDB sourceDB, ok := i.logsDBs[execMsg.ChainID] if !ok { @@ -202,7 +202,7 @@ func (i *Interop) verifyExecutingMessage(executingChain eth.ChainID, executingTi // Same-timestamp dependencies may live in the current frontier view rather // than accepted-history logsDB. if execMsg.Timestamp == executingTimestamp { - if _, ok := i.frontierView.contains(execMsg.ChainID, query); ok { + if _, ok := view.contains(execMsg.ChainID, query); ok { return nil } } diff --git a/op-supernode/supernode/activity/interop/algo_test.go b/op-supernode/supernode/activity/interop/algo_test.go index 485838559cf..2e0cbbc1e1f 100644 --- a/op-supernode/supernode/activity/interop/algo_test.go +++ b/op-supernode/supernode/activity/interop/algo_test.go @@ -48,7 +48,7 @@ type verifyInteropTestCase struct { func runVerifyInteropTest(t *testing.T, tc verifyInteropTestCase) { t.Parallel() interop, timestamp, blocks := tc.setup() - result, err := interop.verifyInteropMessages(timestamp, blocks) + result, err := interop.verifyInteropMessages(timestamp, blocks, nil) if tc.expectError { require.Error(t, err) @@ -87,7 +87,7 @@ func TestL1Inclusion(t *testing.T) { messageExpiryWindow: defaultMessageExpiryWindow, log: gethlog.New(), logsDBs: map[eth.ChainID]LogsDB{}, - chains: map[eth.ChainID]cc.ChainContainer{chainID: &algoMockChain{id: chainID, optimisticL1: l1Block}}, + chains: map[eth.ChainID]cc.InteropChain{chainID: &algoMockChain{id: chainID, optimisticL1: l1Block}}, } return interop, 1000, map[eth.ChainID]eth.BlockID{chainID: expectedBlock} }, @@ -109,7 +109,7 @@ func TestL1Inclusion(t *testing.T) { messageExpiryWindow: defaultMessageExpiryWindow, log: gethlog.New(), logsDBs: map[eth.ChainID]LogsDB{}, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ chain1ID: &algoMockChain{id: chain1ID, optimisticL1: eth.BlockID{Number: 60, Hash: common.HexToHash("0xL1_1")}}, chain2ID: &algoMockChain{id: chain2ID, optimisticL1: eth.BlockID{Number: 45, Hash: common.HexToHash("0xL1_2")}}, chain3ID: &algoMockChain{id: chain3ID, optimisticL1: eth.BlockID{Number: 50, Hash: common.HexToHash("0xL1_3")}}, @@ -137,7 +137,7 @@ func TestL1Inclusion(t *testing.T) { messageExpiryWindow: defaultMessageExpiryWindow, log: gethlog.New(), logsDBs: map[eth.ChainID]LogsDB{}, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ chain1ID: &algoMockChain{id: chain1ID, optimisticL1: l1Block1}, // chain2ID NOT in chains map }, @@ -160,7 +160,7 @@ func TestL1Inclusion(t *testing.T) { messageExpiryWindow: defaultMessageExpiryWindow, log: gethlog.New(), logsDBs: map[eth.ChainID]LogsDB{}, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ chainID: &algoMockChain{id: chainID, optimisticAtErr: errors.New("optimistic at error")}, }, } @@ -178,7 +178,7 @@ func TestL1Inclusion(t *testing.T) { messageExpiryWindow: defaultMessageExpiryWindow, log: gethlog.New(), logsDBs: map[eth.ChainID]LogsDB{}, - chains: map[eth.ChainID]cc.ChainContainer{}, + chains: map[eth.ChainID]cc.InteropChain{}, } return interop, 1000, map[eth.ChainID]eth.BlockID{} }, @@ -197,7 +197,7 @@ func TestL1Inclusion(t *testing.T) { messageExpiryWindow: defaultMessageExpiryWindow, log: gethlog.New(), logsDBs: map[eth.ChainID]LogsDB{}, - chains: map[eth.ChainID]cc.ChainContainer{chainID: &algoMockChain{id: chainID, optimisticL1: l1Block}}, + chains: map[eth.ChainID]cc.InteropChain{chainID: &algoMockChain{id: chainID, optimisticL1: l1Block}}, } return interop, 0, map[eth.ChainID]eth.BlockID{ chainID: {Number: 0, Hash: common.HexToHash("0x123")}, @@ -254,7 +254,7 @@ func TestVerifyInteropMessages(t *testing.T) { messageExpiryWindow: defaultMessageExpiryWindow, log: gethlog.New(), logsDBs: map[eth.ChainID]LogsDB{chainID: mockDB}, - chains: map[eth.ChainID]cc.ChainContainer{chainID: newMockChainWithL1(chainID, l1Block)}, + chains: map[eth.ChainID]cc.InteropChain{chainID: newMockChainWithL1(chainID, l1Block)}, } return interop, 1000, map[eth.ChainID]eth.BlockID{chainID: expectedBlock} @@ -307,7 +307,7 @@ func TestVerifyInteropMessages(t *testing.T) { sourceChainID: sourceDB, destChainID: destDB, }, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ sourceChainID: newMockChainWithL1(sourceChainID, l1Block), destChainID: newMockChainWithL1(destChainID, l1Block), }, @@ -367,7 +367,7 @@ func TestVerifyInteropMessages(t *testing.T) { sourceChainID: sourceDB, destChainID: destDB, }, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ sourceChainID: newMockChainWithL1(sourceChainID, l1Block), destChainID: newMockChainWithL1(destChainID, l1Block), }, @@ -429,7 +429,7 @@ func TestVerifyInteropMessages(t *testing.T) { sourceChainID: sourceDB, destChainID: destDB, }, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ sourceChainID: newMockChainWithL1(sourceChainID, l1Block), destChainID: newMockChainWithL1(destChainID, l1Block), }, @@ -464,7 +464,7 @@ func TestVerifyInteropMessages(t *testing.T) { messageExpiryWindow: defaultMessageExpiryWindow, log: gethlog.New(), logsDBs: map[eth.ChainID]LogsDB{registeredChain: mockDB}, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ registeredChain: newMockChainWithL1(registeredChain, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}), }, } @@ -502,7 +502,7 @@ func TestVerifyInteropMessages(t *testing.T) { messageExpiryWindow: defaultMessageExpiryWindow, log: gethlog.New(), logsDBs: map[eth.ChainID]LogsDB{chainID: mockDB}, - chains: map[eth.ChainID]cc.ChainContainer{chainID: newMockChainWithL1(chainID, l1Block, expectedBlock)}, + chains: map[eth.ChainID]cc.InteropChain{chainID: newMockChainWithL1(chainID, l1Block, expectedBlock)}, } return interop, 1000, map[eth.ChainID]eth.BlockID{chainID: expectedBlock} @@ -550,7 +550,7 @@ func TestVerifyInteropMessages(t *testing.T) { sourceChainID: sourceDB, destChainID: destDB, }, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ sourceChainID: newMockChainWithL1(sourceChainID, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}), destChainID: newMockChainWithL1(destChainID, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}, destBlock), }, @@ -602,7 +602,7 @@ func TestVerifyInteropMessages(t *testing.T) { sourceChainID: sourceDB, destChainID: destDB, }, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ sourceChainID: newMockChainWithL1(sourceChainID, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}), destChainID: newMockChainWithL1(destChainID, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}, destBlock), }, @@ -647,7 +647,7 @@ func TestVerifyInteropMessages(t *testing.T) { destChainID: destDB, // Note: unknownSourceChain NOT in logsDBs }, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ unknownSourceChain: newMockChainWithL1(unknownSourceChain, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}), destChainID: newMockChainWithL1(destChainID, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}, destBlock), }, @@ -701,7 +701,7 @@ func TestVerifyInteropMessages(t *testing.T) { sourceChainID: sourceDB, destChainID: destDB, }, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ sourceChainID: newMockChainWithL1(sourceChainID, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}), destChainID: newMockChainWithL1(destChainID, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}, destBlock), }, @@ -760,7 +760,7 @@ func TestVerifyInteropMessages(t *testing.T) { validChainID: validDB, invalidChainID: invalidDB, }, - chains: map[eth.ChainID]cc.ChainContainer{ + chains: map[eth.ChainID]cc.InteropChain{ invalidChainID: newMockChainWithL1(invalidChainID, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}, invalidBlock), sourceChainID: newMockChainWithL1(sourceChainID, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}), validChainID: newMockChainWithL1(validChainID, eth.BlockID{Number: 40, Hash: common.HexToHash("0xL1")}), @@ -800,7 +800,7 @@ func TestVerifyInteropMessages(t *testing.T) { messageExpiryWindow: defaultMessageExpiryWindow, log: gethlog.New(), logsDBs: map[eth.ChainID]LogsDB{chainID: mockDB}, - chains: map[eth.ChainID]cc.ChainContainer{chainID: newMockChainWithL1(chainID, l1Block)}, + chains: map[eth.ChainID]cc.InteropChain{chainID: newMockChainWithL1(chainID, l1Block)}, } return interop, 1000, map[eth.ChainID]eth.BlockID{chainID: block} diff --git a/op-supernode/supernode/activity/interop/checker.go b/op-supernode/supernode/activity/interop/checker.go index 1e429b70d28..c3cce20d568 100644 --- a/op-supernode/supernode/activity/interop/checker.go +++ b/op-supernode/supernode/activity/interop/checker.go @@ -11,21 +11,27 @@ type l1ByNumberSource interface { L1BlockRefByNumber(ctx context.Context, num uint64) (eth.L1BlockRef, error) } -// byNumberConsistencyChecker verifies that a set of L1 block IDs all belong to -// the same L1 fork by comparing each against the canonical chain. -type byNumberConsistencyChecker struct { +// l1ConsistencyChecker verifies that a set of L1 block IDs all belong to +// the same L1 fork. +type l1ConsistencyChecker interface { + SameL1Chain(ctx context.Context, heads []eth.BlockID) (bool, error) +} + +// l1ByNumberChecker is the production l1ConsistencyChecker: it checks each +// head against the canonical L1 chain fetched from an l1ByNumberSource. +type l1ByNumberChecker struct { l1 l1ByNumberSource } -func newByNumberConsistencyChecker(l1 l1ByNumberSource) *byNumberConsistencyChecker { +func newL1ConsistencyChecker(l1 l1ByNumberSource) l1ConsistencyChecker { if l1 == nil { return nil } - return &byNumberConsistencyChecker{l1: l1} + return &l1ByNumberChecker{l1: l1} } // SameL1Chain returns true if all non-zero heads belong to the same canonical L1 chain. -func (c *byNumberConsistencyChecker) SameL1Chain(ctx context.Context, heads []eth.BlockID) (bool, error) { +func (c *l1ByNumberChecker) SameL1Chain(ctx context.Context, heads []eth.BlockID) (bool, error) { for _, head := range heads { if head == (eth.BlockID{}) { continue diff --git a/op-supernode/supernode/activity/interop/checker_test.go b/op-supernode/supernode/activity/interop/checker_test.go index 79e99614d0a..9d568c8674d 100644 --- a/op-supernode/supernode/activity/interop/checker_test.go +++ b/op-supernode/supernode/activity/interop/checker_test.go @@ -22,7 +22,23 @@ func (m *mockL1Source) L1BlockRefByNumber(ctx context.Context, num uint64) (eth. return ref, nil } -func TestByNumberConsistencyChecker_SameL1Chain(t *testing.T) { +// noopL1Checker treats every set of heads as canonical. Intended for tests that +// do not exercise L1 consistency — production must always use the real checker. +type noopL1Checker struct{} + +func (noopL1Checker) SameL1Chain(context.Context, []eth.BlockID) (bool, error) { + return true, nil +} + +// inconsistentL1Checker always reports that heads disagree on the canonical L1 +// chain. Tests use it to drive observeRound into a rewind decision. +type inconsistentL1Checker struct{} + +func (inconsistentL1Checker) SameL1Chain(context.Context, []eth.BlockID) (bool, error) { + return false, nil +} + +func TestL1ConsistencyChecker_SameL1Chain(t *testing.T) { t.Parallel() hashA := common.HexToHash("0xaaaa") @@ -34,7 +50,7 @@ func TestByNumberConsistencyChecker_SameL1Chain(t *testing.T) { 200: {Hash: hashB, Number: 200}, }, } - checker := newByNumberConsistencyChecker(source) + checker := newL1ConsistencyChecker(source) t.Run("all match canonical", func(t *testing.T) { same, err := checker.SameL1Chain(context.Background(), []eth.BlockID{ @@ -71,7 +87,7 @@ func TestByNumberConsistencyChecker_SameL1Chain(t *testing.T) { }) t.Run("nil checker returns nil", func(t *testing.T) { - nilChecker := newByNumberConsistencyChecker(nil) + nilChecker := newL1ConsistencyChecker(nil) require.Nil(t, nilChecker) }) } diff --git a/op-supernode/supernode/activity/interop/cycle.go b/op-supernode/supernode/activity/interop/cycle.go index 6d6551c8d1b..5570622a036 100644 --- a/op-supernode/supernode/activity/interop/cycle.go +++ b/op-supernode/supernode/activity/interop/cycle.go @@ -169,7 +169,7 @@ func buildCycleGraph(ts uint64, chainEMs map[eth.ChainID]map[uint32]*types.Execu // using Kahn's topological sort algorithm. // // Returns a Result with InvalidHeads populated for chains participating in cycles. -func (i *Interop) verifyCycleMessages(ts uint64, blocksAtTimestamp map[eth.ChainID]eth.BlockID) (Result, error) { +func (i *Interop) verifyCycleMessages(ts uint64, blocksAtTimestamp map[eth.ChainID]eth.BlockID, view *frontierVerificationView) (Result, error) { result := Result{ Timestamp: ts, L2Heads: blocksAtTimestamp, @@ -178,7 +178,7 @@ func (i *Interop) verifyCycleMessages(ts uint64, blocksAtTimestamp map[eth.Chain // collect all EMs for the given blocks per chain chainEMs := make(map[eth.ChainID]map[uint32]*types.ExecutingMessage) for chainID, blockID := range blocksAtTimestamp { - if frontierBlock, ok := i.frontierView.block(chainID); ok { + if frontierBlock, ok := view.block(chainID); ok { if frontierBlock.ref.Time == ts { chainEMs[chainID] = frontierBlock.execMsgs } diff --git a/op-supernode/supernode/activity/interop/interop.go b/op-supernode/supernode/activity/interop/interop.go index 69782592680..15f2ad23bfe 100644 --- a/op-supernode/supernode/activity/interop/interop.go +++ b/op-supernode/supernode/activity/interop/interop.go @@ -2,6 +2,7 @@ package interop import ( "context" + "encoding/json" "errors" "fmt" "sort" @@ -75,6 +76,52 @@ const ( DecisionRewind ) +// Decision is serialized as a self-describing string in the WAL so that the +// on-disk format survives enum re-ordering or the insertion of new variants. +func (d Decision) String() string { + switch d { + case DecisionWait: + return "wait" + case DecisionAdvance: + return "advance" + case DecisionInvalidate: + return "invalidate" + case DecisionRewind: + return "rewind" + default: + return fmt.Sprintf("unknown(%d)", int(d)) + } +} + +func (d Decision) MarshalJSON() ([]byte, error) { + switch d { + case DecisionWait, DecisionAdvance, DecisionInvalidate, DecisionRewind: + return json.Marshal(d.String()) + default: + return nil, fmt.Errorf("marshal decision: unknown value %d", int(d)) + } +} + +func (d *Decision) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return fmt.Errorf("unmarshal decision: expected string: %w", err) + } + switch s { + case "wait": + *d = DecisionWait + case "advance": + *d = DecisionAdvance + case "invalidate": + *d = DecisionInvalidate + case "rewind": + *d = DecisionRewind + default: + return fmt.Errorf("unmarshal decision: unknown value %q", s) + } + return nil +} + // StepOutput combines a decision with the verification result (if any). type StepOutput struct { Decision Decision @@ -84,7 +131,7 @@ type StepOutput struct { // Interop is a VerificationActivity that can also run background work as a RunnableActivity. type Interop struct { log log.Logger - chains map[eth.ChainID]cc.ChainContainer + chains map[eth.ChainID]cc.InteropChain activationTimestamp uint64 // immutable protocol activation timestamp // runtimeActivationTimestamp is the effective activation timestamp used @@ -108,12 +155,12 @@ type Interop struct { currentL1 eth.BlockID - verifyFn func(ts uint64, blocksAtTimestamp map[eth.ChainID]eth.BlockID) (Result, error) + verifyFn func(ts uint64, blocksAtTimestamp map[eth.ChainID]eth.BlockID, view *frontierVerificationView) (Result, error) // cycleVerifyFn handles same-timestamp cycle verification. // It is called after verifyFn in progressInterop, and its results are merged. // Set to verifyCycleMessages by default in New(). - cycleVerifyFn func(ts uint64, blocksAtTimestamp map[eth.ChainID]eth.BlockID) (Result, error) + cycleVerifyFn func(ts uint64, blocksAtTimestamp map[eth.ChainID]eth.BlockID, view *frontierVerificationView) (Result, error) // pauseAtTimestamp is used for integration test control only. // When non-zero, progressInterop will return early without processing @@ -127,8 +174,9 @@ type Interop struct { // because logBackfillDepth <= 0). Read by integration tests to gate on backfill finishing. backfillCompleted atomic.Bool - l1Checker *byNumberConsistencyChecker - frontierView *frontierVerificationView + // l1Checker must be non-nil whenever observeRound runs. Production sets it + // via New; tests inject noopL1Checker. + l1Checker l1ConsistencyChecker logBackfillDepth time.Duration } @@ -142,7 +190,7 @@ func New( log log.Logger, activationTimestamp uint64, messageExpiryWindow uint64, - chains map[eth.ChainID]cc.ChainContainer, + chains map[eth.ChainID]cc.InteropChain, dataDir string, l1Source l1ByNumberSource, logBackfillDepth time.Duration, @@ -187,7 +235,7 @@ func New( // (can be overridden by tests) i.verifyFn = i.verifyInteropMessages i.cycleVerifyFn = i.verifyCycleMessages - i.l1Checker = newByNumberConsistencyChecker(l1Source) + i.l1Checker = newL1ConsistencyChecker(l1Source) return i } @@ -395,21 +443,18 @@ func (i *Interop) observeRound() (RoundObservation, error) { obs.L1Heads = ready.l1Heads // Check that all frontier L1 heads AND the accepted L1 head are on the same canonical fork. - obs.L1Consistent = true - if i.l1Checker != nil { - heads := make([]eth.BlockID, 0, len(obs.L1Heads)+1) - if obs.LastVerified != nil { - heads = append(heads, obs.LastVerified.L1Inclusion) - } - for _, l1 := range obs.L1Heads { - heads = append(heads, l1) - } - same, err := i.l1Checker.SameL1Chain(i.ctx, heads) - if err != nil { - return obs, fmt.Errorf("L1 consistency check: %w", err) - } - obs.L1Consistent = same + heads := make([]eth.BlockID, 0, len(obs.L1Heads)+1) + if obs.LastVerified != nil { + heads = append(heads, obs.LastVerified.L1Inclusion) + } + for _, l1 := range obs.L1Heads { + heads = append(heads, l1) + } + same, err := i.l1Checker.SameL1Chain(i.ctx, heads) + if err != nil { + return obs, fmt.Errorf("L1 consistency check: %w", err) } + obs.L1Consistent = same return obs, nil } @@ -420,17 +465,13 @@ func (i *Interop) verify(ts uint64, blocksAtTS map[eth.ChainID]eth.BlockID) (Res if err != nil { return Result{}, fmt.Errorf("resolve frontier verification view: %w", err) } - i.frontierView = view - defer func() { - i.frontierView = nil - }() - result, err := i.verifyFn(ts, blocksAtTS) + result, err := i.verifyFn(ts, blocksAtTS, view) if err != nil { return Result{}, err } - cycleResult, err := i.cycleVerifyFn(ts, blocksAtTS) + cycleResult, err := i.cycleVerifyFn(ts, blocksAtTS, view) if err != nil { return Result{}, fmt.Errorf("cycle verification failed: %w", err) } diff --git a/op-supernode/supernode/activity/interop/interop_test.go b/op-supernode/supernode/activity/interop/interop_test.go index 5ab051c6462..6f597997bde 100644 --- a/op-supernode/supernode/activity/interop/interop_test.go +++ b/op-supernode/supernode/activity/interop/interop_test.go @@ -92,12 +92,13 @@ func (h *interopTestHarness) Build() *interopTestHarness { if h.skipBuild { return h } - chains := make(map[eth.ChainID]cc.ChainContainer) + chains := make(map[eth.ChainID]cc.InteropChain) for id, mock := range h.mocks { chains[id] = mock } h.interop = New(testLogger(), h.activationTime, 0, chains, h.dataDir, nil, h.logBackfillDepth) if h.interop != nil { + h.interop.l1Checker = noopL1Checker{} h.interop.ctx = context.Background() h.t.Cleanup(func() { _ = h.interop.Stop(context.Background()) }) } @@ -105,8 +106,8 @@ func (h *interopTestHarness) Build() *interopTestHarness { } // Chains returns the map of chain containers for use with New(). -func (h *interopTestHarness) Chains() map[eth.ChainID]cc.ChainContainer { - chains := make(map[eth.ChainID]cc.ChainContainer) +func (h *interopTestHarness) Chains() map[eth.ChainID]cc.InteropChain { + chains := make(map[eth.ChainID]cc.InteropChain) for id, mock := range h.mocks { chains[id] = mock } @@ -138,6 +139,7 @@ func TestNew(t *testing.T) { run: func(t *testing.T, h *interopTestHarness) { interop := New(testLogger(), h.activationTime, 0, h.Chains(), h.dataDir, nil, 0) require.NotNil(t, interop) + interop.l1Checker = noopL1Checker{} t.Cleanup(func() { _ = interop.Stop(context.Background()) }) require.Equal(t, uint64(1000), interop.activationTimestamp) @@ -465,14 +467,14 @@ func TestProgressInterop(t *testing.T) { t.Parallel() // Default verifyFn that passes through - passThroughVerifyFn := func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + passThroughVerifyFn := func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{Timestamp: ts, L1Inclusion: eth.BlockID{Number: 100}, L2Heads: blocks}, nil } tests := []struct { name string setup func(h *interopTestHarness) *interopTestHarness - verifyFn func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) + verifyFn func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) assert func(t *testing.T, result Result, err error) run func(t *testing.T, h *interopTestHarness) // override for complex cases }{ @@ -546,7 +548,7 @@ func TestProgressInterop(t *testing.T) { m.blockAtTimestamp = eth.L2BlockRef{Number: 500, Hash: common.HexToHash("0xL2")} }).Build() }, - verifyFn: func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + verifyFn: func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{}, errors.New("verification failed") }, assert: func(t *testing.T, result Result, err error) { @@ -595,7 +597,7 @@ func TestProgressInteropWithCycleVerify(t *testing.T) { }, run: func(t *testing.T, h *interopTestHarness) { // Set verifyFn to return a valid result - h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{Timestamp: ts, L2Heads: blocks}, nil } // cycleVerifyFn is overridden with this stub implementation. @@ -622,13 +624,13 @@ func TestProgressInteropWithCycleVerify(t *testing.T) { chain8453 := eth.ChainIDFromUInt64(8453) // verifyFn returns valid result - h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { verifyFnCalled = true return Result{Timestamp: ts, L2Heads: blocks}, nil } // cycleVerifyFn marks chain 8453 as invalid - h.interop.cycleVerifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.cycleVerifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { require.True(t, verifyFnCalled, "verifyFn should be called before cycleVerifyFn") cycleVerifyFnCalled = true return Result{ @@ -657,10 +659,10 @@ func TestProgressInteropWithCycleVerify(t *testing.T) { }).Build() }, run: func(t *testing.T, h *interopTestHarness) { - h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{Timestamp: ts, L2Heads: blocks}, nil } - h.interop.cycleVerifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.cycleVerifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{}, errors.New("cycle verification failed") } @@ -684,7 +686,7 @@ func TestProgressInteropWithCycleVerify(t *testing.T) { chain8453 := eth.ChainIDFromUInt64(8453) // verifyFn marks chain 10 as invalid - h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{ Timestamp: ts, L2Heads: blocks, @@ -695,7 +697,7 @@ func TestProgressInteropWithCycleVerify(t *testing.T) { } // cycleVerifyFn marks chain 8453 as invalid - h.interop.cycleVerifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.cycleVerifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{ Timestamp: ts, L2Heads: blocks, @@ -774,7 +776,7 @@ func TestVerifiedAtTimestamp(t *testing.T) { }).Build() }, run: func(t *testing.T, h *interopTestHarness) { - h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{Timestamp: ts, L1Inclusion: eth.BlockID{Number: 100}, L2Heads: blocks}, nil } @@ -1043,7 +1045,7 @@ func TestProgressAndRecord(t *testing.T) { }, run: func(t *testing.T, h *interopTestHarness) { expectedL1Inclusion := eth.BlockID{Number: 150, Hash: common.HexToHash("0xL1Result")} - h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{Timestamp: ts, L1Inclusion: expectedL1Inclusion, L2Heads: blocks}, nil } @@ -1069,7 +1071,7 @@ func TestProgressAndRecord(t *testing.T) { run: func(t *testing.T, h *interopTestHarness) { // L1Inclusion is 1000 (from the leading chain) but chain 8453 is only at 990. // interop.currentL1 must be capped at 990 so it never exceeds any node's CurrentL1. - h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{ Timestamp: ts, L1Inclusion: eth.BlockID{Number: 1000, Hash: common.HexToHash("0xleading")}, @@ -1098,7 +1100,7 @@ func TestProgressAndRecord(t *testing.T) { initialL1 := eth.BlockID{Number: 50, Hash: common.HexToHash("0x50")} h.interop.currentL1 = initialL1 - h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{ Timestamp: ts, L1Inclusion: eth.BlockID{Number: 999, Hash: common.HexToHash("0xShouldNotBeUsed")}, @@ -1151,9 +1153,10 @@ func TestInterop_FullCycle(t *testing.T) { mock.currentL1 = eth.BlockRef{Number: 1000, Hash: common.HexToHash("0xL1")} mock.blockAtTimestamp = eth.L2BlockRef{Number: 500, Hash: common.HexToHash("0xL2")} - chains := map[eth.ChainID]cc.ChainContainer{mock.id: mock} + chains := map[eth.ChainID]cc.InteropChain{mock.id: mock} interop := New(testLogger(), 100, 0, chains, dataDir, nil, 0) require.NotNil(t, interop) + interop.l1Checker = noopL1Checker{} interop.ctx = context.Background() // Verify logsDB is empty initially @@ -1161,7 +1164,7 @@ func TestInterop_FullCycle(t *testing.T) { require.False(t, hasBlocks) // Stub verifyFn - interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{Timestamp: ts, L1Inclusion: eth.BlockID{Number: 100}, L2Heads: blocks}, nil } @@ -1198,6 +1201,111 @@ func TestInterop_FullCycle(t *testing.T) { require.Equal(t, uint64(102), latestBlock.Number) } +// ============================================================================= +// TestInterop_ProgressAndRecord_MultiAdvance +// ============================================================================= + +// TestInterop_ProgressAndRecord_MultiAdvance drives several advance cycles end +// to end through progressAndRecord, exercising the real WAL write → apply → +// clear path that the compat shims in TestInterop_FullCycle bypass. +func TestInterop_ProgressAndRecord_MultiAdvance(t *testing.T) { + h := newInteropTestHarness(t). // newInteropTestHarness calls t.Parallel() + WithActivation(100). + WithChain(10, func(m *mockChainContainer) { + m.currentL1 = eth.BlockRef{Number: 1000, Hash: common.HexToHash("0xL1")} + m.blockAtTimestamp = eth.L2BlockRef{Number: 500, Hash: common.HexToHash("0xL2")} + }). + Build() + + mock := h.Mock(10) + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { + return Result{Timestamp: ts, L1Inclusion: eth.BlockID{Number: 100}, L2Heads: blocks}, nil + } + h.interop.cycleVerifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { + return Result{}, nil + } + + const cycles = 3 + for i := 0; i < cycles; i++ { + made, err := h.interop.progressAndRecord() + require.NoError(t, err) + require.True(t, made, "cycle %d should advance", i) + + pending, err := h.interop.verifiedDB.GetPendingTransition() + require.NoError(t, err) + require.Nil(t, pending, "pending transition should be cleared after cycle %d", i) + } + + lastTS, ok := h.interop.verifiedDB.LastTimestamp() + require.True(t, ok) + require.Equal(t, uint64(100+cycles-1), lastTS) + + for ts := uint64(100); ts <= lastTS; ts++ { + result, err := h.interop.verifiedDB.Get(ts) + require.NoError(t, err) + require.Equal(t, ts, result.Timestamp) + require.Contains(t, result.L2Heads, mock.id) + } + + latestBlock, hasBlocks := h.interop.logsDBs[mock.id].LatestSealedBlock() + require.True(t, hasBlocks) + require.Equal(t, lastTS, latestBlock.Number) +} + +// ============================================================================= +// TestInterop_ProgressAndRecord_L1InconsistencyTriggersRewind +// ============================================================================= + +// TestInterop_ProgressAndRecord_L1InconsistencyTriggersRewind advances twice +// through progressAndRecord, flips the l1Checker so observeRound sees an +// inconsistency, and asserts the next progressAndRecord drives a full rewind +// end to end: verifiedDB trimmed, engines reset, WAL cleared. +func TestInterop_ProgressAndRecord_L1InconsistencyTriggersRewind(t *testing.T) { + h := newInteropTestHarness(t). // newInteropTestHarness calls t.Parallel() + WithActivation(100). + WithChain(10, func(m *mockChainContainer) { + m.currentL1 = eth.BlockRef{Number: 1000, Hash: common.HexToHash("0xL1")} + m.blockAtTimestamp = eth.L2BlockRef{Number: 500, Hash: common.HexToHash("0xL2")} + }). + Build() + + mock := h.Mock(10) + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { + return Result{Timestamp: ts, L1Inclusion: eth.BlockID{Number: 100}, L2Heads: blocks}, nil + } + h.interop.cycleVerifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { + return Result{}, nil + } + + for i := 0; i < 2; i++ { + _, err := h.interop.progressAndRecord() + require.NoError(t, err) + } + lastTS, ok := h.interop.verifiedDB.LastTimestamp() + require.True(t, ok) + require.Equal(t, uint64(101), lastTS) + + // Flip the L1 checker so observeRound returns L1Consistent=false, which + // drives checkPreconditions into DecisionRewind. + h.interop.l1Checker = inconsistentL1Checker{} + + made, err := h.interop.progressAndRecord() + require.NoError(t, err) + require.False(t, made, "rewind does not advance the verified timestamp") + + // Rewind removed the last-committed entry and left activation in place. + lastTS, ok = h.interop.verifiedDB.LastTimestamp() + require.True(t, ok) + require.Equal(t, uint64(100), lastTS) + + pending, err := h.interop.verifiedDB.GetPendingTransition() + require.NoError(t, err) + require.Nil(t, pending, "WAL cleared after successful rewind") + + require.Len(t, mock.rewindEngineCalls, 1, "engine rewound exactly once on the recovering chain") + require.Equal(t, uint64(100), mock.rewindEngineCalls[0]) +} + // ============================================================================= // TestResult_IsEmpty // ============================================================================= @@ -1677,6 +1785,74 @@ func TestPendingTransition_RecoverRewindPreservedOnFailure(t *testing.T) { require.Equal(t, uint64(1000), mock.rewindEngineCalls[0]) } +// TestPendingTransition_RewindReplaysAfterFailure exercises the full recovery +// loop for a rewind that fails mid-way: the WAL is preserved (existing +// behavior), the failing chain is then "fixed", and progressAndRecord picks up +// the pending transition on the next iteration and drives it to completion. +func TestPendingTransition_RewindReplaysAfterFailure(t *testing.T) { + h := newInteropTestHarness(t). // newInteropTestHarness calls t.Parallel() + WithChain(10, nil). + WithChain(8453, func(m *mockChainContainer) { + m.rewindEngineErr = errors.New("chain B rewind failed") + }). + Build() + + mockA := h.Mock(10) + mockB := h.Mock(8453) + + require.NoError(t, h.interop.verifiedDB.Commit(VerifiedResult{ + Timestamp: 1000, + L1Inclusion: eth.BlockID{Number: 50, Hash: common.HexToHash("0xL1a")}, + L2Heads: map[eth.ChainID]eth.BlockID{ + mockA.id: {Number: 100, Hash: common.HexToHash("0xa1")}, + mockB.id: {Number: 200, Hash: common.HexToHash("0xb1")}, + }, + })) + require.NoError(t, h.interop.verifiedDB.Commit(VerifiedResult{ + Timestamp: 1001, + L1Inclusion: eth.BlockID{Number: 51, Hash: common.HexToHash("0xL1b")}, + L2Heads: map[eth.ChainID]eth.BlockID{ + mockA.id: {Number: 101, Hash: common.HexToHash("0xa2")}, + mockB.id: {Number: 201, Hash: common.HexToHash("0xb2")}, + }, + })) + + lastTS := uint64(1001) + pending, err := h.interop.buildPendingTransition( + StepOutput{Decision: DecisionRewind}, + RoundObservation{LastVerifiedTS: &lastTS}, + ) + require.NoError(t, err) + require.NoError(t, h.interop.verifiedDB.SetPendingTransition(pending)) + + // First attempt: chain B's RewindEngine errors, WAL preserved. + _, err = h.interop.applyPendingTransition(pending) + require.EqualError(t, err, "apply rewind plan: chain 8453: reset chain engine on rewind: chain B rewind failed") + + stored, err := h.interop.verifiedDB.GetPendingTransition() + require.NoError(t, err) + require.NotNil(t, stored, "rewind transition should be preserved after partial failure") + + // Fix chain B. + mockB.mu.Lock() + mockB.rewindEngineErr = nil + mockB.mu.Unlock() + + // Second attempt: progressAndRecord discovers the WAL entry and re-applies + // it end-to-end via the normal recovery path. + _, err = h.interop.progressAndRecord() + require.NoError(t, err) + + replayed, err := h.interop.verifiedDB.GetPendingTransition() + require.NoError(t, err) + require.Nil(t, replayed, "replay should clear the pending transition") + + require.Len(t, mockA.rewindEngineCalls, 2, "chain A engine rewound on both attempts") + require.Len(t, mockB.rewindEngineCalls, 2, "chain B engine rewound on both attempts") + require.Equal(t, uint64(1000), mockA.rewindEngineCalls[1]) + require.Equal(t, uint64(1000), mockB.rewindEngineCalls[1]) +} + func TestPendingTransition_RecoverRewindReportsAllFailures(t *testing.T) { h := newInteropTestHarness(t). // newInteropTestHarness calls t.Parallel() WithChain(10, func(m *mockChainContainer) { @@ -1773,7 +1949,7 @@ func TestL1CanonicalityCheckErrorPropagates(t *testing.T) { require.NoError(t, err) // Set up a failing L1 checker using a mock that returns errors for all lookups - h.interop.l1Checker = newByNumberConsistencyChecker(&errorL1Source{ + h.interop.l1Checker = newL1ConsistencyChecker(&errorL1Source{ err: errors.New("L1 RPC unavailable"), }) @@ -1799,7 +1975,7 @@ func TestRewindAccepted(t *testing.T) { chainID := mock.id // Stub verifyFn - h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { return Result{Timestamp: ts, L1Inclusion: eth.BlockID{Number: ts}, L2Heads: blocks}, nil } diff --git a/op-supernode/supernode/activity/interop/log_backfill_test.go b/op-supernode/supernode/activity/interop/log_backfill_test.go index 70870621488..ba4abd15346 100644 --- a/op-supernode/supernode/activity/interop/log_backfill_test.go +++ b/op-supernode/supernode/activity/interop/log_backfill_test.go @@ -431,7 +431,7 @@ func TestLogBackfill_AdvancesActivationAndStartsVerifyAfterCeiling(t *testing.T) var verifyCalls atomic.Int32 var firstVerifyTS atomic.Uint64 - h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID) (Result, error) { + h.interop.verifyFn = func(ts uint64, blocks map[eth.ChainID]eth.BlockID, _ *frontierVerificationView) (Result, error) { if verifyCalls.Add(1) == 1 { firstVerifyTS.Store(ts) } diff --git a/op-supernode/supernode/activity/interop/types_test.go b/op-supernode/supernode/activity/interop/types_test.go index 52398ba5d9e..2e5b40cf534 100644 --- a/op-supernode/supernode/activity/interop/types_test.go +++ b/op-supernode/supernode/activity/interop/types_test.go @@ -199,3 +199,48 @@ func TestPendingInvalidation_JSONRoundTrip(t *testing.T) { require.Equal(t, original.StateRoot, decoded.StateRoot) require.Equal(t, original.MessagePasserStorageRoot, decoded.MessagePasserStorageRoot) } + +func TestDecision_JSONRoundTrip(t *testing.T) { + t.Parallel() + + cases := []struct { + decision Decision + wantJSON string + }{ + {DecisionWait, `"wait"`}, + {DecisionAdvance, `"advance"`}, + {DecisionInvalidate, `"invalidate"`}, + {DecisionRewind, `"rewind"`}, + } + + for _, tc := range cases { + t.Run(tc.decision.String(), func(t *testing.T) { + data, err := json.Marshal(tc.decision) + require.NoError(t, err) + require.JSONEq(t, tc.wantJSON, string(data)) + + var decoded Decision + require.NoError(t, json.Unmarshal(data, &decoded)) + require.Equal(t, tc.decision, decoded) + }) + } +} + +func TestDecision_JSONRejectsInvalid(t *testing.T) { + t.Parallel() + + t.Run("marshal unknown value", func(t *testing.T) { + _, err := json.Marshal(Decision(99)) + require.Error(t, err) + }) + + t.Run("unmarshal unknown string", func(t *testing.T) { + var d Decision + require.Error(t, json.Unmarshal([]byte(`"nope"`), &d)) + }) + + t.Run("unmarshal non-string", func(t *testing.T) { + var d Decision + require.Error(t, json.Unmarshal([]byte(`2`), &d)) + }) +} diff --git a/op-supernode/supernode/activity/interop/verified_db.go b/op-supernode/supernode/activity/interop/verified_db.go index b675bb67644..cbc746128ca 100644 --- a/op-supernode/supernode/activity/interop/verified_db.go +++ b/op-supernode/supernode/activity/interop/verified_db.go @@ -1,12 +1,12 @@ package interop import ( - "bytes" "encoding/binary" "encoding/json" "errors" "fmt" "path/filepath" + "reflect" "sync" "github.com/ethereum-optimism/optimism/op-service/eth" @@ -119,32 +119,29 @@ func (v *VerifiedDB) Commit(result VerifiedResult) error { ts := result.Timestamp - // Serialize the result up front so replay of an already-applied transition can - // be treated as success when the stored value is identical. - value, err := json.Marshal(result) - if err != nil { - return fmt.Errorf("failed to marshal verified result: %w", err) - } - // Check for sequential commitment if v.initialized { if ts != v.lastTimestamp+1 { if ts <= v.lastTimestamp { + // Idempotent replay: crash recovery may call Commit again after the + // bbolt write succeeded but before ClearPendingTransition. Compare the + // deserialized VerifiedResult rather than raw bytes so byte-level + // drift in encoding/json across Go versions does not turn a legitimate + // replay into a hard ErrAlreadyCommitted. key := timestampToKey(ts) - var existing []byte + var existing VerifiedResult err := v.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(bucketName) val := b.Get(key) if val == nil { return ErrNotFound } - existing = append(existing[:0], val...) - return nil + return json.Unmarshal(val, &existing) }) if err != nil { return fmt.Errorf("failed to read existing verified result at %d: %w", ts, err) } - if bytes.Equal(existing, value) { + if reflect.DeepEqual(existing, result) { return nil } return fmt.Errorf("%w: %d", ErrAlreadyCommitted, ts) @@ -153,6 +150,11 @@ func (v *VerifiedDB) Commit(result VerifiedResult) error { } } + value, err := json.Marshal(result) + if err != nil { + return fmt.Errorf("failed to marshal verified result: %w", err) + } + // Store in database key := timestampToKey(ts) err = v.db.Update(func(tx *bolt.Tx) error { diff --git a/op-supernode/supernode/activity/interop/verified_db_test.go b/op-supernode/supernode/activity/interop/verified_db_test.go index 46551b0aa33..d418c132b5c 100644 --- a/op-supernode/supernode/activity/interop/verified_db_test.go +++ b/op-supernode/supernode/activity/interop/verified_db_test.go @@ -1,11 +1,13 @@ package interop import ( + "encoding/json" "testing" "github.com/ethereum-optimism/optimism/op-service/eth" "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" + bolt "go.etcd.io/bbolt" ) func TestVerifiedDB_WriteAndRead(t *testing.T) { @@ -333,6 +335,67 @@ func TestVerifiedDB_RewindTo(t *testing.T) { }) } +func TestVerifiedDB_Commit_IdempotentReplay(t *testing.T) { + t.Parallel() + + chainID := eth.ChainIDFromUInt64(10) + result := VerifiedResult{ + Timestamp: 100, + L1Inclusion: eth.BlockID{Hash: common.HexToHash("0x01"), Number: 1}, + L2Heads: map[eth.ChainID]eth.BlockID{ + chainID: {Hash: common.HexToHash("0x02"), Number: 2}, + }, + } + + t.Run("same struct replays cleanly", func(t *testing.T) { + t.Parallel() + db, err := OpenVerifiedDB(t.TempDir()) + require.NoError(t, err) + defer db.Close() + + require.NoError(t, db.Commit(result)) + require.NoError(t, db.Commit(result)) + }) + + t.Run("different struct at same timestamp is rejected", func(t *testing.T) { + t.Parallel() + db, err := OpenVerifiedDB(t.TempDir()) + require.NoError(t, err) + defer db.Close() + + require.NoError(t, db.Commit(result)) + + different := result + different.L1Inclusion.Number = 999 + require.ErrorIs(t, db.Commit(different), ErrAlreadyCommitted) + }) + + t.Run("byte-divergent but struct-equal replays cleanly", func(t *testing.T) { + t.Parallel() + // Simulate the concern raised in review: after a crash, the stored JSON + // bytes may differ from a fresh Marshal of the same struct (e.g. Go + // version upgrade changed field layout). Commit must still recognise + // this as an idempotent replay, not ErrAlreadyCommitted. + db, err := OpenVerifiedDB(t.TempDir()) + require.NoError(t, err) + defer db.Close() + + require.NoError(t, db.Commit(result)) + + indented, err := json.MarshalIndent(result, "", " ") + require.NoError(t, err) + canonical, err := json.Marshal(result) + require.NoError(t, err) + require.NotEqual(t, canonical, indented, "indented JSON should diverge from canonical bytes") + + require.NoError(t, db.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(bucketName).Put(timestampToKey(result.Timestamp), indented) + })) + + require.NoError(t, db.Commit(result)) + }) +} + func TestVerifiedDB_PendingTransition(t *testing.T) { t.Parallel() diff --git a/op-supernode/supernode/activity/supernode/supernode_test.go b/op-supernode/supernode/activity/supernode/supernode_test.go index 4d9df7cfbfd..2ee81c87893 100644 --- a/op-supernode/supernode/activity/supernode/supernode_test.go +++ b/op-supernode/supernode/activity/supernode/supernode_test.go @@ -79,10 +79,6 @@ func (m *mockCC) OptimisticOutputAtTimestamp(ctx context.Context, ts uint64) (*e return ð.OutputV0{}, nil } -func (m *mockCC) RewindEngine(ctx context.Context, timestamp uint64, invalidatedBlock eth.BlockRef) error { - return nil -} - func (m *mockCC) L1ForL2(ctx context.Context, l2Block eth.BlockID) (eth.BlockID, error) { return eth.BlockID{}, nil } @@ -97,9 +93,6 @@ func (m *mockCC) ID() eth.ChainID { func (m *mockCC) BlockTime() uint64 { return 1 } -func (m *mockCC) InvalidateBlock(ctx context.Context, height uint64, payloadHash common.Hash, decisionTimestamp uint64, stateRoot, messagePasserStorageRoot eth.Bytes32) (bool, error) { - return false, nil -} func (m *mockCC) OutputV0AtBlockNumber(ctx context.Context, l2BlockNum uint64) (*eth.OutputV0, error) { return ð.OutputV0{}, nil } diff --git a/op-supernode/supernode/activity/superroot/superroot_test.go b/op-supernode/supernode/activity/superroot/superroot_test.go index 5bad8c58902..5e2d6c72d31 100644 --- a/op-supernode/supernode/activity/superroot/superroot_test.go +++ b/op-supernode/supernode/activity/superroot/superroot_test.go @@ -84,10 +84,6 @@ func (m *mockCC) OptimisticOutputAtTimestamp(ctx context.Context, ts uint64) (*e } return ð.OutputV0{}, nil } -func (m *mockCC) RewindEngine(ctx context.Context, timestamp uint64, invalidatedBlock eth.BlockRef) error { - return nil -} - func (m *mockCC) L1ForL2(ctx context.Context, l2Block eth.BlockID) (eth.BlockID, error) { return eth.BlockID{}, nil } @@ -101,9 +97,6 @@ func (m *mockCC) ID() eth.ChainID { } func (m *mockCC) BlockTime() uint64 { return 1 } -func (m *mockCC) InvalidateBlock(ctx context.Context, height uint64, payloadHash common.Hash, decisionTimestamp uint64, stateRoot, messagePasserStorageRoot eth.Bytes32) (bool, error) { - return false, nil -} func (m *mockCC) OutputV0AtBlockNumber(ctx context.Context, l2BlockNum uint64) (*eth.OutputV0, error) { return ð.OutputV0{}, nil } diff --git a/op-supernode/supernode/chain_container/chain_container.go b/op-supernode/supernode/chain_container/chain_container.go index 1b60980e058..f3642f0238d 100644 --- a/op-supernode/supernode/chain_container/chain_container.go +++ b/op-supernode/supernode/chain_container/chain_container.go @@ -45,14 +45,6 @@ type ChainContainer interface { OptimisticAt(ctx context.Context, ts uint64) (l2, l1 eth.BlockID, err error) OutputRootAtL2BlockNumber(ctx context.Context, l2BlockNum uint64) (eth.Bytes32, error) OptimisticOutputAtTimestamp(ctx context.Context, ts uint64) (*eth.OutputV0, error) - // RewindEngine rewinds the engine to the highest block with timestamp less than or equal to the given timestamp. - // invalidatedBlock is the block that triggered the rewind and is passed to reset callbacks. - // WARNING: this is a dangerous stateful operation and is intended to be called only - // by interop transition application. Other callers should not use it until the - // interface is refactored to make that ownership explicit. - // TODO(#19561): remove this footgun by moving reorg-triggering operations behind a - // smaller interop-owned interface. - RewindEngine(ctx context.Context, timestamp uint64, invalidatedBlock eth.BlockRef) error RegisterVerifier(v activity.VerificationActivity) // VerifierCurrentL1s returns the CurrentL1 from each registered verifier. // This allows callers to determine the minimum L1 block that all verifiers have processed. @@ -62,16 +54,6 @@ type ChainContainer interface { FetchReceipts(ctx context.Context, blockHash eth.BlockID) (eth.BlockInfo, types.Receipts, error) // BlockTime returns the block time in seconds for this chain. BlockTime() uint64 - // InvalidateBlock adds a block to the deny list and triggers a rewind if the chain - // currently uses that block at the specified height. - // output is the marshaled eth.Output preimage for optimistic root computation. - // WARNING: this is a dangerous stateful operation and is intended to be called only - // by interop transition application. Other callers should not use it until the - // interface is refactored to make that ownership explicit. - // TODO(#19561): remove this footgun by moving reorg-triggering operations behind a - // smaller interop-owned interface. - // Returns true if a rewind was triggered, false otherwise. - InvalidateBlock(ctx context.Context, height uint64, payloadHash common.Hash, decisionTimestamp uint64, stateRoot, messagePasserStorageRoot eth.Bytes32) (bool, error) // PruneDeniedAtOrAfterTimestamp removes deny-list entries with DecisionTimestamp >= timestamp. // Returns map of removed hashes by height. PruneDeniedAtOrAfterTimestamp(timestamp uint64) (map[uint64][]common.Hash, error) @@ -91,6 +73,23 @@ type ChainContainer interface { SetResetCallback(cb ResetCallback) } +// WARNING: InteropChain exposes the reorg-triggering operations (RewindEngine, +// InvalidateBlock) that bypass the interop WAL model when invoked outside of +// interop transition application. ONLY the interop activity should accept or +// hold a value of this interface. Every other caller must take the narrower +// ChainContainer above so the misuse is caught at compile time. +type InteropChain interface { + ChainContainer + // RewindEngine rewinds the engine to the highest block with timestamp less than + // or equal to the given timestamp. invalidatedBlock is the block that triggered + // the rewind and is passed to reset callbacks. + RewindEngine(ctx context.Context, timestamp uint64, invalidatedBlock eth.BlockRef) error + // InvalidateBlock adds a block to the deny list and triggers a rewind if the + // chain currently uses that block at the specified height. Returns true if a + // rewind was triggered, false otherwise. + InvalidateBlock(ctx context.Context, height uint64, payloadHash common.Hash, decisionTimestamp uint64, stateRoot, messagePasserStorageRoot eth.Bytes32) (bool, error) +} + type virtualNodeFactory func(cfg *opnodecfg.Config, log gethlog.Logger, initOverrides *rollupNode.InitializationOverrides, appVersion string, superAuthority rollup.SuperAuthority) virtual_node.VirtualNode // ResetCallback is called when the chain container resets due to an invalidated block. @@ -129,6 +128,7 @@ type simpleChainContainer struct { // Interface conformance assertions var _ ChainContainer = (*simpleChainContainer)(nil) +var _ InteropChain = (*simpleChainContainer)(nil) var _ rollup.SuperAuthority = (*simpleChainContainer)(nil) func NewChainContainer( @@ -140,7 +140,7 @@ func NewChainContainer( rpcHandler *oprpc.Handler, setHandler func(chainID string, h http.Handler), addMetricsRegistry func(key string, g prometheus.Gatherer), -) ChainContainer { +) InteropChain { c := &simpleChainContainer{ vncfg: vncfg, cfg: cfg, @@ -562,10 +562,8 @@ func isCriticalRewindError(err error) bool { errors.Is(err, engine_controller.ErrRewindOverFinalizedHead) } -// WARNING: this should only be called by the interop activity. -// Other callers risk triggering chain rewinds outside the interop WAL model. -// TODO(#19561): remove this footgun by moving reorg-triggering operations behind a -// smaller interop-owned interface. +// RewindEngine is part of the InteropChain interface — callers must hold that +// wider interface (only interop transition application does) to invoke it. func (c *simpleChainContainer) RewindEngine(ctx context.Context, timestamp uint64, invalidatedBlock eth.BlockRef) error { if !c.resetting.CompareAndSwap(false, true) { return fmt.Errorf("reset already in progress") diff --git a/op-supernode/supernode/chain_container/invalidation.go b/op-supernode/supernode/chain_container/invalidation.go index 175cfd0ed6f..f4f165c24cb 100644 --- a/op-supernode/supernode/chain_container/invalidation.go +++ b/op-supernode/supernode/chain_container/invalidation.go @@ -331,12 +331,10 @@ func (d *DenyList) Close() error { return d.db.Close() } -// InvalidateBlock adds a block to the deny list and triggers a rewind if the chain -// currently uses that block at the specified height. -// WARNING: this should only be called by interop transition application. -// Other callers risk triggering chain rewinds outside the interop WAL model. -// TODO(#19561): remove this footgun by moving reorg-triggering operations behind a -// smaller interop-owned interface. +// InvalidateBlock is part of the InteropChain interface — callers must hold +// that wider interface (only interop transition application does) to invoke it. +// Adds a block to the deny list and triggers a rewind if the chain currently +// uses that block at the specified height. // Returns true if a rewind was triggered, false otherwise. // Note: Genesis block (height=0) cannot be invalidated as there is no prior block to rewind to. func (c *simpleChainContainer) InvalidateBlock(ctx context.Context, height uint64, payloadHash common.Hash, decisionTimestamp uint64, stateRoot, messagePasserStorageRoot eth.Bytes32) (bool, error) { diff --git a/op-supernode/supernode/supernode.go b/op-supernode/supernode/supernode.go index 4305b7fc58f..b5736ea2f06 100644 --- a/op-supernode/supernode/supernode.go +++ b/op-supernode/supernode/supernode.go @@ -35,7 +35,7 @@ type Supernode struct { requestStop context.CancelCauseFunc stopped bool cfg *config.CLIConfig - chains map[eth.ChainID]cc.ChainContainer + chains map[eth.ChainID]cc.InteropChain // activitiesMu guards reads and writes of the activities slice. Concurrent // readers (onChainReset, InteropActivity, Stop) can race with the // test-only RestartInteropActivity path that swaps the interop activity @@ -66,7 +66,7 @@ type Supernode struct { } func New(ctx context.Context, log gethlog.Logger, version string, requestStop context.CancelCauseFunc, cfg *config.CLIConfig, vnCfgs map[eth.ChainID]*opnodecfg.Config) (*Supernode, error) { - s := &Supernode{log: log, version: version, requestStop: requestStop, cfg: cfg, chains: make(map[eth.ChainID]cc.ChainContainer)} + s := &Supernode{log: log, version: version, requestStop: requestStop, cfg: cfg, chains: make(map[eth.ChainID]cc.InteropChain)} // Initialize L1 client if err := s.initL1Client(ctx, cfg); err != nil { @@ -102,11 +102,18 @@ func New(ctx context.Context, log gethlog.Logger, version string, requestStop co s.chains[chainID] = container } + // Narrow the chain map to ChainContainer for activities that must not invoke + // reorg-triggering operations. Only interop gets the wider InteropChain view. + narrowChains := make(map[eth.ChainID]cc.ChainContainer, len(s.chains)) + for id, c := range s.chains { + narrowChains[id] = c + } + // Initialize fixed activities s.activities = []activity.Activity{ heartbeat.New(log.New("activity", "heartbeat"), 10*time.Second), - supernodeactivity.New(log.New("activity", "supernode"), s.chains), - superroot.New(log.New("activity", "superroot"), s.chains), + supernodeactivity.New(log.New("activity", "supernode"), narrowChains), + superroot.New(log.New("activity", "superroot"), narrowChains), } interopActivationTimestamp, err := resolveInteropActivationTimestamp(cfg.InteropActivationTimestamp, vnCfgs) @@ -283,7 +290,7 @@ func (s *Supernode) Start(ctx context.Context) error { } for chainID, chain := range s.chains { s.wg.Add(1) - go func(chainID eth.ChainID, chain cc.ChainContainer) { + go func(chainID eth.ChainID, chain cc.InteropChain) { defer s.wg.Done() if err := chain.Start(lifecycleCtx); err != nil { s.log.Error("error starting chain", "chain_id", chainID.String(), "error", err)