diff --git a/module/state_synchronization/indexer/extended/scheduled_transactions.go b/module/state_synchronization/indexer/extended/scheduled_transactions.go index df3ee8bc082..6b2b12324f9 100644 --- a/module/state_synchronization/indexer/extended/scheduled_transactions.go +++ b/module/state_synchronization/indexer/extended/scheduled_transactions.go @@ -16,6 +16,7 @@ import ( "github.com/onflow/flow-go/fvm/systemcontracts" "github.com/onflow/flow-go/model/access" + "github.com/onflow/flow-go/model/access/systemcollection" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/state_synchronization/indexer/extended/events" @@ -53,7 +54,9 @@ type ScheduledTransactions struct { store storage.ScheduledTransactionsIndexBootstrapper metrics module.ExtendedIndexingMetrics - scheduledExecutorAddr flow.Address + // executorAddr resolves the executor authorizer address for a given block height. + // v0 system collections use FlowServiceAccount; v1 uses ScheduledTransactionExecutor. + executorAddr *access.Versioned[flow.Address] scheduledEventType flow.EventType pendingExecutionType flow.EventType @@ -113,12 +116,24 @@ func NewScheduledTransactions( scheduler := sc.FlowTransactionScheduler prefix := fmt.Sprintf("A.%s.%s.", scheduler.Address.Hex(), scheduler.Name) + // Build a height-versioned executor address that matches the system collection builder versions. + // v0 uses FlowServiceAccount as the executor authorizer; v1 uses ScheduledTransactionExecutor. + versionMapper, ok := systemcollection.ChainHeightVersions[chainID] + if !ok { + versionMapper = access.NewStaticHeightVersionMapper(access.LatestBoundary) + } + executorAddr := access.NewVersioned(map[access.Version]flow.Address{ + systemcollection.Version0: sc.FlowServiceAccount.Address, + systemcollection.Version1: sc.ScheduledTransactionExecutor.Address, + access.VersionLatest: sc.ScheduledTransactionExecutor.Address, + }, versionMapper) + return &ScheduledTransactions{ - log: log.With().Str("component", "scheduled_tx_indexer").Logger(), - store: store, - metrics: metrics, - requester: NewScheduledTransactionRequester(scriptExecutor, chainID), - scheduledExecutorAddr: sc.ScheduledTransactionExecutor.Address, + log: log.With().Str("component", "scheduled_tx_indexer").Logger(), + store: store, + metrics: metrics, + requester: NewScheduledTransactionRequester(scriptExecutor, chainID), + executorAddr: executorAddr, scheduledEventType: flow.EventType(prefix + "Scheduled"), pendingExecutionType: flow.EventType(prefix + "PendingExecution"), executedEventType: flow.EventType(prefix + "Executed"), @@ -384,13 +399,9 @@ func (s *ScheduledTransactions) collectScheduledTransactionData(data BlockData) // start searching from the system transaction that adds the scheduled transactions into the // system collection to reduce overhead. for _, tx := range data.Transactions[*pendingEventTxIndex:] { - if !s.isExecutorTransaction(tx) { + if !s.isExecutorTransaction(tx, data.Header.Height) { continue } - // the executor transaction must have a scheduled tx ID argument. - if len(tx.Arguments) < 1 { - return nil, fmt.Errorf("executor transaction %s has no scheduled tx ID argument", tx.ID()) - } id, err := decodeScheduledTxIDArg(tx.Arguments[0]) if err != nil { @@ -424,11 +435,14 @@ func (s *ScheduledTransactions) collectScheduledTransactionData(data BlockData) } // isExecutorTransaction returns true if the transaction was submitted by the scheduled executor -// account: sole authorizer is the scheduled executor address and payer is the empty address. -func (s *ScheduledTransactions) isExecutorTransaction(tx *flow.TransactionBody) bool { +// account for the given block height: sole authorizer matches the height-appropriate executor +// address, payer is the empty address, and the transaction has at least one argument (the +// scheduled tx ID). +func (s *ScheduledTransactions) isExecutorTransaction(tx *flow.TransactionBody, height uint64) bool { return tx.Payer == flow.EmptyAddress && len(tx.Authorizers) == 1 && - tx.Authorizers[0] == s.scheduledExecutorAddr + len(tx.Arguments) >= 1 && + tx.Authorizers[0] == s.executorAddr.ByHeight(height) } // decodeScheduledTxIDArg decodes a JSON-CDC encoded UInt64 argument as a scheduled tx ID. diff --git a/module/state_synchronization/indexer/extended/scheduled_transactions_test.go b/module/state_synchronization/indexer/extended/scheduled_transactions_test.go index 98db210a3c4..1c0b5bec5ac 100644 --- a/module/state_synchronization/indexer/extended/scheduled_transactions_test.go +++ b/module/state_synchronization/indexer/extended/scheduled_transactions_test.go @@ -30,7 +30,10 @@ import ( . "github.com/onflow/flow-go/module/state_synchronization/indexer/extended" ) -const scheduledTestHeight = uint64(100) +const ( + scheduledTestHeight = uint64(100) // v0 range on testnet (boundary at 290050888) + scheduledTestHeightV1 = uint64(290050900) // v1 range on testnet +) // TestScheduledTransactionsIndexer_NoEvents verifies that indexing a block with no scheduler // events stores an empty slice and advances the height. @@ -220,9 +223,10 @@ func TestScheduledTransactionsIndexer_FailedTransaction(t *testing.T) { // Height 2: PendingExecution for tx 42, no Executed event. // The executor transaction attempted to execute the scheduled tx but failed. + // scheduledTestHeight falls in the v0 range on testnet, so the executor uses FlowServiceAccount. header2 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeight+1)) pendingEvt := createPendingExecutionEvent(t, sc, 42, 1, 200, 80, owner, "A.xyz.Contract.Handler") - executorTx := makeExecutorTransactionBody(t, sc.ScheduledTransactionExecutor.Address, 42) + executorTx := makeExecutorTransactionBody(t, sc.FlowServiceAccount.Address, 42) indexScheduledBlock(t, indexer, lm, db, BlockData{ Header: header2, Events: []flow.Event{pendingEvt}, @@ -235,6 +239,42 @@ func TestScheduledTransactionsIndexer_FailedTransaction(t *testing.T) { assert.Equal(t, executorTx.ID(), tx.ExecutedTransactionID) } +// TestScheduledTransactionsIndexer_FailedTransactionV1 verifies that the failed-tx detection +// path matches executor transactions authorized by ScheduledTransactionExecutor (v1 system +// collection format, used after the version boundary). +func TestScheduledTransactionsIndexer_FailedTransactionV1(t *testing.T) { + t.Parallel() + + sc := systemcontracts.SystemContractsForChain(flow.Testnet) + indexer, store, lm, db := newScheduledTxIndexerForTest(t, flow.Testnet, scheduledTestHeightV1) + + owner := unittest.RandomAddressFixture() + + // Height 1: schedule tx with id=99 + header1 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1)) + scheduledEvt := createScheduledEvent(t, sc, 99, 1, 3000, 200, 80, owner, "A.xyz.Contract.Handler", 15, "") + indexScheduledBlock(t, indexer, lm, db, BlockData{ + Header: header1, + Events: []flow.Event{scheduledEvt}, + }) + + // Height 2: PendingExecution for tx 99, no Executed event. + // v1 uses ScheduledTransactionExecutor.Address as the authorizer. + header2 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1+1)) + pendingEvt := createPendingExecutionEvent(t, sc, 99, 1, 200, 80, owner, "A.xyz.Contract.Handler") + executorTx := makeExecutorTransactionBody(t, sc.ScheduledTransactionExecutor.Address, 99) + indexScheduledBlock(t, indexer, lm, db, BlockData{ + Header: header2, + Events: []flow.Event{pendingEvt}, + Transactions: []*flow.TransactionBody{executorTx}, + }) + + tx, err := store.ByID(99) + require.NoError(t, err) + assert.Equal(t, access.ScheduledTxStatusFailed, tx.Status) + assert.Equal(t, executorTx.ID(), tx.ExecutedTransactionID) +} + // TestScheduledTransactionsIndexer_PendingWithoutExecuted verifies that a PendingExecution event // without either a matching Executed event or a corresponding executor transaction returns an error. func TestScheduledTransactionsIndexer_PendingWithoutExecuted(t *testing.T) { @@ -601,7 +641,7 @@ func TestScheduledTransactionsIndexer_MixedFailedAndExecuted(t *testing.T) { pending20 := createPendingExecutionEvent(t, sc, 20, 1, 100, 10, owner, "A.abc.Contract.Handler") pending21 := createPendingExecutionEvent(t, sc, 21, 1, 150, 10, owner, "A.abc.Contract.Handler") executed20 := createExecutedEvent(t, sc, 20, 1, 100, owner, "A.abc.Contract.Handler", 20, "") - executorTx21 := makeExecutorTransactionBody(t, sc.ScheduledTransactionExecutor.Address, 21) + executorTx21 := makeExecutorTransactionBody(t, sc.FlowServiceAccount.Address, 21) indexScheduledBlock(t, indexer, lm, db, BlockData{ Header: header2, Events: []flow.Event{pending20, pending21, executed20}, @@ -641,6 +681,88 @@ func TestScheduledTransactionsIndexer_NonExecutorTxSkipped(t *testing.T) { // The non-executor tx has the wrong payer and should be skipped. header2 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeight+1)) pendingEvt := createPendingExecutionEvent(t, sc, 30, 1, 100, 10, owner, "A.abc.Contract.Handler") + nonExecutorTx := &flow.TransactionBody{ + Payer: unittest.RandomAddressFixture(), // wrong payer + Authorizers: []flow.Address{sc.FlowServiceAccount.Address}, + } + executorTx := makeExecutorTransactionBody(t, sc.FlowServiceAccount.Address, 30) + indexScheduledBlock(t, indexer, lm, db, BlockData{ + Header: header2, + Events: []flow.Event{pendingEvt}, + Transactions: []*flow.TransactionBody{nonExecutorTx, executorTx}, + }) + + tx, err := store.ByID(30) + require.NoError(t, err) + assert.Equal(t, access.ScheduledTxStatusFailed, tx.Status) + assert.Equal(t, executorTx.ID(), tx.ExecutedTransactionID) +} + +// TestScheduledTransactionsIndexer_MixedFailedAndExecutedV1 is the v1 counterpart of +// TestScheduledTransactionsIndexer_MixedFailedAndExecuted, using ScheduledTransactionExecutor +// as the executor authorizer. +func TestScheduledTransactionsIndexer_MixedFailedAndExecutedV1(t *testing.T) { + t.Parallel() + + sc := systemcontracts.SystemContractsForChain(flow.Testnet) + indexer, store, lm, db := newScheduledTxIndexerForTest(t, flow.Testnet, scheduledTestHeightV1) + + owner := unittest.RandomAddressFixture() + + // Height 1: schedule txs 20 and 21 + header1 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1)) + evt20 := createScheduledEvent(t, sc, 20, 1, 1000, 100, 10, owner, "A.abc.Contract.Handler", 20, "") + evt21 := createScheduledEvent(t, sc, 21, 1, 1000, 150, 10, owner, "A.abc.Contract.Handler", 21, "") + indexScheduledBlock(t, indexer, lm, db, BlockData{ + Header: header1, + Events: []flow.Event{evt20, evt21}, + }) + + // Height 2: tx 20 succeeds, tx 21 fails (executor tx present, no Executed event) + header2 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1+1)) + pending20 := createPendingExecutionEvent(t, sc, 20, 1, 100, 10, owner, "A.abc.Contract.Handler") + pending21 := createPendingExecutionEvent(t, sc, 21, 1, 150, 10, owner, "A.abc.Contract.Handler") + executed20 := createExecutedEvent(t, sc, 20, 1, 100, owner, "A.abc.Contract.Handler", 20, "") + executorTx21 := makeExecutorTransactionBody(t, sc.ScheduledTransactionExecutor.Address, 21) + indexScheduledBlock(t, indexer, lm, db, BlockData{ + Header: header2, + Events: []flow.Event{pending20, pending21, executed20}, + Transactions: []*flow.TransactionBody{executorTx21}, + }) + + tx20, err := store.ByID(20) + require.NoError(t, err) + assert.Equal(t, access.ScheduledTxStatusExecuted, tx20.Status) + + tx21, err := store.ByID(21) + require.NoError(t, err) + assert.Equal(t, access.ScheduledTxStatusFailed, tx21.Status) + assert.Equal(t, executorTx21.ID(), tx21.ExecutedTransactionID) +} + +// TestScheduledTransactionsIndexer_NonExecutorTxSkippedV1 is the v1 counterpart of +// TestScheduledTransactionsIndexer_NonExecutorTxSkipped, using ScheduledTransactionExecutor +// as the executor authorizer. +func TestScheduledTransactionsIndexer_NonExecutorTxSkippedV1(t *testing.T) { + t.Parallel() + + sc := systemcontracts.SystemContractsForChain(flow.Testnet) + indexer, store, lm, db := newScheduledTxIndexerForTest(t, flow.Testnet, scheduledTestHeightV1) + + owner := unittest.RandomAddressFixture() + + // Height 1: schedule tx with id=30 + header1 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1)) + scheduledEvt := createScheduledEvent(t, sc, 30, 1, 1000, 100, 10, owner, "A.abc.Contract.Handler", 30, "") + indexScheduledBlock(t, indexer, lm, db, BlockData{ + Header: header1, + Events: []flow.Event{scheduledEvt}, + }) + + // Height 2: PendingExecution for tx 30, a non-executor tx, then the real executor tx. + // The non-executor tx has the wrong payer and should be skipped. + header2 := unittest.BlockHeaderFixtureOnChain(flow.Testnet, unittest.WithHeaderHeight(scheduledTestHeightV1+1)) + pendingEvt := createPendingExecutionEvent(t, sc, 30, 1, 100, 10, owner, "A.abc.Contract.Handler") nonExecutorTx := &flow.TransactionBody{ Payer: unittest.RandomAddressFixture(), // wrong payer Authorizers: []flow.Address{sc.ScheduledTransactionExecutor.Address}, @@ -658,8 +780,10 @@ func TestScheduledTransactionsIndexer_NonExecutorTxSkipped(t *testing.T) { assert.Equal(t, executorTx.ID(), tx.ExecutedTransactionID) } -// TestScheduledTransactionsIndexer_ExecutorTxNoArguments verifies that an executor transaction -// with no arguments returns an error rather than silently skipping the failed tx. +// TestScheduledTransactionsIndexer_ExecutorTxNoArguments verifies that a transaction with the +// correct executor address and payer but no arguments is not treated as an executor transaction. +// This distinguishes executor transactions from other system transactions (e.g. ProcessCallbacksTransaction) +// that share the same authorizer in v0. func TestScheduledTransactionsIndexer_ExecutorTxNoArguments(t *testing.T) { t.Parallel() @@ -669,19 +793,21 @@ func TestScheduledTransactionsIndexer_ExecutorTxNoArguments(t *testing.T) { owner := unittest.RandomAddressFixture() pendingEvt := createPendingExecutionEvent(t, sc, 50, 1, 100, 10, owner, "A.abc.Contract.Handler") - executorTx := &flow.TransactionBody{ + // A system transaction with no arguments should not be matched as an executor tx, + // even though it has the right authorizer and payer. + noArgTx := &flow.TransactionBody{ Payer: flow.EmptyAddress, - Authorizers: []flow.Address{sc.ScheduledTransactionExecutor.Address}, + Authorizers: []flow.Address{sc.FlowServiceAccount.Address}, Arguments: nil, } err := indexScheduledBlockExpectError(t, indexer, lm, db, BlockData{ Header: header, Events: []flow.Event{pendingEvt}, - Transactions: []*flow.TransactionBody{executorTx}, + Transactions: []*flow.TransactionBody{noArgTx}, }) require.Error(t, err) - assert.Contains(t, err.Error(), "has no scheduled tx ID argument") + assert.Contains(t, err.Error(), "have no corresponding executor transaction") } // TestScheduledTransactionsIndexer_ExecutorTxMalformedArg verifies that an executor transaction @@ -701,7 +827,7 @@ func TestScheduledTransactionsIndexer_ExecutorTxMalformedArg(t *testing.T) { require.NoError(t, encErr) executorTx := &flow.TransactionBody{ Payer: flow.EmptyAddress, - Authorizers: []flow.Address{sc.ScheduledTransactionExecutor.Address}, + Authorizers: []flow.Address{sc.FlowServiceAccount.Address}, Arguments: [][]byte{malformedArg}, }