diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index bc436407d65..92bb7b9bb8a 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -580,9 +580,14 @@ func (exeNode *ExecutionNode) LoadProviderEngine( node.RootChainID, exeNode.exeConf.computationConfig.ExtensiveTracing, exeNode.exeConf.scheduleCallbacksEnabled, + exeNode.exeConf.tokenTrackingEnabled, )..., ) + if exeNode.exeConf.tokenTrackingEnabled { + node.Logger.Info().Str("module", "tc-inspector").Msg("token tracking inspector enabled") + } + vmCtx := fvm.NewContext(opts...) var collector module.ExecutionMetrics @@ -603,6 +608,7 @@ func (exeNode *ExecutionNode) LoadProviderEngine( } ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer) + exeNode.exeConf.computationConfig.TokenTrackingEnabled = exeNode.exeConf.tokenTrackingEnabled manager, err := computation.New( node.Logger, collector, diff --git a/cmd/execution_config.go b/cmd/execution_config.go index 308a17d94ef..a9c29f766b3 100644 --- a/cmd/execution_config.go +++ b/cmd/execution_config.go @@ -60,6 +60,7 @@ type ExecutionConfig struct { transactionExecutionMetricsEnabled bool transactionExecutionMetricsBufferSize uint scheduleCallbacksEnabled bool + tokenTrackingEnabled bool computationConfig computation.ComputationConfig receiptRequestWorkers uint // common provider engine workers @@ -146,6 +147,7 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) { flags.BoolVar(&exeConf.enableStorehouse, "enable-storehouse", false, "enable storehouse to store registers on disk, default is false") flags.BoolVar(&exeConf.enableChecker, "enable-checker", true, "enable checker to check the correctness of the execution result, default is true") flags.BoolVar(&exeConf.scheduleCallbacksEnabled, "scheduled-callbacks-enabled", fvm.DefaultScheduledCallbacksEnabled, "enable execution of scheduled callbacks") + flags.BoolVar(&exeConf.tokenTrackingEnabled, "token-tracking-enabled", false, "enable tracking and logging of token moves on transactions") // deprecated. Retain it to prevent nodes that previously had this configuration from crashing. var deprecatedEnableNewIngestionEngine bool flags.BoolVar(&deprecatedEnableNewIngestionEngine, "enable-new-ingestion-engine", true, "enable new ingestion engine, default is true") diff --git a/cmd/util/cmd/inspect-token-movements/cmd.go b/cmd/util/cmd/inspect-token-movements/cmd.go new file mode 100644 index 00000000000..f36c51c2a4c --- /dev/null +++ b/cmd/util/cmd/inspect-token-movements/cmd.go @@ -0,0 +1,227 @@ +package inspect + +import ( + "errors" + "fmt" + "strconv" + "strings" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + + "github.com/onflow/flow-go/cmd/util/cmd/common" + "github.com/onflow/flow-go/fvm/inspection" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/state/protocol" + "github.com/onflow/flow-go/storage" +) + +var ( + flagDatadir string + flagChunkDataPackDir string + flagChain string + flagFromTo string + flagLastK uint64 +) + +// Cmd is the command for inspecting token movements in executed blocks +// by reading chunk data packs and running the token changes inspector. +// +// # inspect the last 100 sealed blocks +// ./util inspect-token-movements --chain flow-mainnet --datadir /var/flow/data/protocol --chunk_data_pack_dir /var/flow/data/chunk_data_packs --lastk 100 +// # inspect the blocks from height 2000 to 3000 +// ./util inspect-token-movements --chain flow-mainnet --datadir /var/flow/data/protocol --chunk_data_pack_dir /var/flow/data/chunk_data_packs --from_to 2000_3000 +var Cmd = &cobra.Command{ + Use: "inspect-token-movements", + Short: "inspect token movements by analyzing chunk data packs for unaccounted token mints/burns", + Run: run, +} + +func init() { + Cmd.Flags().StringVar(&flagChain, "chain", "", "Chain name") + _ = Cmd.MarkFlagRequired("chain") + + common.InitDataDirFlag(Cmd, &flagDatadir) + + Cmd.Flags().StringVar(&flagChunkDataPackDir, "chunk_data_pack_dir", "/var/flow/data/chunk_data_packs", + "directory that stores the chunk data packs") + _ = Cmd.MarkFlagRequired("chunk_data_pack_dir") + + Cmd.Flags().Uint64Var(&flagLastK, "lastk", 1, + "last k sealed blocks to inspect") + + Cmd.Flags().StringVar(&flagFromTo, "from_to", "", + "the height range to inspect blocks (inclusive), i.e, 1_1000, 1000_2000, 2000_3000, etc.") +} + +func run(*cobra.Command, []string) { + lockManager := storage.MakeSingletonLockManager() + chainID := flow.ChainID(flagChain) + chain := chainID.Chain() + + lg := log.With(). + Str("chain", string(chainID)). + Str("datadir", flagDatadir). + Str("chunk_data_pack_dir", flagChunkDataPackDir). + Uint64("lastk", flagLastK). + Str("from_to", flagFromTo). + Logger() + + lg.Info().Msg("initializing token movements inspector") + + closer, storages, chunkDataPacks, state, err := initStorages(lockManager, flagDatadir, flagChunkDataPackDir) + if err != nil { + lg.Fatal().Err(err).Msg("could not init storages") + } + defer func() { + if closeErr := closer(); closeErr != nil { + lg.Warn().Err(closeErr).Msg("error closing storages") + } + }() + + // Create the token changes inspector with default search tokens for this chain + inspector := inspection.NewTokenChangesInspector(inspection.DefaultTokenDiffSearchTokens(chain)) + + var from, to uint64 + + if flagFromTo != "" { + from, to, err = parseFromTo(flagFromTo) + if err != nil { + lg.Fatal().Err(err).Msg("could not parse from_to") + } + } else { + lastSealed, err := state.Sealed().Head() + if err != nil { + lg.Fatal().Err(err).Msg("could not get last sealed height") + } + + root := state.Params().SealedRoot().Height + + // preventing overflow + if flagLastK > lastSealed.Height+1 { + lg.Fatal().Msgf("k is greater than the number of sealed blocks, k: %d, last sealed height: %d", flagLastK, lastSealed.Height) + } + + from = lastSealed.Height - flagLastK + 1 + + // root block is not verifiable, because it's sealed already. + // the first verifiable is the next block of the root block + firstVerifiable := root + 1 + + if from < firstVerifiable { + from = firstVerifiable + } + to = lastSealed.Height + } + + root := state.Params().SealedRoot().Height + if from <= root { + lg.Fatal().Msgf("cannot inspect blocks before the root block, from: %d, root: %d", from, root) + } + + lg.Info().Msgf("inspecting token movements for blocks from %d to %d", from, to) + + for height := from; height <= to; height++ { + err := inspectHeight( + lg, + chainID, + height, + storages.Headers, + chunkDataPacks, + storages.Results, + state, + inspector, + ) + if err != nil { + lg.Error().Err(err).Uint64("height", height).Msg("error inspecting height") + } + } + + lg.Info().Msgf("finished inspecting token movements for blocks from %d to %d", from, to) +} + +func inspectHeight( + lg zerolog.Logger, + chainID flow.ChainID, + height uint64, + headers storage.Headers, + chunkDataPacks storage.ChunkDataPacks, + results storage.ExecutionResults, + protocolState protocol.State, + inspector *inspection.TokenChanges, +) error { + header, err := headers.ByHeight(height) + if err != nil { + return fmt.Errorf("could not get block header by height %d: %w", height, err) + } + + blockID := header.ID() + + result, err := results.ByBlockID(blockID) + if err != nil { + if errors.Is(err, storage.ErrNotFound) { + lg.Warn().Uint64("height", height).Hex("block_id", blockID[:]).Msg("execution result not found") + return nil + } + return fmt.Errorf("could not get execution result by block ID %s: %w", blockID, err) + } + + heightLg := lg.With(). + Uint64("height", height). + Hex("block_id", blockID[:]). + Logger() + + heightLg.Info().Int("num_chunks", len(result.Chunks)).Msg("inspecting block") + + for _, chunk := range result.Chunks { + chunkDataPack, err := chunkDataPacks.ByChunkID(chunk.ID()) + if err != nil { + return fmt.Errorf("could not get chunk data pack by chunk ID %s: %w", chunk.ID(), err) + } + + chunkLg := heightLg.With(). + Uint64("chunk_index", chunk.Index). + Logger() + + err = inspectChunkFromDataPack( + chunkLg, + chainID, + header, + chunk, + chunkDataPack, + result, + protocolState, + headers, + inspector, + ) + if err != nil { + chunkLg.Error().Err(err).Msg("error inspecting chunk") + } + } + + return nil +} + +func parseFromTo(fromTo string) (from, to uint64, err error) { + parts := strings.Split(fromTo, "_") + if len(parts) != 2 { + return 0, 0, fmt.Errorf("invalid format: expected 'from_to', got '%s'", fromTo) + } + + from, err = strconv.ParseUint(strings.TrimSpace(parts[0]), 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("invalid 'from' value: %w", err) + } + + to, err = strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64) + if err != nil { + return 0, 0, fmt.Errorf("invalid 'to' value: %w", err) + } + + if from > to { + return 0, 0, fmt.Errorf("'from' value (%d) must be less than or equal to 'to' value (%d)", from, to) + } + + return from, to, nil +} diff --git a/cmd/util/cmd/inspect-token-movements/storage.go b/cmd/util/cmd/inspect-token-movements/storage.go new file mode 100644 index 00000000000..4087b6d2e99 --- /dev/null +++ b/cmd/util/cmd/inspect-token-movements/storage.go @@ -0,0 +1,294 @@ +package inspect + +import ( + "errors" + "fmt" + + "github.com/jordanschalm/lockctx" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "github.com/onflow/flow-go/cmd/util/cmd/common" + "github.com/onflow/flow-go/engine/execution/computation" + executionState "github.com/onflow/flow-go/engine/execution/state" + "github.com/onflow/flow-go/fvm" + "github.com/onflow/flow-go/fvm/blueprints" + "github.com/onflow/flow-go/fvm/initialize" + "github.com/onflow/flow-go/fvm/inspection" + "github.com/onflow/flow-go/fvm/storage/derived" + "github.com/onflow/flow-go/fvm/storage/logical" + "github.com/onflow/flow-go/fvm/storage/snapshot" + "github.com/onflow/flow-go/ledger" + "github.com/onflow/flow-go/ledger/partial" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/model/verification" + "github.com/onflow/flow-go/model/verification/convert" + "github.com/onflow/flow-go/module/metrics" + "github.com/onflow/flow-go/state/protocol" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation/pebbleimpl" + storagepebble "github.com/onflow/flow-go/storage/pebble" + "github.com/onflow/flow-go/storage/store" +) + +func initStorages( + lockManager lockctx.Manager, + dataDir string, + chunkDataPackDir string, +) ( + func() error, + *store.All, + storage.ChunkDataPacks, + protocol.State, + error, +) { + db, err := common.InitStorage(dataDir) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("could not init storage database: %w", err) + } + + storages := common.InitStorages(db) + state, err := common.OpenProtocolState(lockManager, db, storages) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("could not open protocol state: %w", err) + } + + // require the chunk data pack data must exist before returning the storage module + chunkDataPackDB, err := storagepebble.ShouldOpenDefaultPebbleDB( + log.Logger.With().Str("pebbledb", "cdp").Logger(), chunkDataPackDir) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf("could not open chunk data pack DB: %w", err) + } + storedChunkDataPacks := store.NewStoredChunkDataPacks(metrics.NewNoopCollector(), pebbleimpl.ToDB(chunkDataPackDB), 1000) + chunkDataPacks := store.NewChunkDataPacks(metrics.NewNoopCollector(), + db, storedChunkDataPacks, storages.Collections, 1000) + + closer := func() error { + var dbErr, chunkDataPackDBErr error + + if err := db.Close(); err != nil { + dbErr = fmt.Errorf("failed to close protocol db: %w", err) + } + + if err := chunkDataPackDB.Close(); err != nil { + chunkDataPackDBErr = fmt.Errorf("failed to close chunk data pack db: %w", err) + } + return errors.Join(dbErr, chunkDataPackDBErr) + } + + return closer, storages, chunkDataPacks, state, nil +} + +// partialLedgerStorageSnapshot wraps a storage snapshot and tracks unknown register touches +type partialLedgerStorageSnapshot struct { + snapshot snapshot.StorageSnapshot + + unknownRegTouch map[flow.RegisterID]struct{} +} + +func (storage *partialLedgerStorageSnapshot) Get( + id flow.RegisterID, +) ( + flow.RegisterValue, + error, +) { + value, err := storage.snapshot.Get(id) + if err != nil && errors.Is(err, ledger.ErrMissingKeys{}) { + storage.unknownRegTouch[id] = struct{}{} + return flow.RegisterValue{}, nil + } + + return value, err +} + +// chunkInspector handles the inspection of chunks by re-executing transactions +type chunkInspector struct { + vm fvm.VM + vmCtx fvm.Context + systemChunkCtx fvm.Context + logger zerolog.Logger + inspector *inspection.TokenChanges +} + +func newChunkInspector( + logger zerolog.Logger, + chainID flow.ChainID, + headers storage.Headers, + inspector *inspection.TokenChanges, +) *chunkInspector { + vm := fvm.NewVirtualMachine() + fvmOptions := initialize.InitFvmOptions( + chainID, + headers, + false, // transaction fees not disabled + ) + fvmOptions = append( + []fvm.Option{fvm.WithLogger(logger)}, + fvmOptions..., + ) + + fvmOptions = append( + fvmOptions, + computation.DefaultFVMOptions( + chainID, + false, + false, + false, + )..., + ) + fvmOptions = append(fvmOptions, fvm.WithChain(chainID.Chain())) + vmCtx := fvm.NewContext(fvmOptions...) + + return &chunkInspector{ + vm: vm, + vmCtx: vmCtx, + systemChunkCtx: vmCtx, // simplified for inspection + logger: logger, + inspector: inspector, + } +} + +// inspectChunk re-executes transactions in the chunk and runs the token inspector +func (ci *chunkInspector) inspectChunk( + vc *verification.VerifiableChunkData, +) error { + var transactions []*fvm.TransactionProcedure + derivedBlockData := derived.NewEmptyDerivedBlockData(logical.Time(vc.TransactionOffset)) + + ctx := fvm.NewContextFromParent( + ci.vmCtx, + fvm.WithBlockHeader(vc.Header), + fvm.WithProtocolStateSnapshot(vc.Snapshot), + fvm.WithDerivedBlockData(derivedBlockData), + ) + + if vc.IsSystemChunk { + // For system chunks, create the system transaction + txBody, err := blueprints.SystemChunkTransaction(ci.vmCtx.Chain) + if err != nil { + return fmt.Errorf("could not get system chunk transaction: %w", err) + } + transactions = []*fvm.TransactionProcedure{ + fvm.Transaction(txBody, vc.TransactionOffset), + } + } else { + // For regular chunks, use the collection transactions + transactions = make( + []*fvm.TransactionProcedure, + 0, + len(vc.ChunkDataPack.Collection.Transactions)) + for i, txBody := range vc.ChunkDataPack.Collection.Transactions { + tx := fvm.Transaction(txBody, vc.TransactionOffset+uint32(i)) + transactions = append(transactions, tx) + } + } + + // Construct partial trie from chunk data pack + psmt, err := partial.NewLedger( + vc.ChunkDataPack.Proof, + ledger.State(vc.ChunkDataPack.StartState), + partial.DefaultPathFinderVersion, + ) + if err != nil { + return fmt.Errorf("could not construct partial trie: %w", err) + } + + // Create storage snapshot + unknownRegTouch := make(map[flow.RegisterID]struct{}) + snapshotTree := snapshot.NewSnapshotTree( + &partialLedgerStorageSnapshot{ + snapshot: executionState.NewLedgerStorageSnapshot( + psmt, + vc.ChunkDataPack.StartState), + unknownRegTouch: unknownRegTouch, + }) + + // Execute each transaction and inspect + for i, tx := range transactions { + ci.logger.Info(). + Int("tx_index", i). + Hex("tx_id", tx.ID[:]). + Msg("executing transaction") + + executionSnapshot, output, err := ci.vm.Run( + ctx, + tx, + snapshotTree) + if err != nil { + ci.logger.Warn(). + Err(err). + Int("tx_index", i). + Hex("tx_id", tx.ID[:]). + Msg("failed to execute transaction") + continue + } + + // Collect events for inspection + events := make([]flow.Event, 0, len(output.Events)+len(output.ServiceEvents)) + events = append(events, output.Events...) + events = append(events, output.ServiceEvents...) + + // Run the inspector + result, err := ci.inspector.Inspect(ci.logger, snapshotTree, executionSnapshot, events) + if err != nil { + ci.logger.Warn(). + Err(err). + Int("tx_index", i). + Hex("tx_id", tx.ID[:]). + Msg("failed to inspect transaction") + } else { + // Log the inspection result + ci.logInspectionResult(tx.ID, i, result) + } + + // Update snapshot tree for next transaction + snapshotTree = snapshotTree.Append(executionSnapshot) + } + + return nil +} + +func (ci *chunkInspector) logInspectionResult(txID flow.Identifier, txIndex int, result inspection.Result) { + if result == nil { + ci.logger.Info().Msgf("no result from inspection, transaction did not trigger any token movements") + return + } + + lvl, evt := result.AsLogEvent() + if evt == nil { + ci.logger.Info().Msgf("transaction did not trigger any token movements") + return + } + + e := ci.logger.WithLevel(lvl). + Hex("tx_id", txID[:]). + Int("tx_index", txIndex) + evt(e) + e.Msg("Token inspection result") +} + +// inspectChunkFromDataPack creates a VerifiableChunkData and inspects it +func inspectChunkFromDataPack( + logger zerolog.Logger, + chainID flow.ChainID, + header *flow.Header, + chunk *flow.Chunk, + chunkDataPack *flow.ChunkDataPack, + result *flow.ExecutionResult, + protocolState protocol.State, + headers storage.Headers, + inspector *inspection.TokenChanges, +) error { + // Get protocol snapshot at the block + ps := protocolState.AtBlockID(header.ID()) + + // Convert to verifiable chunk data + vcd, err := convert.FromChunkDataPack(chunk, chunkDataPack, header, ps, result) + if err != nil { + return fmt.Errorf("could not convert chunk data pack: %w", err) + } + + // Create chunk inspector and run inspection + chunkInspector := newChunkInspector(logger, chainID, headers, inspector) + return chunkInspector.inspectChunk(vcd) +} diff --git a/cmd/util/cmd/root.go b/cmd/util/cmd/root.go index 2fc197470c8..bb766ae3f9f 100644 --- a/cmd/util/cmd/root.go +++ b/cmd/util/cmd/root.go @@ -33,6 +33,7 @@ import ( find_inconsistent_result "github.com/onflow/flow-go/cmd/util/cmd/find-inconsistent-result" find_trie_root "github.com/onflow/flow-go/cmd/util/cmd/find-trie-root" generate_authorization_fixes "github.com/onflow/flow-go/cmd/util/cmd/generate-authorization-fixes" + inspect_token_movements "github.com/onflow/flow-go/cmd/util/cmd/inspect-token-movements" "github.com/onflow/flow-go/cmd/util/cmd/leaders" pebble_checkpoint "github.com/onflow/flow-go/cmd/util/cmd/pebble-checkpoint" read_badger "github.com/onflow/flow-go/cmd/util/cmd/read-badger/cmd" @@ -136,6 +137,7 @@ func addCommands() { rootCmd.AddCommand(pebble_checkpoint.Cmd) rootCmd.AddCommand(db_migration.Cmd) rootCmd.AddCommand(diffkeys.Cmd) + rootCmd.AddCommand(inspect_token_movements.Cmd) } func initConfig() { diff --git a/cmd/util/cmd/run-script/cmd.go b/cmd/util/cmd/run-script/cmd.go index 31a1f1f2fc4..d864af1a222 100644 --- a/cmd/util/cmd/run-script/cmd.go +++ b/cmd/util/cmd/run-script/cmd.go @@ -134,7 +134,7 @@ func run(*cobra.Command, []string) { registersByAccount.AccountCount(), ) - options := computation.DefaultFVMOptions(chainID, false, false) + options := computation.DefaultFVMOptions(chainID, false, false, false) options = append( options, fvm.WithContractDeploymentRestricted(false), diff --git a/cmd/util/ledger/migrations/transaction_migration.go b/cmd/util/ledger/migrations/transaction_migration.go index 488ce1aeb63..352153b3a56 100644 --- a/cmd/util/ledger/migrations/transaction_migration.go +++ b/cmd/util/ledger/migrations/transaction_migration.go @@ -19,7 +19,7 @@ func NewTransactionBasedMigration( ) RegistersMigration { return func(registersByAccount *registers.ByAccount) error { - options := computation.DefaultFVMOptions(chainID, false, false) + options := computation.DefaultFVMOptions(chainID, false, false, false) options = append(options, fvm.WithContractDeploymentRestricted(false), fvm.WithContractRemovalRestricted(false), diff --git a/cmd/verification_builder.go b/cmd/verification_builder.go index 47dbfd153a8..dd9a3f228b3 100644 --- a/cmd/verification_builder.go +++ b/cmd/verification_builder.go @@ -212,6 +212,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() { node.RootChainID, false, v.verConf.scheduledCallbacksEnabled, + false, )..., ) vmCtx := fvm.NewContext(fvmOptions...) diff --git a/engine/execution/computation/computer/result_collector.go b/engine/execution/computation/computer/result_collector.go index 3e249147394..8f1b82f6f1c 100644 --- a/engine/execution/computation/computer/result_collector.go +++ b/engine/execution/computation/computer/result_collector.go @@ -8,12 +8,14 @@ import ( "github.com/onflow/crypto" "github.com/onflow/crypto/hash" + "github.com/rs/zerolog" otelTrace "go.opentelemetry.io/otel/trace" "github.com/onflow/flow-go/engine/execution" "github.com/onflow/flow-go/engine/execution/computation/result" "github.com/onflow/flow-go/engine/execution/storehouse" "github.com/onflow/flow-go/fvm" + "github.com/onflow/flow-go/fvm/inspection" "github.com/onflow/flow-go/fvm/meter" "github.com/onflow/flow-go/fvm/storage/snapshot" "github.com/onflow/flow-go/fvm/storage/state" @@ -252,6 +254,17 @@ func (collector *resultCollector) processTransactionResult( logger.Info().Msg("transaction executed successfully") } + // (leo debugging): We log inspection results here, because if we logged them ih the FVM + // they would get logged on every transaction retry. + // Same for the metrics. + if len(output.InspectionResults) == 0 { + logger.Debug(). + Hex("tx_id", txn.ID[:]). + Msg("no inspection results for transaction") + } + + collector.logInspectionResults(logger, output.InspectionResults) + collector.handleTransactionExecutionMetrics( timeSpent, output, @@ -293,6 +306,48 @@ func (collector *resultCollector) processTransactionResult( collector.currentCollectionState.Finalize()) } +func (collector *resultCollector) logInspectionResults( + log zerolog.Logger, + results []inspection.Result, +) { + if len(results) == 0 { + return + } + + logEvents := make([]func(e *zerolog.Event), 0, len(results)) + + // The log level will be decided by the inspectionResults + // logLevel := zerolog.TraceLevel + // leo: debugging with info level log + logLevel := zerolog.InfoLevel + for i, inspectionResult := range results { + if inspectionResult == nil { + log.Warn(). + Int("index", i). + Msg("inspection result is nil, likely due to a panic or error during inspection") + continue + } + lvl, evt := inspectionResult.AsLogEvent() + if lvl > logLevel { + logLevel = lvl + } + if evt != nil { + logEvents = append(logEvents, evt) + } + } + + // if there are no loggable inspection results, don't log at all + if len(logEvents) == 0 { + return + } + + evt := log.WithLevel(logLevel).Str("module", "tc-inspector") + for _, logEvent := range logEvents { + logEvent(evt) + } + evt.Msg("Inspection results") +} + func (collector *resultCollector) handleTransactionExecutionMetrics( timeSpent time.Duration, output fvm.ProcedureOutput, diff --git a/engine/execution/computation/computer/transaction_coordinator.go b/engine/execution/computation/computer/transaction_coordinator.go index 6ce2cb3757c..9047e1d8e9d 100644 --- a/engine/execution/computation/computer/transaction_coordinator.go +++ b/engine/execution/computation/computer/transaction_coordinator.go @@ -34,8 +34,9 @@ type transactionCoordinator struct { // Note: database commit and result logging must occur within the same // critical section (guraded by mutex). - database *storage.BlockDatabase - writeBehindLog TransactionWriteBehindLogger + database *storage.BlockDatabase + writeBehindLog TransactionWriteBehindLogger + storageSnapshot snapshot.StorageSnapshot // used for inspection } type transaction struct { @@ -64,13 +65,14 @@ func newTransactionCoordinator( cachedDerivedBlockData) return &transactionCoordinator{ - vm: vm, - mutex: mutex, - cond: cond, - snapshotTime: 0, - abortErr: nil, - database: database, - writeBehindLog: writeBehindLog, + vm: vm, + mutex: mutex, + cond: cond, + snapshotTime: 0, + abortErr: nil, + database: database, + writeBehindLog: writeBehindLog, + storageSnapshot: storageSnapshot, } } @@ -147,10 +149,23 @@ func (coordinator *transactionCoordinator) commit(txn *transaction) error { return err } + output := txn.Output() + + // Run inspection on the transaction results. + // This is done here rather than in vm.Run() because the block computer + // uses NewExecutor for fine-grained transaction control. + output.InspectionResults = coordinator.vm.Inspect( + txn.request.ctx, + txn.request.TransactionProcedure, + coordinator.storageSnapshot, + executionSnapshot, + output, + ) + coordinator.writeBehindLog.AddTransactionResult( txn.request, executionSnapshot, - txn.Output(), + output, time.Since(txn.startedAt), txn.numConflictRetries) diff --git a/engine/execution/computation/manager.go b/engine/execution/computation/manager.go index 9ff4be34890..acdc8957875 100644 --- a/engine/execution/computation/manager.go +++ b/engine/execution/computation/manager.go @@ -7,6 +7,8 @@ import ( "github.com/onflow/cadence/runtime" "github.com/rs/zerolog" + "github.com/onflow/flow-go/fvm/inspection" + "github.com/onflow/flow-go/engine/execution" "github.com/onflow/flow-go/engine/execution/computation/computer" "github.com/onflow/flow-go/engine/execution/computation/query" @@ -66,6 +68,9 @@ type ComputationConfig struct { DerivedDataCacheSize uint MaxConcurrency int + // TokenTrackingEnabled enables tracking and logging of token movements on transactions. + TokenTrackingEnabled bool + // When NewCustomVirtualMachine is nil, the manager will create a standard // fvm virtual machine via fvm.NewVirtualMachine. Otherwise, the manager // will create a virtual machine using this function. @@ -106,7 +111,7 @@ func New( } chainID := vmCtx.Chain.ChainID() - options := DefaultFVMOptions(chainID, params.ExtensiveTracing, vmCtx.ScheduleCallbacksEnabled) + options := DefaultFVMOptions(chainID, params.ExtensiveTracing, vmCtx.ScheduleCallbacksEnabled, params.TokenTrackingEnabled) vmCtx = fvm.NewContextFromParent(vmCtx, options...) blockComputer, err := computer.NewBlockComputer( @@ -223,7 +228,7 @@ func (e *Manager) QueryExecutor() query.Executor { return e.queryExecutor } -func DefaultFVMOptions(chainID flow.ChainID, extensiveTracing bool, scheduleCallbacksEnabled bool) []fvm.Option { +func DefaultFVMOptions(chainID flow.ChainID, extensiveTracing, scheduleCallbacksEnabled, tokenTracking bool) []fvm.Option { options := []fvm.Option{ fvm.WithChain(chainID.Chain()), fvm.WithReusableCadenceRuntimePool( @@ -240,5 +245,11 @@ func DefaultFVMOptions(chainID flow.ChainID, extensiveTracing bool, scheduleCall options = append(options, fvm.WithExtensiveTracing()) } + if tokenTracking { + options = append(options, fvm.WithInspectors([]inspection.Inspector{ + inspection.NewTokenChangesInspector(inspection.DefaultTokenDiffSearchTokens(chainID.Chain())), + })) + } + return options } diff --git a/engine/execution/computation/manager_test.go b/engine/execution/computation/manager_test.go index c5a5f885b8d..39eaf64e9dc 100644 --- a/engine/execution/computation/manager_test.go +++ b/engine/execution/computation/manager_test.go @@ -29,6 +29,7 @@ import ( "github.com/onflow/flow-go/engine/execution/testutil" "github.com/onflow/flow-go/fvm" "github.com/onflow/flow-go/fvm/environment" + "github.com/onflow/flow-go/fvm/inspection" fvmErrors "github.com/onflow/flow-go/fvm/errors" "github.com/onflow/flow-go/fvm/storage" "github.com/onflow/flow-go/fvm/storage/derived" @@ -573,6 +574,16 @@ func (p *PanickingVM) GetAccount( panic("not expected") } +func (p *PanickingVM) Inspect( + ctx fvm.Context, + proc fvm.Procedure, + storageSnapshot snapshot.StorageSnapshot, + executionSnapshot *snapshot.ExecutionSnapshot, + output fvm.ProcedureOutput, +) []inspection.Result { + return nil +} + type LongRunningExecutor struct { duration time.Duration } @@ -637,6 +648,16 @@ func (l *LongRunningVM) GetAccount( panic("not expected") } +func (l *LongRunningVM) Inspect( + ctx fvm.Context, + proc fvm.Procedure, + storageSnapshot snapshot.StorageSnapshot, + executionSnapshot *snapshot.ExecutionSnapshot, + output fvm.ProcedureOutput, +) []inspection.Result { + return nil +} + type FakeBlockComputer struct { computationResult *execution.ComputationResult } diff --git a/engine/verification/verifier/verifiers.go b/engine/verification/verifier/verifiers.go index c15c5c382d2..1641d015af4 100644 --- a/engine/verification/verifier/verifiers.go +++ b/engine/verification/verifier/verifiers.go @@ -351,6 +351,7 @@ func makeVerifier( chainID, false, scheduledCallbacksEnabled, + false, )..., ) vmCtx := fvm.NewContext(fvmOptions...) diff --git a/fvm/context.go b/fvm/context.go index 4c5aff84278..093119eea31 100644 --- a/fvm/context.go +++ b/fvm/context.go @@ -6,6 +6,8 @@ import ( "github.com/rs/zerolog" otelTrace "go.opentelemetry.io/otel/trace" + "github.com/onflow/flow-go/fvm/inspection" + "github.com/onflow/flow-go/fvm/environment" reusableRuntime "github.com/onflow/flow-go/fvm/runtime" "github.com/onflow/flow-go/fvm/storage/derived" @@ -52,6 +54,8 @@ type Context struct { // AllowProgramCacheWritesInScripts determines if the program cache can be written to in scripts // By default, the program cache is only updated by transactions. AllowProgramCacheWritesInScripts bool + + Inspectors []inspection.Inspector } // NewContext initializes a new execution context with the provided options. @@ -416,3 +420,10 @@ func WithScheduleCallbacksEnabled(enabled bool) Option { return ctx } } + +func WithInspectors(inspectors []inspection.Inspector) Option { + return func(ctx Context) Context { + ctx.Inspectors = inspectors + return ctx + } +} diff --git a/fvm/fvm.go b/fvm/fvm.go index ae92d08d945..b65c344f12f 100644 --- a/fvm/fvm.go +++ b/fvm/fvm.go @@ -6,6 +6,10 @@ import ( "github.com/onflow/cadence" "github.com/onflow/cadence/common" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "github.com/onflow/flow-go/fvm/inspection" "github.com/onflow/flow-go/fvm/environment" "github.com/onflow/flow-go/fvm/errors" @@ -35,6 +39,7 @@ type ProcedureOutput struct { ComputationIntensities meter.MeteredComputationIntensities MemoryEstimate uint64 Err errors.CodedError + InspectionResults []inspection.Result // Output only by script. Value cadence.Value @@ -119,6 +124,17 @@ type VM interface { ProcedureOutput, error, ) + + // Inspect runs configured inspectors on the procedure results. + // This is used by the block computer which manages transaction execution + // manually via NewExecutor rather than using Run. + Inspect( + ctx Context, + proc Procedure, + storageSnapshot snapshot.StorageSnapshot, + executionSnapshot *snapshot.ExecutionSnapshot, + output ProcedureOutput, + ) []inspection.Result } var _ VM = (*VirtualMachine)(nil) @@ -198,7 +214,62 @@ func (vm *VirtualMachine) Run( return nil, ProcedureOutput{}, err } - return executionSnapshot, executor.Output(), nil + // This is of informative nature right now so this placement is ok + // In the future we will need to move this inside the procedure if we want it to affect execution + output := executor.Output() + log.Debug().Str("module", "tc-inspector"). + Str("procedure-type", string(proc.Type())). + Int("inspectors", len(ctx.Inspectors)). + Msg("populating environment values for procedure output") + inspectionResults := vm.inspectProcedureResults(ctx.Logger, ctx, proc, storageSnapshot, executionSnapshot, output) + output.InspectionResults = inspectionResults + + return executionSnapshot, output, nil +} + +// Inspect runs configured inspectors on the procedure results. +// This is used by the block computer which manages transaction execution +// manually via NewExecutor rather than using Run. +func (vm *VirtualMachine) Inspect( + ctx Context, + proc Procedure, + storageSnapshot snapshot.StorageSnapshot, + executionSnapshot *snapshot.ExecutionSnapshot, + output ProcedureOutput, +) []inspection.Result { + return vm.inspectProcedureResults(ctx.Logger, ctx, proc, storageSnapshot, executionSnapshot, output) +} + +func (vm *VirtualMachine) inspectProcedureResults( + logger zerolog.Logger, + context Context, + proc Procedure, + storageSnapshot snapshot.StorageSnapshot, + executionSnapshot *snapshot.ExecutionSnapshot, + output ProcedureOutput, +) []inspection.Result { + // TODO: this should be decided by the inspector + if proc.Type() != TransactionProcedureType { + logger.Info().Str("module", "tc-inspector").Msg("skipping inspection for non-transaction procedure") + return nil + } + + // TODO: imspector should be able to receive ProcedureOutput directly + evts := make([]flow.Event, 0, len(output.Events)+len(output.ServiceEvents)) + evts = append(evts, output.Events...) + evts = append(evts, output.ServiceEvents...) + + inspectionResults := make([]inspection.Result, len(context.Inspectors)) + logger.Debug().Str("module", "tc-inspector").Int("num_inspectors", len(context.Inspectors)).Msg("inspecting procedure results") + var err error + for i, inspector := range context.Inspectors { + inspectionResults[i], err = inspector.Inspect(logger, storageSnapshot, executionSnapshot, evts) + if err != nil { + logger.Warn().Str("module", "tc-inspector").Err(err).Msg("failed to inspect procedure results") + } + } + + return inspectionResults } // GetAccount returns an account by address or an error if none exists. diff --git a/fvm/inspection/inspector.go b/fvm/inspection/inspector.go new file mode 100644 index 00000000000..38dfaa80439 --- /dev/null +++ b/fvm/inspection/inspector.go @@ -0,0 +1,29 @@ +package inspection + +import ( + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/fvm/storage/snapshot" + "github.com/onflow/flow-go/model/flow" +) + +// Inspector is run after each procedure on the procedure output and the starting state of a procedure +// It will then fill out the ProcedureOutput.Inspection results +type Inspector interface { + // Inspect + // - storage is the execution state before the procedure was executed. + // only the executionSnapshot.Reads, will be read + // - executionSnapshot is the reads and writes of the procedure + // - events are all of the events the procedure is emitting + Inspect( + logger zerolog.Logger, + storage snapshot.StorageSnapshot, + executionSnapshot *snapshot.ExecutionSnapshot, + events []flow.Event, + ) (Result, error) +} + +// Result is the result of a procedure inspector +type Result interface { + AsLogEvent() (zerolog.Level, func(e *zerolog.Event)) +} diff --git a/fvm/inspection/token_changes.go b/fvm/inspection/token_changes.go new file mode 100644 index 00000000000..3159ce74a99 --- /dev/null +++ b/fvm/inspection/token_changes.go @@ -0,0 +1,741 @@ +package inspection + +import ( + "fmt" + "math" + "runtime/debug" + "sync" + + "github.com/onflow/atree" + "github.com/onflow/cadence" + "github.com/onflow/cadence/bbq/vm" + "github.com/onflow/cadence/common" + "github.com/onflow/cadence/encoding/ccf" + "github.com/onflow/cadence/interpreter" + "github.com/onflow/cadence/runtime" + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/fvm/systemcontracts" + + "github.com/onflow/flow-go/fvm/storage/snapshot" + "github.com/onflow/flow-go/model/flow" +) + +type TokenChanges struct { + // searchedTokens holds a reference to the map of tokens to search + // its shallow copied from whatever is specified from the outside, so that the locking + // should work properly + searchedTokens TokenChangesSearchTokens + searchedTokensMu sync.RWMutex +} + +var _ Inspector = (*TokenChanges)(nil) + +// NewTokenChangesInspector return a TokenChanges inspector, that will be run +// after transaction execution and analyze if any unaccounted tokens were created or +// destroy. +func NewTokenChangesInspector(searchedTokens TokenChangesSearchTokens) *TokenChanges { + return &TokenChanges{searchedTokens: searchedTokens} +} + +// SetSearchedTokens are safe to replace whenever. +// The change will not affect the inspections already in progress. +// TODO: this can be tied into the admin commands +func (td *TokenChanges) SetSearchedTokens(searchedTokens TokenChangesSearchTokens) { + // copy the map in case the user tries to modify the map + st := make(map[string]SearchToken, len(searchedTokens)) + for k, v := range searchedTokens { + st[k] = v + } + td.searchedTokensMu.Lock() + defer td.searchedTokensMu.Unlock() + td.searchedTokens = st +} + +func (td *TokenChanges) getSearchedTokensRef() TokenChangesSearchTokens { + td.searchedTokensMu.RLock() + defer td.searchedTokensMu.RUnlock() + return td.searchedTokens +} + +// Inspect gets the token diff from a state diff +// - thread safe +// - not deterministic (iterates over maps)! So it should not be used to affect execution! +// - will not panic +// - might return an error, but it is safe to ignore since this for information/reporting +// +// Inspect could technically be run on chunk data packs. +func (td *TokenChanges) Inspect( + logger zerolog.Logger, + storage snapshot.StorageSnapshot, + executionSnapshot *snapshot.ExecutionSnapshot, + events []flow.Event, +) (diff Result, err error) { + logger.Info().Str("module", "tc-inspector"). + Int("events", len(events)). + Int("read_set", len(executionSnapshot.ReadSet)). + Int("write_set", len(executionSnapshot.WriteSet)). + Msg("starting token inspection for transaction") + + defer func() { + if r := recover(); r != nil { + logger.Warn().Str("module", "tc-inspector").Msgf("trace=%s", string(debug.Stack())) + err = fmt.Errorf("panic: %v", r) + } + + if err != nil { + err = fmt.Errorf("failed to get token diff: %w", err) + } + }() + + diff, err = td.getTokenDiff(logger, storage, executionSnapshot, events, td.getSearchedTokensRef()) + return +} + +func (td *TokenChanges) getTokenDiff( + logger zerolog.Logger, + storage snapshot.StorageSnapshot, + executionSnapshot *snapshot.ExecutionSnapshot, + events []flow.Event, + searchedTokens map[string]SearchToken, +) (TokenDiffResult, error) { + executionSnapshotLedgers := executionSnapshotLedgers{ + StorageSnapshot: storage, + ExecutionSnapshot: executionSnapshot, + } + + // get all distinct addresses + addresses := make(map[common.Address]struct{}) + allTouched := executionSnapshotLedgers.allTouchedRegisters() + for k := range allTouched { + // skip special registers + // none of them can hold resources + if len(k.Owner) == 0 { + continue + } + addresses[common.Address([]byte(k.Owner))] = struct{}{} + } + + logger.Info().Str("module", "tc-inspector"). + Int("touched_registers", len(allTouched)). + Int("addresses_to_inspect", len(addresses)). + Int("searched_tokens", len(searchedTokens)). + Msg("inspecting token movements") + + if len(addresses) == 0 { + logger.Info().Str("module", "tc-inspector").Msg("no addresses touched, skipping token inspection") + return TokenDiffResult{ + Changes: make(map[flow.Address]AccountChange), + KnownSourcesSinks: make(map[string]int64), + }, nil + } + + oldRegistersLedger := executionSnapshotLedgers.OldValuesLedger() + newValuesRegister := executionSnapshotLedgers.NewValuesLedger() + + // TODO: possible optimisation: run both at the same time + before, err := td.getTokens(logger, oldRegistersLedger, addresses, tokenDiffSearchDomains, searchedTokens) + if err != nil { + return TokenDiffResult{}, fmt.Errorf("failed to get tokens before: %w", err) + } + after, err := td.getTokens(logger, newValuesRegister, addresses, tokenDiffSearchDomains, searchedTokens) + if err != nil { + return TokenDiffResult{}, fmt.Errorf("failed to get tokens after: %w", err) + } + + typicalDiffSize := 4 // from, to, payer and fees + tokenDiffResult := TokenDiffResult{ + Changes: make(map[flow.Address]AccountChange, typicalDiffSize), + } + + for a := range addresses { + // Copy beforeTokens before calling diffAccountTokens, which mutates the before map + beforeTokens := make(accountTokens, len(before[a])) + for k, v := range before[a] { + beforeTokens[k] = v + } + afterTokens := after[a] + diff := diffAccountTokens(before[a], after[a]) + if len(diff) == 0 { + // Only log if the account had tokens before or after + if len(beforeTokens) > 0 || len(afterTokens) > 0 { + logger.Info().Str("module", "tc-inspector"). + Str("account", a.String()). + Interface("before", beforeTokens). + Interface("after", afterTokens). + Msg("account token balance unchanged") + } + continue + } + logger.Info().Str("module", "tc-inspector"). + Str("account", a.String()). + Interface("before", beforeTokens). + Interface("after", afterTokens). + Interface("diff", diff). + Msg("account token balance changed") + tokenDiffResult.Changes[flow.Address(a)] = diff + } + + sourcesSinks, err := td.findSourcesSinks(events, searchedTokens) + if err != nil { + return TokenDiffResult{}, fmt.Errorf("failed to find sources/sinks: %w", err) + } + tokenDiffResult.KnownSourcesSinks = sourcesSinks + + // Log summary of token movements + unaccounted := tokenDiffResult.UnaccountedTokens() + if len(unaccounted) > 0 { + logger.Warn().Str("module", "tc-inspector"). + Int("accounts_changed", len(tokenDiffResult.Changes)). + Interface("sources_sinks", sourcesSinks). + Interface("unaccounted", unaccounted). + Msg("token inspection complete - unaccounted token movements detected") + } else if len(tokenDiffResult.Changes) > 0 { + logger.Info().Str("module", "tc-inspector"). + Int("accounts_changed", len(tokenDiffResult.Changes)). + Interface("sources_sinks", sourcesSinks). + Msg("token inspection complete - all movements accounted for") + } + + return tokenDiffResult, nil +} + +func (td *TokenChanges) getTokens( + logger zerolog.Logger, + storage ledgerSnapshot, + addresses map[common.Address]struct{}, + domains []common.StorageDomain, + searchedTokens map[string]SearchToken, +) (map[common.Address]accountTokens, error) { + storageConfig := runtime.StorageConfig{} + runtimeStorage := runtime.NewStorage(storage, nil, nil, storageConfig) + + // without this the tokens are not properly detected! + // TODO: choose a good number for the workers + err := loadAtreeSlabsInStorage(runtimeStorage, storage, 1) + if err != nil { + return nil, fmt.Errorf("failed to load atree slabs: %w", err) + } + + storageRuntime, err := newReadonlyStorageRuntimeWithStorage(runtimeStorage, runtimeStorage.Count()) + if err != nil { + return nil, fmt.Errorf("failed to create storage runtime: %w", err) + } + + tokens := make(map[common.Address]accountTokens, len(addresses)) + for a := range addresses { + tkns := make(accountTokens, len(searchedTokens)) + for _, d := range domains { + // We are making the assumption that if a register was changed, the registers read to make that change + // are enough to read that register before the change (if it existed) + + // It seems that GetDomainStorageMap() tries to load domain storage map if it isn't loaded. + // This causes a "slab not found" panic because we only included loaded registers in the underlying storage. + // This workaround is to catch the panic gracefully so we can continue to inspect the next domain storage map. + storageMap := getDomainStorageMap(logger, storageRuntime, storageRuntime.Interpreter, a, d) + if storageMap == nil { + continue + } + + iter := storageMap.ReadOnlyLoadedValueIterator() + for { + interpreterValue := iter.NextValue(nil) + + if interpreterValue == nil { + break + } + + walkLoaded(interpreterValue, searchedTokens, tkns) + } + } + if len(tkns) > 0 { + logger.Info().Str("module", "tc-inspector"). + Str("account", a.String()). + Interface("tokens", tkns). + Msg("found tokens in account") + } + tokens[a] = tkns + } + return tokens, nil +} + +func getDomainStorageMap( + logger zerolog.Logger, + storageRuntime *readonlyStorageRuntime, + storageMutationTracker interpreter.StorageMutationTracker, + address common.Address, + domain common.StorageDomain, +) (dm *interpreter.DomainStorageMap) { + defer func() { + if r := recover(); r != nil { + logger.Warn().Str("module", "tc-inspector").Msgf("failed to get domain storage map %s.%s: %v", address.String(), domain.Identifier(), r) + dm = nil + } + }() + return storageRuntime.Storage.GetDomainStorageMap(storageMutationTracker, address, domain, false) +} + +func walkLoaded( + value interpreter.Value, + searchedTokens map[string]SearchToken, + tkns accountTokens, +) { + // The context is not needed for the walk, + // but a context of nil produces an error. + c := &vm.Context{ + Config: &vm.Config{}, + } + + var f func(value interpreter.Value) + f = func(value interpreter.Value) { + switch v := value.(type) { + case *interpreter.CompositeValue: + t, ok := searchedTokens[string(v.TypeID())] + if ok { + tkns.add(t.ID, t.GetBalance(v)) + } + + // technically nothing is stopping you from putting a vault into a vault, so we have to continue walking + v.ForEachReadOnlyLoadedField(c, func(fieldName string, fieldValue interpreter.Value) (resume bool) { + f(fieldValue) + return true + }) + case *interpreter.DictionaryValue: + v.IterateReadOnlyLoaded(c, func(key interpreter.Value, value interpreter.Value) (resume bool) { + f(key) + f(value) + return true + }) + case *interpreter.ArrayValue: + v.IterateReadOnlyLoaded(c, func(value interpreter.Value) (resume bool) { + f(value) + return true + }) + case interpreter.PathLinkValue, interpreter.AccountLinkValue: + // Link values are deprecated legacy types that don't contain tokens. + // PathLinkValue.Walk and AccountLinkValue.Walk panic with "unreachable", + // so we skip them. + return + default: + // This assumes all other types cannot be partially loaded. + v.Walk(c, f) + } + } + f(value) +} + +func (td *TokenChanges) findSourcesSinks(events []flow.Event, tokens map[string]SearchToken) (map[string]int64, error) { + // create a map of all sinks and sources + // TODO: could be created once + type tokenSourceSink struct { + tokenID string + f func(flow.Event) (int64, error) + } + sourcesSinks := make(map[string]tokenSourceSink) + results := make(map[string]int64) + for _, token := range tokens { + for evt, ss := range token.SinksSources { + sourcesSinks[evt] = tokenSourceSink{tokenID: token.ID, f: ss} + } + } + + for _, evt := range events { + id := string(evt.Type) + if ss, ok := sourcesSinks[id]; ok { + v, err := ss.f(evt) + if err != nil { + return nil, fmt.Errorf("failed to parse source/sink event %s: %w", id, err) + } + results[ss.tokenID] += v + } + } + + return results, nil +} + +func newReadonlyStorageRuntimeWithStorage(storage *runtime.Storage, payloadCount int) (*readonlyStorageRuntime, error) { + inter, err := interpreter.NewInterpreter( + nil, + nil, + &interpreter.Config{ + Storage: storage, + }, + ) + if err != nil { + return nil, err + } + + return &readonlyStorageRuntime{ + Interpreter: inter, + Storage: storage, + PayloadCount: payloadCount, + }, nil +} + +type executionSnapshotLedgers struct { + snapshot.StorageSnapshot + *snapshot.ExecutionSnapshot +} + +func (l executionSnapshotLedgers) SetValue(owner, key, value []byte) (err error) { + panic("unexpected call of SetValue.") +} + +func (l executionSnapshotLedgers) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error) { + panic("unexpected call of AllocateSlabIndex") +} + +type executionSnapshotLedgersOld struct { + executionSnapshotLedgers +} + +func (o executionSnapshotLedgersOld) Get(owner string, key string) ([]byte, error) { + return o.GetValue([]byte(owner), []byte(key)) +} + +func (o executionSnapshotLedgersOld) Set(owner string, key string, value []byte) error { + return o.SetValue([]byte(owner), []byte(key), value) +} + +func (o executionSnapshotLedgersOld) ForEach(f forEachCallback) error { + for key := range o.ExecutionSnapshot.ReadSet { + id := flow.NewRegisterID(flow.BytesToAddress([]byte(key.Owner)), key.Key) + + v, err := o.StorageSnapshot.Get(id) + if err != nil { + return err + } + + err = f(key.Owner, key.Key, v) + if err != nil { + return err + } + } + return nil +} + +func (o executionSnapshotLedgersOld) Count() int { + return len(o.ExecutionSnapshot.ReadSet) +} + +func (n executionSnapshotLedgersNew) Get(owner string, key string) ([]byte, error) { + return n.GetValue([]byte(owner), []byte(key)) +} + +func (n executionSnapshotLedgersNew) Set(owner string, key string, value []byte) error { + return n.SetValue([]byte(owner), []byte(key), value) +} + +func (n executionSnapshotLedgersNew) ForEach(f forEachCallback) error { + for key := range n.allTouchedRegisters() { + v, err := n.GetValue([]byte(key.Owner), []byte(key.Key)) + if err != nil { + return err + } + + err = f(key.Owner, key.Key, v) + if err != nil { + return err + } + } + return nil +} + +func (n executionSnapshotLedgersNew) Count() int { + return len(n.allTouchedRegisters()) +} + +// allTouchedRegisters returns both read and written to registers. +func (l executionSnapshotLedgers) allTouchedRegisters() map[flow.RegisterID]struct{} { + fullSet := make(map[flow.RegisterID]struct{}) + for key := range l.ExecutionSnapshot.ReadSet { + fullSet[flow.NewRegisterID(flow.BytesToAddress([]byte(key.Owner)), key.Key)] = struct{}{} + } + for key := range l.ExecutionSnapshot.WriteSet { + fullSet[flow.NewRegisterID(flow.BytesToAddress([]byte(key.Owner)), key.Key)] = struct{}{} + } + return fullSet +} + +type ledgerSnapshot interface { + atree.Ledger + registers +} + +type executionSnapshotLedgersNew struct { + executionSnapshotLedgers +} + +func (l executionSnapshotLedgers) OldValuesLedger() ledgerSnapshot { + return executionSnapshotLedgersOld{l} +} + +func (l executionSnapshotLedgers) NewValuesLedger() ledgerSnapshot { + return executionSnapshotLedgersNew{l} +} + +var _ registers = &executionSnapshotLedgersOld{} + +func (o executionSnapshotLedgersOld) GetValue(owner, key []byte) (value []byte, err error) { + id := flow.NewRegisterID(flow.BytesToAddress(owner), string(key)) + _, ok := o.ExecutionSnapshot.ReadSet[id] + if !ok { + return nil, nil + } + + v, err := o.StorageSnapshot.Get(id) + return v, err +} + +func (o executionSnapshotLedgersOld) ValueExists(owner, key []byte) (exists bool, err error) { + v, err := o.GetValue(owner, key) + return len(v) > 0, err +} + +func (n executionSnapshotLedgersNew) GetValue(owner, key []byte) (value []byte, err error) { + id := flow.NewRegisterID(flow.BytesToAddress(owner), string(key)) + + v, ok := n.ExecutionSnapshot.WriteSet[id] + + if ok { + return v, nil + } + + _, ok = n.ExecutionSnapshot.ReadSet[id] + if !ok { + return nil, nil + } + + v, err = n.StorageSnapshot.Get(id) + return v, err +} + +func (n executionSnapshotLedgersNew) ValueExists(owner, key []byte) (exists bool, err error) { + v, err := n.GetValue(owner, key) + return len(v) > 0, err +} + +type readonlyStorageRuntime struct { + Interpreter *interpreter.Interpreter + Storage *runtime.Storage + PayloadCount int +} + +type SearchToken struct { + ID string + GetBalance func(value *interpreter.CompositeValue) uint64 + // TODO: optimize by using decoded events + SinksSources map[string]func(flow.Event) (int64, error) +} + +// TokenDiffResult is the result of the inspection +type TokenDiffResult struct { + // Changes in token balances per account + // parsed from the state changes + Changes map[flow.Address]AccountChange + + // KnownSourcesSinks is a map (by token id) of + // know mints/burns for the token parsed from predetermined events + KnownSourcesSinks map[string]int64 +} + +var _ Result = TokenDiffResult{} + +func (r TokenDiffResult) AsLogEvent() (zerolog.Level, func(e *zerolog.Event)) { + sum := r.UnaccountedTokens() + if len(sum) == 0 { + // everything is ok: log no issues with debug logging + return zerolog.DebugLevel, func(e *zerolog.Event) { e.Str("token_diff", "no issues") } + } + + anyPositive := false + for _, v := range sum { + if v > 0 { + anyPositive = true + break + } + } + + level := zerolog.WarnLevel + if anyPositive { + // if any tracked token increase in supply + // log at error level + level = zerolog.ErrorLevel + } + + return level, func(e *zerolog.Event) { + dict := zerolog.Dict() + for k, v := range sum { + dict = dict.Int64(k, v) + } + e.Dict("token_diff", dict) + } +} + +func (r TokenDiffResult) UnaccountedTokens() map[string]int64 { + sum := make(map[string]int64) + for _, change := range r.Changes { + for token, amount := range change { + sum[token] += amount + } + } + + for k, v := range r.KnownSourcesSinks { + // Yes this should be -. + // If we mint 100 tokens, that will be a source of +100 and the sum will contain + // the +100 we minted. We need to account for that +100 in the sum by **subtracting** + // the +100 we expected from the mint event. + sum[k] -= v + } + + // delete empty entries + for k, v := range sum { + if v == 0 { + delete(sum, k) + } + } + return sum +} + +type AccountChange map[string]int64 + +type accountTokens map[string]uint64 + +func (t accountTokens) add(id string, value uint64) { + t[id] += value +} + +// diffAccountTokens will potentially modify before and after, so they should not be reused +// after this call. +func diffAccountTokens(before accountTokens, after accountTokens) AccountChange { + change := make(AccountChange) + + for k, a := range after { + if b, ok := before[k]; ok { + if a > b { + change[k] = safeUint64ToInt64(a - b) + } else if b > a { + change[k] = -safeUint64ToInt64(b - a) + } + delete(before, k) + } else { + change[k] = safeUint64ToInt64(a) + } + } + + for k, v := range before { + change[k] = -safeUint64ToInt64(v) + } + + return change +} + +// safeUint64ToInt64 converts a uint64 to int64, capping at math.MaxInt64 if the value +// would overflow. If a value is capped, it will cause a mismatch in the token accounting +// which will be logged and raise attention anyway. +func safeUint64ToInt64(v uint64) int64 { + if v > math.MaxInt64 { + return math.MaxInt64 + } + return int64(v) +} + +type forEachCallback func(owner string, key string, value []byte) error + +type registers interface { + Get(owner string, key string) ([]byte, error) + Set(owner string, key string, value []byte) error + ForEach(f forEachCallback) error + Count() int +} + +func loadAtreeSlabsInStorage( + storage *runtime.Storage, + registers registers, + nWorkers int, +) error { + + storageIDs, err := getSlabIDsFromRegisters(registers) + if err != nil { + return err + } + + return storage.PersistentSlabStorage.BatchPreload(storageIDs, nWorkers) +} + +func getSlabIDsFromRegisters(registers registers) ([]atree.SlabID, error) { + storageIDs := make([]atree.SlabID, 0, registers.Count()) + + err := registers.ForEach(func(owner string, key string, _ []byte) error { + if !flow.IsSlabIndexKey(key) { + return nil + } + + slabID := atree.NewSlabID( + atree.Address([]byte(owner)), + atree.SlabIndex([]byte(key[1:])), + ) + + storageIDs = append(storageIDs, slabID) + + return nil + }) + if err != nil { + return nil, err + } + + return storageIDs, nil +} + +var tokenDiffSearchDomains = []common.StorageDomain{ + common.StorageDomainPathStorage, + common.StorageDomainContract, + + common.StorageDomainPathPrivate, // ?? probably not needed. TODO: check + common.StorageDomainInbox, // ?? probably not needed. TODO: check + // do we need any other? TODO: check +} + +type TokenChangesSearchTokens map[string]SearchToken + +func DefaultTokenDiffSearchTokens(chain flow.Chain) TokenChangesSearchTokens { + sc := systemcontracts.SystemContractsForChain(chain.ChainID()) + flowTokenID := fmt.Sprintf("A.%s.FlowToken.Vault", sc.FlowToken.Address.Hex()) + flowTokenMintedEventID := fmt.Sprintf("A.%s.FlowToken.TokensMinted", sc.FlowToken.Address.Hex()) + + return map[string]SearchToken{ + flowTokenID: { + ID: flowTokenID, + GetBalance: func(value *interpreter.CompositeValue) uint64 { + return uint64(value.GetField(nil, "balance").(interpreter.UFix64Value).UFix64Value) + }, + SinksSources: map[string]func(flow.Event) (int64, error){ + flowTokenMintedEventID: func(evt flow.Event) (int64, error) { + // this decoding will only happen for the specified event (in the case of FlowToken.TokensMinted it + // is extremely rare). + payload, err := ccf.Decode(nil, evt.Payload) + if err != nil { + return 0, err + } + v := payload.(cadence.Event).SearchFieldByName("amount") + if v == nil { + return 0, fmt.Errorf("no amount field found for token minted") + } + + ufix, ok := payload.(cadence.Event).SearchFieldByName("amount").(cadence.UFix64) + if !ok { + return 0, fmt.Errorf("amount field is not a cadence.UFix64") + } + + if ufix > math.MaxInt64 { + // this is very unlikely + // but in case it happens, it will get logged + return 0, fmt.Errorf("amount field is too large") + } + + return int64(ufix), nil + }, + }, + }, + } +} diff --git a/fvm/mock/vm.go b/fvm/mock/vm.go index 12e27262997..114f903591b 100644 --- a/fvm/mock/vm.go +++ b/fvm/mock/vm.go @@ -4,6 +4,8 @@ package mock import ( fvm "github.com/onflow/flow-go/fvm" + inspection "github.com/onflow/flow-go/fvm/inspection" + mock "github.com/stretchr/testify/mock" snapshot "github.com/onflow/flow-go/fvm/storage/snapshot" @@ -16,6 +18,26 @@ type VM struct { mock.Mock } +// Inspect provides a mock function with given fields: ctx, proc, storageSnapshot, executionSnapshot, output +func (_m *VM) Inspect(ctx fvm.Context, proc fvm.Procedure, storageSnapshot snapshot.StorageSnapshot, executionSnapshot *snapshot.ExecutionSnapshot, output fvm.ProcedureOutput) []inspection.Result { + ret := _m.Called(ctx, proc, storageSnapshot, executionSnapshot, output) + + if len(ret) == 0 { + panic("no return value specified for Inspect") + } + + var r0 []inspection.Result + if rf, ok := ret.Get(0).(func(fvm.Context, fvm.Procedure, snapshot.StorageSnapshot, *snapshot.ExecutionSnapshot, fvm.ProcedureOutput) []inspection.Result); ok { + r0 = rf(ctx, proc, storageSnapshot, executionSnapshot, output) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]inspection.Result) + } + } + + return r0 +} + // NewExecutor provides a mock function with given fields: _a0, _a1, _a2 func (_m *VM) NewExecutor(_a0 fvm.Context, _a1 fvm.Procedure, _a2 storage.TransactionPreparer) fvm.ProcedureExecutor { ret := _m.Called(_a0, _a1, _a2) diff --git a/go.mod b/go.mod index 69cfb37b181..370af88df32 100644 --- a/go.mod +++ b/go.mod @@ -47,7 +47,7 @@ require ( github.com/multiformats/go-multiaddr-dns v0.4.1 github.com/multiformats/go-multihash v0.2.3 github.com/onflow/atree v0.12.0 - github.com/onflow/cadence v1.8.8 + github.com/onflow/cadence v1.8.8-token-movements-inspect.1 github.com/onflow/crypto v0.25.3 github.com/onflow/flow v0.4.15 github.com/onflow/flow-core-contracts/lib/go/contracts v1.9.2 diff --git a/go.sum b/go.sum index f3226f3ad78..38457d09d72 100644 --- a/go.sum +++ b/go.sum @@ -940,8 +940,8 @@ github.com/onflow/atree v0.12.0 h1:X7/UEPyCaaEQ1gCg11KDvfyEtEeQLhtRtxMHjAiH/Co= github.com/onflow/atree v0.12.0/go.mod h1:qdZcfLQwPirHcNpLiK+2t3KAo+SAb9Si6TqurE6pykE= github.com/onflow/boxo v0.0.0-20240201202436-f2477b92f483 h1:LpiQhTAfM9CAmNVEs0n//cBBgCg+vJSiIxTHYUklZ84= github.com/onflow/boxo v0.0.0-20240201202436-f2477b92f483/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80= -github.com/onflow/cadence v1.8.8 h1:FikX060A9lX8fS5tCD+wxAEmY/I9FXQoIBzIN4mor+A= -github.com/onflow/cadence v1.8.8/go.mod h1:MlJsCwhCZwdnAUd24XHzcsizZfG7a2leab1PztabUsE= +github.com/onflow/cadence v1.8.8-token-movements-inspect.1 h1:SAtLJegzRx7zEoEdtfIuCkCv0lNSRlSeym03zOCJXy0= +github.com/onflow/cadence v1.8.8-token-movements-inspect.1/go.mod h1:MlJsCwhCZwdnAUd24XHzcsizZfG7a2leab1PztabUsE= github.com/onflow/crypto v0.25.3 h1:XQ3HtLsw8h1+pBN+NQ1JYM9mS2mVXTyg55OldaAIF7U= github.com/onflow/crypto v0.25.3/go.mod h1:+1igaXiK6Tjm9wQOBD1EGwW7bYWMUGKtwKJ/2QL/OWs= github.com/onflow/fixed-point v0.1.1 h1:j0jYZVO8VGyk1476alGudEg7XqCkeTVxb5ElRJRKS90= diff --git a/insecure/go.mod b/insecure/go.mod index 1e7e2864980..8c6bb151b44 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -215,7 +215,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/onflow/atree v0.12.0 // indirect - github.com/onflow/cadence v1.8.8 // indirect + github.com/onflow/cadence v1.8.8-token-movements-inspect.1 // indirect github.com/onflow/fixed-point v0.1.1 // indirect github.com/onflow/flow-core-contracts/lib/go/contracts v1.9.2 // indirect github.com/onflow/flow-core-contracts/lib/go/templates v1.9.2 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index 7ce37368e92..7fea82ad228 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -890,8 +890,8 @@ github.com/onflow/atree v0.12.0 h1:X7/UEPyCaaEQ1gCg11KDvfyEtEeQLhtRtxMHjAiH/Co= github.com/onflow/atree v0.12.0/go.mod h1:qdZcfLQwPirHcNpLiK+2t3KAo+SAb9Si6TqurE6pykE= github.com/onflow/boxo v0.0.0-20240201202436-f2477b92f483 h1:LpiQhTAfM9CAmNVEs0n//cBBgCg+vJSiIxTHYUklZ84= github.com/onflow/boxo v0.0.0-20240201202436-f2477b92f483/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80= -github.com/onflow/cadence v1.8.8 h1:FikX060A9lX8fS5tCD+wxAEmY/I9FXQoIBzIN4mor+A= -github.com/onflow/cadence v1.8.8/go.mod h1:MlJsCwhCZwdnAUd24XHzcsizZfG7a2leab1PztabUsE= +github.com/onflow/cadence v1.8.8-token-movements-inspect.1 h1:SAtLJegzRx7zEoEdtfIuCkCv0lNSRlSeym03zOCJXy0= +github.com/onflow/cadence v1.8.8-token-movements-inspect.1/go.mod h1:MlJsCwhCZwdnAUd24XHzcsizZfG7a2leab1PztabUsE= github.com/onflow/crypto v0.25.3 h1:XQ3HtLsw8h1+pBN+NQ1JYM9mS2mVXTyg55OldaAIF7U= github.com/onflow/crypto v0.25.3/go.mod h1:+1igaXiK6Tjm9wQOBD1EGwW7bYWMUGKtwKJ/2QL/OWs= github.com/onflow/fixed-point v0.1.1 h1:j0jYZVO8VGyk1476alGudEg7XqCkeTVxb5ElRJRKS90= diff --git a/integration/benchmark/load/load_type_test.go b/integration/benchmark/load/load_type_test.go index d40385c87b6..f99003c1c4a 100644 --- a/integration/benchmark/load/load_type_test.go +++ b/integration/benchmark/load/load_type_test.go @@ -140,6 +140,7 @@ func bootstrapVM(t *testing.T, chain flow.Chain) (*fvm.VirtualMachine, fvm.Conte chain.ChainID(), false, false, + false, ) opts = append(opts, fvm.WithTransactionFeesEnabled(true), diff --git a/integration/go.mod b/integration/go.mod index cc88dacd244..b0252bd6057 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -21,7 +21,7 @@ require ( github.com/ipfs/go-datastore v0.8.2 github.com/ipfs/go-ds-pebble v0.5.0 github.com/libp2p/go-libp2p v0.38.2 - github.com/onflow/cadence v1.8.8 + github.com/onflow/cadence v1.8.8-token-movements-inspect.1 github.com/onflow/crypto v0.25.3 github.com/onflow/flow v0.4.15 github.com/onflow/flow-core-contracts/lib/go/contracts v1.9.2 diff --git a/integration/go.sum b/integration/go.sum index d6cdf5b41fa..4c7d21c50c0 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -764,8 +764,8 @@ github.com/onflow/atree v0.12.0 h1:X7/UEPyCaaEQ1gCg11KDvfyEtEeQLhtRtxMHjAiH/Co= github.com/onflow/atree v0.12.0/go.mod h1:qdZcfLQwPirHcNpLiK+2t3KAo+SAb9Si6TqurE6pykE= github.com/onflow/boxo v0.0.0-20240201202436-f2477b92f483 h1:LpiQhTAfM9CAmNVEs0n//cBBgCg+vJSiIxTHYUklZ84= github.com/onflow/boxo v0.0.0-20240201202436-f2477b92f483/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80= -github.com/onflow/cadence v1.8.8 h1:FikX060A9lX8fS5tCD+wxAEmY/I9FXQoIBzIN4mor+A= -github.com/onflow/cadence v1.8.8/go.mod h1:MlJsCwhCZwdnAUd24XHzcsizZfG7a2leab1PztabUsE= +github.com/onflow/cadence v1.8.8-token-movements-inspect.1 h1:SAtLJegzRx7zEoEdtfIuCkCv0lNSRlSeym03zOCJXy0= +github.com/onflow/cadence v1.8.8-token-movements-inspect.1/go.mod h1:MlJsCwhCZwdnAUd24XHzcsizZfG7a2leab1PztabUsE= github.com/onflow/crypto v0.25.3 h1:XQ3HtLsw8h1+pBN+NQ1JYM9mS2mVXTyg55OldaAIF7U= github.com/onflow/crypto v0.25.3/go.mod h1:+1igaXiK6Tjm9wQOBD1EGwW7bYWMUGKtwKJ/2QL/OWs= github.com/onflow/fixed-point v0.1.1 h1:j0jYZVO8VGyk1476alGudEg7XqCkeTVxb5ElRJRKS90= diff --git a/module/execution/scripts.go b/module/execution/scripts.go index 50a3b93c7da..851d936649b 100644 --- a/module/execution/scripts.go +++ b/module/execution/scripts.go @@ -89,6 +89,7 @@ func NewScripts( chainID, false, true, + false, ) blocks := environment.NewBlockFinder(header) options = append(options, fvm.WithBlocks(blocks)) // add blocks for getBlocks calls in scripts