diff --git a/core/blockchain.go b/core/blockchain.go index 39d65ba7d3..2def72bba2 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -129,10 +129,11 @@ var ( blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) - blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil) - blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) - blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil) - blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil) + blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil) + blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) + blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil) + blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil) + blockPrefetchWorkerPanicMeter = metrics.NewRegisteredMeter("chain/prefetch/worker/panic", nil) // Witness and write-path metrics for block production observability. 
// These track the time spent in each phase of writeBlockWithState, which runs diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 77c8881ebe..8950aa41a9 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -22,12 +22,11 @@ import ( "sync" "sync/atomic" - "golang.org/x/sync/errgroup" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" ) @@ -57,93 +56,236 @@ type PrefetchResult struct { // Prefetch processes the state changes according to the Ethereum rules by running // the transaction messages using the statedb, but any changes are discarded. The // only goal is to warm the state caches. +// +// This is a thin wrapper over PrefetchStream: it feeds the block's transactions +// into a channel, closes it, and runs the stream to completion. Behavior is +// identical to the pre-stream implementation. 
func (p *StatePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, intermediateRootPrefetch bool, interrupt *atomic.Bool) *PrefetchResult { - var ( - fails atomic.Int64 - totalGasUsed atomic.Uint64 - successfulTxs []common.Hash - txsMutex sync.Mutex - header = block.Header() - signer = types.MakeSigner(p.config, header.Number, header.Time) - workers errgroup.Group - reader = statedb.Reader() - ) - workers.SetLimit(max(1, 4*runtime.NumCPU()/5)) // Aggressively run the prefetching - - // Iterate over and process the individual transactions - for i, tx := range block.Transactions() { - stateCpy := statedb.Copy() // closure - workers.Go(func() error { - // If block precaching was interrupted, abort - if interrupt != nil && interrupt.Load() { - return nil - } - // Preload the touched accounts and storage slots in advance - sender, err := types.Sender(signer, tx) - if err != nil { - fails.Add(1) - return nil - } - reader.Account(sender) - - if tx.To() != nil { - account, _ := reader.Account(*tx.To()) - - // Preload the contract code if the destination has non-empty code - if account != nil && !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) { - reader.Code(*tx.To(), common.BytesToHash(account.CodeHash)) - } - } - for _, list := range tx.AccessList() { - reader.Account(list.Address) - if len(list.StorageKeys) > 0 { - for _, slot := range list.StorageKeys { - reader.Storage(list.Address, slot) - } + txs := block.Transactions() + ch := make(chan *types.Transaction, len(txs)) + for _, tx := range txs { + ch <- tx + } + close(ch) + return p.PrefetchStream(block.Header(), statedb, cfg, intermediateRootPrefetch, interrupt, nil, ch, nil) +} + +// PrefetchStream warms state caches by executing transactions read from txsCh +// in parallel. It spins up a fixed worker pool once and keeps it alive for the +// whole call; workers exit when txsCh is closed or when hardKill is set. +// +// header — block header used for EVM context. 
+// statedb — parent state snapshot; each worker makes a per-tx Copy. +// cfg — VM config (no tracer recommended). +// intermediateRootPrefetch — if true, compute IntermediateRoot after each tx. +// hardKill — set by the caller to exit the stream permanently; +// workers return at loop entry. +// evmAbort — soft, repeatable interrupt. When set, aborts in-flight +// EVM work and causes workers to consume but skip +// subsequent txs until the caller resets it. Lets the +// caller implement phase transitions without tearing +// down the worker pool. May be nil. +// txsCh — transaction source; stream exits when this closes. +// onSuccess — called from worker goroutines on each successful tx. +// Must be safe for concurrent invocation. May be nil. +func (p *StatePrefetcher) PrefetchStream( + header *types.Header, + statedb *state.StateDB, + cfg vm.Config, + intermediateRootPrefetch bool, + hardKill *atomic.Bool, + evmAbort *atomic.Bool, + txsCh <-chan *types.Transaction, + onSuccess func(hash common.Hash, gasUsed uint64), +) *PrefetchResult { + // Prefer evmAbort as the EVM interrupt (soft, per-phase); fall back to hardKill. + evmInterrupt := evmAbort + if evmInterrupt == nil { + evmInterrupt = hardKill + } + + ctx := &streamCtx{ + p: p, + header: header, + statedb: statedb, + reader: statedb.Reader(), + signer: types.MakeSigner(p.config, header.Number, header.Time), + cfg: cfg, + intermediateRootPrefetch: intermediateRootPrefetch, + hardKill: hardKill, + evmAbort: evmAbort, + evmInterrupt: evmInterrupt, + txsCh: txsCh, + onSuccess: onSuccess, + } + + workers := max(1, 4*runtime.NumCPU()/5) + var pool sync.WaitGroup + pool.Add(workers) + for i := 0; i < workers; i++ { + go func() { + defer pool.Done() + // Isolate worker panics: prefetching is best-effort and operates on a + // throwaway state copy, so a panic here must never take down the node. 
+ // The parent runPrefetcher goroutine has its own recover but Go's + // recover only catches panics in its own goroutine, not children. + defer func() { + if r := recover(); r != nil { + // processTx already incremented txIndex before dispatching + // to prefetchOneTx, so a panicked tx must count as a + // failure to keep valid+invalid meters consistent. + ctx.fails.Add(1) + blockPrefetchWorkerPanicMeter.Mark(1) + log.Error("prefetch worker panicked", "err", r) } - } - // Execute the message to preload the implicit touched states - evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), stateCpy, p.config, cfg) - - // Convert the transaction into an executable message and pre-cache its sender - msg, err := TransactionToMessage(tx, signer, header.BaseFee) - if err != nil { - fails.Add(1) - return nil // Also invalid block, bail out - } - // Disable the nonce check - msg.SkipNonceChecks = true - - stateCpy.SetTxContext(tx.Hash(), i) - - // We attempt to apply a transaction. The goal is not to execute - // the transaction successfully, rather to warm up touched data slots. 
- evm.SetInterrupt(interrupt) - result, err := ApplyMessage(evm, msg, new(GasPool).AddGas(block.GasLimit())) - if err != nil { - fails.Add(1) - return nil // Ugh, something went horribly wrong, bail out - } - - if intermediateRootPrefetch { - stateCpy.IntermediateRoot(true) - } - - // Track gas used and successful transaction - totalGasUsed.Add(result.UsedGas) - txsMutex.Lock() - successfulTxs = append(successfulTxs, tx.Hash()) - txsMutex.Unlock() - return nil - }) - } - workers.Wait() - - blockPrefetchTxsValidMeter.Mark(int64(len(block.Transactions())) - fails.Load()) - blockPrefetchTxsInvalidMeter.Mark(fails.Load()) + }() + ctx.runWorker() + }() + } + pool.Wait() + + processed := ctx.txIndex.Load() + blockPrefetchTxsValidMeter.Mark(processed - ctx.fails.Load()) + blockPrefetchTxsInvalidMeter.Mark(ctx.fails.Load()) return &PrefetchResult{ - TotalGasUsed: totalGasUsed.Load(), - SuccessfulTxs: successfulTxs, + TotalGasUsed: ctx.totalGasUsed.Load(), + SuccessfulTxs: ctx.successfulTxs, + } +} + +// streamCtx bundles the per-call state shared by all workers in one PrefetchStream +// invocation. Workers call runWorker to pull from txsCh and warm state caches. +type streamCtx struct { + p *StatePrefetcher + header *types.Header + statedb *state.StateDB + reader state.Reader + signer types.Signer + cfg vm.Config + intermediateRootPrefetch bool + hardKill *atomic.Bool + evmAbort *atomic.Bool + evmInterrupt *atomic.Bool + txsCh <-chan *types.Transaction + onSuccess func(common.Hash, uint64) + + fails atomic.Int64 + totalGasUsed atomic.Uint64 + txIndex atomic.Int64 + txsMutex sync.Mutex + successfulTxs []common.Hash +} + +// runWorker pulls transactions from the stream and processes each one until the +// channel closes or hardKill fires. While evmAbort is set, buffered transactions +// are consumed but skipped (the pool stays alive for subsequent phases). 
+func (s *streamCtx) runWorker() { + for tx := range s.txsCh { + if s.hardKill != nil && s.hardKill.Load() { + return + } + if s.evmAbort != nil && s.evmAbort.Load() { + continue + } + s.processTx(tx) + } +} + +func (s *streamCtx) processTx(tx *types.Transaction) { + idx := int(s.txIndex.Add(1) - 1) + gasUsed, ok := s.p.prefetchOneTx( + tx, idx, s.header, s.statedb, s.reader, s.signer, s.cfg, + s.intermediateRootPrefetch, s.evmInterrupt, &s.fails, + ) + if !ok { + return + } + s.totalGasUsed.Add(gasUsed) + s.txsMutex.Lock() + s.successfulTxs = append(s.successfulTxs, tx.Hash()) + s.txsMutex.Unlock() + if s.onSuccess != nil { + s.onSuccess(tx.Hash(), gasUsed) + } +} + +// prefetchOneTx executes a single transaction on a copy of statedb to warm caches. +// Shared worker body used by both Prefetch (block-oriented) and PrefetchStream +// (streaming). Returns the execution gas used and a success flag. On any EVM +// interrupt or unrecoverable error it returns (0, false) and increments fails. +func (p *StatePrefetcher) prefetchOneTx( + tx *types.Transaction, + txIdx int, + header *types.Header, + statedb *state.StateDB, + reader state.Reader, + signer types.Signer, + cfg vm.Config, + intermediateRootPrefetch bool, + interrupt *atomic.Bool, + fails *atomic.Int64, +) (uint64, bool) { + if interrupt != nil && interrupt.Load() { + // Match every other failure path in this function — the docstring + // promises fails is incremented on every (0,false) return so the + // {valid,invalid} meters add up to the txIndex without double-counting. 
+ fails.Add(1) + return 0, false + } + + sender, err := preloadReaderForTx(reader, tx, signer) + if err != nil { + fails.Add(1) + return 0, false + } + _ = sender // sender is pre-cached via reader.Account + + stateCpy := statedb.Copy() + msg, err := TransactionToMessage(tx, signer, header.BaseFee) + if err != nil { + fails.Add(1) + return 0, false + } + msg.SkipNonceChecks = true // stream order may diverge from nonce order + stateCpy.SetTxContext(tx.Hash(), txIdx) + + evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), stateCpy, p.config, cfg) + evm.SetInterrupt(interrupt) + + result, err := ApplyMessage(evm, msg, new(GasPool).AddGas(header.GasLimit)) + if err != nil { + fails.Add(1) + return 0, false + } + if intermediateRootPrefetch { + stateCpy.IntermediateRoot(true) + } + return result.UsedGas, true +} + +// preloadReaderForTx issues non-blocking reads against the state reader for the +// accounts, code, and access-list slots the tx is likely to touch. This warms +// the caches before EVM execution. Returns the recovered sender so callers can +// avoid a second signature-recovery pass. 
+func preloadReaderForTx(reader state.Reader, tx *types.Transaction, signer types.Signer) (common.Address, error) { + sender, err := types.Sender(signer, tx) + if err != nil { + return common.Address{}, err + } + reader.Account(sender) + + if to := tx.To(); to != nil { + if account, _ := reader.Account(*to); account != nil && + !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) { + reader.Code(*to, common.BytesToHash(account.CodeHash)) + } + } + for _, list := range tx.AccessList() { + reader.Account(list.Address) + for _, slot := range list.StorageKeys { + reader.Storage(list.Address, slot) + } } + return sender, nil } diff --git a/miner/ordering.go b/miner/ordering.go index 1e8ceb26df..5515457a74 100644 --- a/miner/ordering.go +++ b/miner/ordering.go @@ -190,3 +190,24 @@ func (t *transactionsByPriceAndNonce) Empty() bool { func (t *transactionsByPriceAndNonce) Clear() { t.heads, t.txs = nil, nil } + +// clone returns a shallow copy of the heap suitable for non-destructive scanning. +// LazyTransaction pointers are shared with the original; only the per-account queue +// slices and the heads slice are newly allocated. The heap invariant is preserved in +// the copy because we duplicate the slice in its current heap-ordered state. 
+func (t *transactionsByPriceAndNonce) clone() *transactionsByPriceAndNonce { + clonedTxs := make(map[common.Address][]*txpool.LazyTransaction, len(t.txs)) + for addr, queue := range t.txs { + c := make([]*txpool.LazyTransaction, len(queue)) + copy(c, queue) + clonedTxs[addr] = c + } + clonedHeads := make(txByPriceAndTime, len(t.heads)) + copy(clonedHeads, t.heads) + return &transactionsByPriceAndNonce{ + txs: clonedTxs, + heads: clonedHeads, + signer: t.signer, + baseFee: t.baseFee, + } +} diff --git a/miner/slow_tx_tracker.go b/miner/slow_tx_tracker.go index 24b3d3668d..7f25e5b9ec 100644 --- a/miner/slow_tx_tracker.go +++ b/miner/slow_tx_tracker.go @@ -19,8 +19,22 @@ const ( // txTimingEntry records how long a single transaction took to apply during block building. type txTimingEntry struct { - hash common.Hash - duration time.Duration + hash common.Hash + duration time.Duration + gasUsed uint64 + prefetched bool +} + +// mgasPerSecond returns the transaction's apply throughput in MGas/s, computed +// with integer math on nanoseconds. Per-tx durations are typically tens of +// microseconds; float seconds would lose precision on the short intervals we +// actually care about. +func (e txTimingEntry) mgasPerSecond() uint64 { + ns := uint64(e.duration.Nanoseconds()) + if ns == 0 { + return 0 + } + return e.gasUsed * 1_000 / ns } type txTimingMinHeap []txTimingEntry @@ -93,11 +107,21 @@ func (t *slowTxTopTracker) Reset() { t.data = t.data[:0] } -// formatSlowTxs returns a compact string of slow txs in order, e.g. "0xabc...(250ms) 0xdef...(100ms)". +// formatSlowTxs returns a compact string of slow txs in order, e.g. +// "0xabc...(250ms, 83 MGas/s, prefetched) 0xdef...(100ms, 42 MGas/s, not-prefetched)". 
func formatSlowTxs(entries []txTimingEntry) string { parts := make([]string, 0, len(entries)) for i := range entries { - parts = append(parts, fmt.Sprintf("%s(%s)", entries[i].hash.Hex(), common.PrettyDuration(entries[i].duration))) + flag := "not-prefetched" + if entries[i].prefetched { + flag = "prefetched" + } + parts = append(parts, fmt.Sprintf("%s(%s, %d MGas/s, %s)", + entries[i].hash.Hex(), + common.PrettyDuration(entries[i].duration), + entries[i].mgasPerSecond(), + flag, + )) } return strings.Join(parts, " ") } diff --git a/miner/slow_tx_tracker_test.go b/miner/slow_tx_tracker_test.go index 535f21aac0..5fd410954f 100644 --- a/miner/slow_tx_tracker_test.go +++ b/miner/slow_tx_tracker_test.go @@ -3,6 +3,7 @@ package miner import ( "math/big" "math/rand" + "strings" "testing" "time" @@ -66,8 +67,8 @@ func TestSlowTxTopTrackerSnapshotAndReset(t *testing.T) { t.Parallel() tracker := newSlowTxTopTracker() - tracker.Add(txTimingEntry{duration: 4 * time.Millisecond}) - tracker.Add(txTimingEntry{duration: 9 * time.Millisecond}) + tracker.Add(txTimingEntry{duration: 4 * time.Millisecond, gasUsed: 21_000}) + tracker.Add(txTimingEntry{duration: 9 * time.Millisecond, gasUsed: 21_000, prefetched: true}) first := tracker.SnapshotAndReset() require.Len(t, first, 2) @@ -82,3 +83,39 @@ func TestSlowTxTopTrackerSnapshotAndReset(t *testing.T) { require.Len(t, afterReset, 1) require.Equal(t, 7*time.Millisecond, afterReset[0].duration) } + +func TestFormatSlowTxsAnnotatesPrefetchedAndMGasPerSecond(t *testing.T) { + t.Parallel() + + // 21,000 gas in 100µs = 210 MGas/s with integer math: 21000*1000/100000 = 210. 
+ entries := []txTimingEntry{ + { + hash: common.BigToHash(big.NewInt(1)), + duration: 100 * time.Microsecond, + gasUsed: 21_000, + prefetched: true, + }, + { + hash: common.BigToHash(big.NewInt(2)), + duration: 500 * time.Microsecond, + gasUsed: 50_000, + prefetched: false, + }, + } + + out := formatSlowTxs(entries) + + require.Contains(t, out, "210 MGas/s") + require.Contains(t, out, ", prefetched)") + require.Contains(t, out, "100 MGas/s") // 50000*1000/500000 + require.Contains(t, out, ", not-prefetched)") + // Both entries should be separated by a single space. + require.Equal(t, 2, strings.Count(out, "MGas/s")) +} + +func TestMGasPerSecondZeroDuration(t *testing.T) { + t.Parallel() + + e := txTimingEntry{gasUsed: 21_000, duration: 0} + require.Equal(t, uint64(0), e.mgasPerSecond()) +} diff --git a/miner/worker.go b/miner/worker.go index 9a633a12f8..64d3a16a70 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -106,6 +106,10 @@ var ( // txApplyDurationTimer captures per-transaction apply latency during block building. // Uses a larger reservoir to preserve tail visibility on high-throughput blocks. txApplyDurationTimer = newRegisteredCustomTimer("worker/txApplyDuration", 8192) + // Split variants of txApplyDuration by prefetch status. The aggregate timer + // above stays to preserve existing Grafana dashboards. 
+ txApplyDurationPrefetchedTimer = newRegisteredCustomTimer("worker/txApplyDuration/prefetched", 8192) + txApplyDurationNotPrefetchedTimer = newRegisteredCustomTimer("worker/txApplyDuration/notPrefetched", 8192) // finalizeAndAssembleTimer measures time taken to finalize and assemble the block (state root calculation) finalizeAndAssembleTimer = metrics.NewRegisteredTimer("worker/finalizeAndAssemble", nil) // intermediateRootTimer measures time taken to calculate intermediate root @@ -145,6 +149,16 @@ var ( metrics.NewExpDecaySample(1028, 0.015), ) + // prefetchBuilderAddedHistogram tracks the percentage of block transactions that were + // prefetched exclusively during the builder phase (i.e. would have been a miss if the + // idle phase had been the only prefetch source). Directly measures the payoff of the + // builder-phase prefetch over the aggregate miss rate above. + prefetchBuilderAddedHistogram = metrics.NewRegisteredHistogram( + "worker/prefetch/builder_added_percent", + nil, + metrics.NewExpDecaySample(1028, 0.015), + ) + // Trie read/hash/execution metrics for block production (mirroring blockchain.go import path). // Namespaced under worker/chain/ to distinguish from import-path chain/ metrics. workerAccountReadTimer = metrics.NewRegisteredResettingTimer("worker/chain/account/reads", nil) @@ -218,20 +232,26 @@ type environment struct { // Readers with stats tracking for metrics reporting prefetchReader state.ReaderWithStats processReader state.ReaderWithStats + + // prefetchedTxHashes is the live set written by the prefetch stream's + // onSuccess callback. Read at tx-commit time to annotate slow-tx logs and + // split the apply-duration histogram by prefetch status. May be nil. + prefetchedTxHashes *sync.Map } // copy creates a deep copy of environment. 
func (env *environment) copy() *environment { cpy := &environment{ - signer: env.signer, - state: env.state.Copy(), - tcount: env.tcount, - coinbase: env.coinbase, - header: types.CopyHeader(env.header), - receipts: copyReceipts(env.receipts), - mvReadMapList: env.mvReadMapList, - prefetchReader: env.prefetchReader, - processReader: env.processReader, + signer: env.signer, + state: env.state.Copy(), + tcount: env.tcount, + coinbase: env.coinbase, + header: types.CopyHeader(env.header), + receipts: copyReceipts(env.receipts), + mvReadMapList: env.mvReadMapList, + prefetchReader: env.prefetchReader, + processReader: env.processReader, + prefetchedTxHashes: env.prefetchedTxHashes, } if env.gasPool != nil { @@ -892,7 +912,7 @@ func (w *worker) mainLoop() { tcount := w.current.tcount - w.commitTransactions(w.current, plainTxs, blobTxs, nil) + w.commitTransactions(w.current, plainTxs, blobTxs, nil, nil) stopFn() // Only update the snapshot if any new transactons were added @@ -1192,15 +1212,16 @@ func (w *worker) makeEnv(header *types.Header, coinbase common.Address, witness // Note the passed coinbase may be different with header.Coinbase. 
env := &environment{ - signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), - state: state, - size: uint64(header.Size()), - coinbase: coinbase, - header: header, - witness: state.Witness(), - evm: vm.NewEVM(core.NewEVMBlockContext(header, w.chain, &coinbase), state, w.chainConfig, w.vmConfig()), - prefetchReader: genParams.prefetchReader, - processReader: genParams.processReader, + signer: types.MakeSigner(w.chainConfig, header.Number, header.Time), + state: state, + size: uint64(header.Size()), + coinbase: coinbase, + header: header, + witness: state.Witness(), + evm: vm.NewEVM(core.NewEVMBlockContext(header, w.chain, &coinbase), state, w.chainConfig, w.vmConfig()), + prefetchReader: genParams.prefetchReader, + processReader: genParams.processReader, + prefetchedTxHashes: genParams.prefetchedTxHashes, } env.evm.SetInterrupt(&w.interruptBlockBuilding) @@ -1249,7 +1270,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]* return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error { +func (w *worker) commitTransactions(env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32, builderGasFreedCh chan<- uint64) error { defer func(t0 time.Time) { commitTransactionsTimer.Update(time.Since(t0)) }(time.Now()) @@ -1468,6 +1489,8 @@ mainloop: lastTxSender = from env.state.SetTxContext(tx.Hash(), env.tcount) + // Capture gas pool before execution so we can compute freed gas afterwards. + gasPoolBefore := env.gasPool.Gas() logs, err := w.commitTransaction(env, tx) txDuration := time.Since(lastCommitStart) @@ -1483,11 +1506,29 @@ mainloop: case errors.Is(err, nil): // Everything ok, collect the logs and shift in the next transaction from the same account coalescedLogs = append(coalescedLogs, logs...) 
+ prefetched := false + if env.prefetchedTxHashes != nil { + _, prefetched = env.prefetchedTxHashes.Load(tx.Hash()) + } if metrics.Enabled() { txApplyDurationTimer.Update(txDuration) + if prefetched { + txApplyDurationPrefetchedTimer.Update(txDuration) + } else { + txApplyDurationNotPrefetchedTimer.Update(txDuration) + } } if w.IsRunning() { - w.slowTxTracker.Add(txTimingEntry{hash: tx.Hash(), duration: txDuration}) + var gasUsed uint64 + if n := len(env.receipts); n > 0 { + gasUsed = env.receipts[n-1].GasUsed + } + w.slowTxTracker.Add(txTimingEntry{ + hash: tx.Hash(), + duration: txDuration, + gasUsed: gasUsed, + prefetched: prefetched, + }) } if EnableMVHashMap && w.IsRunning() { @@ -1522,6 +1563,19 @@ mainloop: txs.Shift() + // Report freed gas to the prefetcher so it can predict overflow txs. + // freed = declared gas limit − actual gas used; non-zero means the block + // has more capacity than the plan assumed, enabling extra txs to fit. + if builderGasFreedCh != nil && ltx.Gas > 0 { + actualUsed := gasPoolBefore - env.gasPool.Gas() + if freed := ltx.Gas - actualUsed; freed > 0 { + select { + case builderGasFreedCh <- freed: + default: + } + } + } + case errors.Is(err, vm.ErrInterrupt): // Timeout interrupt surfaced from EVM execution for this tx. if !hasTxInterruptDelay { @@ -1636,19 +1690,24 @@ mainloop: // generateParams wraps various of settings for generating sealing task. type generateParams struct { - timestamp uint64 // The timestamp for sealing task - forceTime bool // Flag whether the given timestamp is immutable or not - parentHash common.Hash // Parent block hash, empty means the latest chain head - coinbase common.Address // The fee recipient address for including transaction - random common.Hash // The randomness generated by beacon chain, empty before the merge - withdrawals types.Withdrawals // List of withdrawals to include in block. - beaconRoot *common.Hash // The beacon root (cancun field). 
- noTxs bool // Flag whether an empty block without any transaction is expected - statedb *state.StateDB // The statedb to use for block generation - prefetchReader state.ReaderWithStats // The prefetch reader to use for statistics - processReader state.ReaderWithStats // The process reader to use for statistics - prefetchedTxHashes *sync.Map // Map of successfully prefetched transaction hashes - productionStart time.Time // Start of full-block building (after optional empty pre-seal); used for productionElapsed + timestamp uint64 // The timestamp for sealing task + forceTime bool // Flag whether the given timestamp is immutable or not + parentHash common.Hash // Parent block hash, empty means the latest chain head + coinbase common.Address // The fee recipient address for including transaction + random common.Hash // The randomness generated by beacon chain, empty before the merge + withdrawals types.Withdrawals // List of withdrawals to include in block. + beaconRoot *common.Hash // The beacon root (cancun field). 
+ noTxs bool // Flag whether an empty block without any transaction is expected + statedb *state.StateDB // The statedb to use for block generation + prefetchReader state.ReaderWithStats // The prefetch reader to use for statistics + processReader state.ReaderWithStats // The process reader to use for statistics + prefetchedTxHashes *sync.Map // Map of successfully prefetched transaction hashes + builderPrefetchedTxHashes *sync.Map // Subset of prefetchedTxHashes populated only during the builder phase; used to measure builder-phase contribution + productionStart time.Time // Start of full-block building (after optional empty pre-seal); used for productionElapsed + builderStarted *atomic.Bool // Set when block building begins; immediately interrupts the idle Prefetch() call and triggers builder-mode prefetching + builderPlanCh chan *types.Transaction // Builder sends each validated tx here before execution; prefetcher reads and warms state concurrently + builderGasFreedCh chan uint64 // Builder sends (declared−actual) gas after each successful tx; prefetcher uses it to predict overflow txs + planWg sync.WaitGroup // Tracks sendPlan goroutines; must reach zero before builderPlanCh is closed } // makeHeader creates a new block header for sealing. @@ -1795,13 +1854,111 @@ func (w *worker) buildDefaultFilter(BaseFee *big.Int, Number *big.Int) txpool.Pe return filter } +// buildTxPlan greedily scans h (which is consumed) and returns the ordered list of +// transactions the builder is predicted to include, using declared gas limits as the +// budget. Already-prefetched transactions are excluded from the list but still counted +// against the gas budget so the estimate stays accurate. +// +// Callers that need the original heap unmodified must pass a clone: h.clone(). 
+// +// The plan is a conservative lower bound: freed gas (actual < declared) means some +// bonus txs may fit that are absent from the plan; those are covered by the per-tx +// channel sends inside commitTransactions. +func buildTxPlan(h *transactionsByPriceAndNonce, gasLimit uint64, prefetchedHashes *sync.Map) []*types.Transaction { + var plan []*types.Transaction + remaining := gasLimit + for { + ltx, _ := h.Peek() + if ltx == nil { + break + } + if ltx.Gas > remaining { + h.Pop() // Too large for remaining space; abandon account (mirrors commitTransactions) + continue + } + // Already warmed during idle prefetch — count against gas budget but skip the send. + // Deliberate: the tx is still bound for the block, so its gas is consumed here. + if prefetchedHashes != nil { + if _, done := prefetchedHashes.Load(ltx.Hash); done { + remaining -= ltx.Gas + h.Shift() + continue + } + } + tx := ltx.Resolve() + if tx == nil { + // Resolve failed (tx evicted from the pool between listing and here); + // don't consume budget for a tx that won't make the block. + h.Pop() + continue + } + remaining -= ltx.Gas + plan = append(plan, tx) + h.Shift() + } + return plan +} + +// scanOverflow greedily pops transactions from the heap that fit within the freed-gas +// budget, returning new txs to prefetch and the remaining budget. Already-prefetched +// txs are skipped entirely (not counted against the budget) because the freed gas +// represents extra block capacity beyond the existing plan — plan txs' gas is already +// committed to the main gas pool and should not be double-counted here. 
+func scanOverflow( + h *transactionsByPriceAndNonce, + budget uint64, + prefetchedHashes *sync.Map, + sentThisPhase map[common.Hash]struct{}, +) ([]*types.Transaction, uint64) { + var bonus []*types.Transaction + remaining := budget + for { + ltx, _ := h.Peek() + if ltx == nil { + break + } + // Skip already-prefetched (planned) txs without consuming freed budget: + // their gas is already accounted for in the main block gas pool. + if prefetchedHashes != nil { + if _, done := prefetchedHashes.Load(ltx.Hash); done { + h.Shift() + continue + } + } + // Skip txs still in-flight from an earlier plan-batch forward. They + // aren't in prefetchedHashes yet (onSuccess hasn't fired), but a worker + // is already executing them — emitting again just burns a second worker + // on the same tx. + if _, inflight := sentThisPhase[ltx.Hash]; inflight { + h.Shift() + continue + } + if ltx.Gas > remaining { + // Don't pop: extendedBudget accumulates across iterations, so an + // account too large for this window may fit in a later one. Popping + // would permanently evict price-leading accounts that the builder + // is most likely to include. + break + } + tx := ltx.Resolve() + if tx == nil { + h.Pop() + continue + } + remaining -= ltx.Gas + bonus = append(bonus, tx) + h.Shift() + } + return bonus, remaining +} + // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. // //nolint:gocognit -func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) error { +func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment, genParams *generateParams) error { w.mu.RLock() prio := w.prio w.mu.RUnlock() @@ -1834,11 +1991,69 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err } } + // Shared channels used during builder mode. Both are nil when there is no prefetcher. 
+ var builderPlanCh chan<- *types.Transaction + var builderGasFreedCh chan<- uint64 + if genParams != nil && genParams.builderPlanCh != nil { + builderPlanCh = genParams.builderPlanCh + if genParams.builderGasFreedCh != nil { + builderGasFreedCh = genParams.builderGasFreedCh + } + } + + // sendPlan sends the residual (not-yet-prefetched) transactions to the prefetcher + // channel before execution begins. The heap clone is made synchronously — it must + // happen before commitTransactions consumes the heap. The scan and channel sends + // run in a goroutine so the builder's critical path is not blocked. + // + // Already-prefetched txs are excluded from the send but still counted in the gas + // budget so the estimate stays accurate. Bonus txs that fit due to freed gas are + // covered by the prefetcher's own overflow heap driven by builderGasFreedCh. + sendPlan := func(plainTxs *transactionsByPriceAndNonce, gasLimit uint64) { + if builderPlanCh == nil || plainTxs == nil { + return + } + // Clone is O(N) pointer copies — done synchronously before the heap is consumed. + clone := plainTxs.clone() + var prefetchedHashes *sync.Map + if genParams != nil { + prefetchedHashes = genParams.prefetchedTxHashes + } + ch := builderPlanCh + genParams.planWg.Add(1) + go func() { + defer genParams.planWg.Done() + defer func() { + if r := recover(); r != nil { + log.Error("sendPlan goroutine panicked", "err", r, "stack", string(debug.Stack())) + prefetchPanicMeter.Mark(1) + } + }() + for _, tx := range buildTxPlan(clone, gasLimit, prefetchedHashes) { + select { + case ch <- tx: + default: + } + } + }() + } + + // remainingGas returns the block gas still available for the next + // commitTransactions pass. Before the first pass env.gasPool is nil, so we + // fall back to the full header limit. + remainingGas := func() uint64 { + if env.gasPool == nil { + return env.header.GasLimit + } + return env.gasPool.Gas() + } + // Fill the block with all available pending transactions. 
if len(prioPlainTxs) > 0 || len(prioBlobTxs) > 0 { plainTxs := newTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee, &w.interruptBlockBuilding) blobTxs := newTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee, &w.interruptBlockBuilding) - if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { + sendPlan(plainTxs, remainingGas()) + if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt, builderGasFreedCh); err != nil { return err } } @@ -1847,8 +2062,8 @@ func (w *worker) fillTransactions(interrupt *atomic.Int32, env *environment) err plainTxs := newTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee, &w.interruptBlockBuilding) blobTxs := newTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee, &w.interruptBlockBuilding) txHeapInitTimer.Update(time.Since(heapInitTime)) - - if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { + sendPlan(plainTxs, remainingGas()) + if err := w.commitTransactions(env, plainTxs, blobTxs, interrupt, builderGasFreedCh); err != nil { return err } } @@ -1872,7 +2087,7 @@ func (w *worker) generateWork(params *generateParams, witness bool) *newPayloadR }) defer timer.Stop() - err := w.fillTransactions(interrupt, work) + err := w.fillTransactions(interrupt, work, nil) if errors.Is(err, errBlockInterruptedByTimeout) { log.Warn("Block building is interrupted", "allowance", common.PrettyDuration(w.newpayloadTimeout)) } @@ -1967,6 +2182,12 @@ func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int var interruptPrefetch atomic.Bool newBlockNumber := new(big.Int).Add(parent.Number, common.Big1) if w.config.EnablePrefetch && w.chainConfig.Bor != nil && w.chainConfig.Bor.IsGiugliano(newBlockNumber) { + // Only allocate the builder-mode signal when a prefetcher will consume it. 
+ // Downstream (buildAndCommitBlock, fillTransactions, commitTransactions) gate all + // planning work on `builderStarted != nil`, so leaving it nil means zero overhead + // when prefetch is disabled. + genParams.builderStarted = new(atomic.Bool) + genParams.builderPrefetchedTxHashes = &sync.Map{} go func() { defer func() { if r := recover(); r != nil { @@ -1974,7 +2195,7 @@ func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int prefetchPanicMeter.Mark(1) } }() - w.prefetchFromPool(parent, throwaway, &genParams, &interruptPrefetch) + w.runPrefetcher(parent, throwaway, &genParams, &interruptPrefetch) // Goroutine exits naturally after prefetch completes. // Go's GC keeps throwaway StateDB alive while this goroutine references it. // When the goroutine exits, the reference is released and GC can collect it. @@ -1986,6 +2207,12 @@ func (w *worker) commitWork(interrupt *atomic.Int32, noempty bool, timestamp int // buildAndCommitBlock prepares work, fills transactions, and commits the block for sealing. func (w *worker) buildAndCommitBlock(interrupt *atomic.Int32, noempty bool, genParams *generateParams, interruptPrefetch *atomic.Bool) { + // Must be the first defer so the prefetcher goroutine is signaled to exit + // on every return path — including the early return below when prepareWork + // fails. Otherwise runIdleTxProvider loops until gas exhaustion, burning + // CPU on throwaway EVM work while the block build is already aborted. 
+ defer interruptPrefetch.Store(true) + work, err := w.prepareWork(genParams, w.makeWitness) if err != nil { return @@ -1993,7 +2220,16 @@ func (w *worker) buildAndCommitBlock(interrupt *atomic.Int32, noempty bool, genP // Starts accounting time after prepareWork, since it includes the wait we have on Prepare phase of Bor start := time.Now() - interruptPrefetch.Store(true) + + // Create the builder plan channel before signalling builder mode so the prefetcher goroutine + // always finds a valid channel when it transitions. Buffer of 4096 covers a full block's + // worth of transactions with room to spare; the builder never blocks on a full buffer because + // all sends are non-blocking. + if genParams.builderStarted != nil { + genParams.builderPlanCh = make(chan *types.Transaction, 4096) + genParams.builderGasFreedCh = make(chan uint64, 256) + genParams.builderStarted.Store(true) // immediately interrupts idle Prefetch() + mode switch + } stopFn := func() {} defer func() { @@ -2026,7 +2262,21 @@ func (w *worker) buildAndCommitBlock(interrupt *atomic.Int32, noempty bool, genP // productionElapsed for the full block does not include empty-block overhead. genParams.productionStart = time.Now() // Fill pending transactions from the txpool into the block. - err = w.fillTransactions(interrupt, work) + err = w.fillTransactions(interrupt, work, genParams) + // Wait for any sendPlan goroutines to finish before closing the channel. + // These goroutines do only non-blocking sends so they complete in microseconds. + // Waiting here ensures no goroutine sends to a closed channel. + genParams.planWg.Wait() + // Close gas freed channel first so the prefetcher sees it as exhausted before + // the plan channel closes — the prefetcher exits on plan channel close. + if genParams.builderGasFreedCh != nil { + close(genParams.builderGasFreedCh) + } + // Signal the prefetcher that no more transactions will be sent. 
	// any remaining channel entries and then exits naturally.
	if genParams.builderPlanCh != nil {
		close(genParams.builderPlanCh)
	}

	switch {
	case err == nil:
	// NOTE(review): unified-diff hunk boundary — the remainder of
	// buildAndCommitBlock is elided from this view.

// runPrefetcher owns the lifecycle of the unified prefetcher stream for one block.
// It starts a single long-lived worker pool (via PrefetchStream), runs the idle tx
// provider until the builder flips, executes the idle→builder handoff, and then
// runs the builder tx provider until block building completes.
//
// The handoff between phases uses the prefetcher's soft-interrupt (evmAbort) to
// abort any in-flight idle tx execution and drain buffered idle txs from the
// stream channel, so only builder txs reach the worker pool from that point on.
// hardKill is the permanent stream-exit signal (set by buildAndCommitBlock on exit).
func (w *worker) runPrefetcher(parent *types.Header, throwaway *state.StateDB, genParams *generateParams, hardKill *atomic.Bool) {
	w.mu.RLock()
	header, _, err := w.makeHeader(genParams, false)
	w.mu.RUnlock()
	if err != nil {
		// No header, nothing to prefetch for this block.
		return
	}

	prefetcher := core.NewStatePrefetcher(w.chainConfig, w.chain.HeaderChain())
	// 4096 ≈ one full block's worth of 21k-gas txs at the 100M-gas block limit.
	// Sized to absorb the idle provider's per-loop burst (bounded by gas budget)
	// without ever blocking a sender; workers drain far faster than the idle
	// heap can fill in practice. Channel memory is ~33 KB — negligible.
	txsCh := make(chan *types.Transaction, 4096)
	evmAbort := new(atomic.Bool)
	// inBuilderPhase gates builder-phase attribution. Flipped to true only
	// after the idle→builder handoff completes (evmAbort drain + reset), so
	// any onSuccess call firing after that point is known to come from
	// post-handoff work. Using genParams.builderStarted directly would open a
	// small attribution race: buildAndCommitBlock sets builderStarted=true
	// before runPrefetcher reaches the handoff, and an idle-phase tx whose
	// EVM work finishes between those two moments would otherwise be
	// miscounted as builder.
	//
	// Residual edge case: a worker that finished ApplyMessage but is still
	// inside IntermediateRoot(true) (not interruptible by evmAbort) when the
	// handoff completes could still reach onSuccess after inBuilderPhase=true,
	// inflating builder attribution by at most one tx. Handoff is sub-
	// millisecond in practice while IntermediateRoot spans microseconds to
	// low milliseconds, so the window is tiny but not zero.
	inBuilderPhase := new(atomic.Bool)

	// onSuccess records every successfully prefetched tx; builder-phase
	// successes are additionally recorded for the attribution metric.
	onSuccess := func(hash common.Hash, _ uint64) {
		if genParams.prefetchedTxHashes != nil {
			genParams.prefetchedTxHashes.Store(hash, struct{}{})
		}
		if inBuilderPhase.Load() && genParams.builderPrefetchedTxHashes != nil {
			genParams.builderPrefetchedTxHashes.Store(hash, struct{}{})
		}
	}

	streamDone := make(chan struct{})
	go func() {
		defer close(streamDone)
		prefetcher.PrefetchStream(header, throwaway, w.vmConfig(), true,
			hardKill, evmAbort, txsCh, onSuccess)
	}()

	// Defer the shutdown so a panic in either provider still releases the
	// workers. Without this, range-over-channel blocks forever (hardKill is
	// checked only after dequeue) and N+1 goroutines leak per panicking block.
	// sync.Once protects against the normal-exit close() racing with this
	// deferred close — the normal path does it explicitly below for deterministic
	// ordering with <-streamDone.
	var shutdownOnce sync.Once
	shutdown := func() {
		shutdownOnce.Do(func() {
			evmAbort.Store(true)
			close(txsCh)
		})
	}
	defer shutdown()

	// Phase 1: idle tx provider — streams pool txs until builder flips or hardKill fires.
	w.runIdleTxProvider(txsCh, header, genParams, hardKill)

	// Phase 2: builder tx provider, if we actually switched modes.
	if genParams.builderStarted != nil && genParams.builderStarted.Load() && !hardKill.Load() {
		// Handoff: abort in-flight idle work and drain buffered idle txs so only
		// builder txs reach the pool from here on. Then clear abort and run builder.
		// Any in-flight idle EVM execution aborts via evmAbort; workers finish their
		// current tx quickly (IntermediateRoot is the only non-interruptible work)
		// and move on. Workers that pick up a drained-but-not-gone tx see evmAbort=true
		// and skip it.
		evmAbort.Store(true)
		drainTxChan(txsCh)
		evmAbort.Store(false)
		// Flip phase attribution only after the handoff is complete. From here
		// on, every successful prefetch is genuinely builder-phase work.
		inBuilderPhase.Store(true)

		w.runBuilderTxProvider(txsCh, header, genParams, hardKill)
	}

	// Normal shutdown: close first, then wait for the stream to drain. The
	// defer above is a panic safety net; on the happy path we want the wait
	// ordered with the close rather than after the wrapping goroutine's recover.
	shutdown()
	<-streamDone
}

// drainTxChan removes all currently-buffered entries from the channel without blocking.
// Safe to call while other goroutines are reading from ch (reads consume; drain stops
// when the channel is empty from the drainer's perspective).
func drainTxChan(ch <-chan *types.Transaction) {
	for {
		select {
		case <-ch:
		default:
			return
		}
	}
}

// runIdleTxProvider speculatively streams transactions from the txpool into the
// prefetcher. It loops on a ~100ms cadence, bounded by a configurable gas budget
// (PrefetchGasLimitPercent of header.GasLimit, defaulting to 100%). Returns when
// the budget is exhausted, the builder flips, or hardKill fires.
//
// Gas accounting uses declared tx gas (not actual execution gas) — close enough
// since the budget only bounds speculative work, not correctness.
func (w *worker) runIdleTxProvider(txsCh chan<- *types.Transaction, header *types.Header, genParams *generateParams, interrupt *atomic.Bool) {
	const minLoopInterval = 100 * time.Millisecond

	signer := types.MakeSigner(w.chainConfig, header.Number, header.Time)
	filter := w.buildDefaultFilter(header.BaseFee, header.Number)
	filter.BlobTxs = false

	totalGasPool := new(core.GasPool).AddGas(header.GasLimit * idleGasLimitPercent(w.config) / 100)
	// localPrefetched dedups across loop iterations; each iteration takes a
	// fresh pool snapshot that would otherwise re-surface the same txs.
	localPrefetched := make(map[common.Hash]struct{})

	shouldExit := func() bool {
		return interrupt.Load() ||
			(genParams.builderStarted != nil && genParams.builderStarted.Load()) ||
			totalGasPool.Gas() == 0
	}

	for !shouldExit() {
		loopStart := time.Now()

		pendingTxs := w.eth.TxPool().Pending(filter, interrupt)
		txs := newTransactionsByPriceAndNonce(signer, pendingTxs, header.BaseFee, interrupt)
		w.streamIdleBatch(txsCh, txs, totalGasPool, localPrefetched, header.GasLimit)

		waitUntilNextLoop(loopStart, minLoopInterval, shouldExit)
	}
}

// idleGasLimitPercent returns the configured prefetch gas budget percent, capped
// defensively at 150 and defaulted to 100 when unset.
func idleGasLimitPercent(cfg *Config) uint64 {
	pct := cfg.PrefetchGasLimitPercent
	if pct == 0 {
		return 100
	}
	if pct > 150 {
		log.Warn("Prefetch gas limit percent exceeds maximum, capping at 150%", "configured", pct)
		return 150
	}
	return pct
}

// streamIdleBatch walks the price-nonce heap and non-blockingly forwards
// un-prefetched transactions to txsCh until the per-loop gas cap is exhausted,
// the heap is drained, or the channel fills. Returning on a full channel
// avoids spinning through the rest of the heap doing Peek/Shift work that
// would drop every tx: the outer loop will re-snapshot the pool on its next
// iteration (~100ms later), by which time workers have drained the channel.
func (w *worker) streamIdleBatch(
	txsCh chan<- *types.Transaction,
	txs *transactionsByPriceAndNonce,
	totalGasPool *core.GasPool,
	localPrefetched map[common.Hash]struct{},
	headerGasLimit uint64,
) {
	// Cap each loop's speculative work at one block's worth of gas.
	loopGasLimit := totalGasPool.Gas()
	if loopGasLimit > headerGasLimit {
		loopGasLimit = headerGasLimit
	}
	gaspool := new(core.GasPool).AddGas(loopGasLimit)

	for {
		ltx, tx := nextViableIdleTx(txs, gaspool, localPrefetched)
		if ltx == nil {
			return
		}
		select {
		case txsCh <- tx:
			// SubGas errors are ignored deliberately: nextViableIdleTx
			// guarantees gaspool >= ltx.Gas, and totalGasPool >= loopGasLimit
			// >= gaspool's spend, so neither pool can underflow here.
			localPrefetched[ltx.Hash] = struct{}{}
			gaspool.SubGas(ltx.Gas)
			totalGasPool.SubGas(ltx.Gas)
		default:
			// Channel full — stop this batch. The tx we failed to send will
			// reappear in the next iteration's pool snapshot.
			return
		}
		txs.Shift()
	}
}

// nextViableIdleTx advances the heap past txs that are too large for the loop
// budget, already warmed, or fail to resolve, and returns the next tx worth
// sending. Returns (nil, nil) when the heap is drained.
func nextViableIdleTx(
	txs *transactionsByPriceAndNonce,
	gaspool *core.GasPool,
	localPrefetched map[common.Hash]struct{},
) (*txpool.LazyTransaction, *types.Transaction) {
	for {
		ltx, _ := txs.Peek()
		if ltx == nil {
			return nil, nil
		}
		if gaspool.Gas() < ltx.Gas {
			// Pop (drop the whole account): unlike scanOverflow, the idle
			// budget only shrinks within a batch, so this tx cannot fit later.
			txs.Pop()
			continue
		}
		if _, seen := localPrefetched[ltx.Hash]; seen {
			txs.Shift()
			continue
		}
		tx := ltx.Resolve()
		if tx == nil {
			txs.Pop()
			continue
		}
		return ltx, tx
	}
}

// waitUntilNextLoop sleeps up to (window - elapsed since loopStart) in small
// increments so shouldExit can be re-checked for fast shutdown.
func waitUntilNextLoop(loopStart time.Time, window time.Duration, shouldExit func() bool) {
	const checkInterval = 10 * time.Millisecond
	for remaining := window - time.Since(loopStart); remaining > 0; remaining = window - time.Since(loopStart) {
		if shouldExit() {
			return
		}
		sleep := checkInterval
		if remaining < checkInterval {
			sleep = remaining
		}
		time.Sleep(sleep)
	}
}

// runBuilderTxProvider streams the builder's plan + freed-gas overflow into the
// prefetcher. Each 2ms window it collects plan txs and freed-gas signals via
// collectPlanBatch, scans the overflow heap for any bonus txs that fit in the
// accumulated freed budget, and streams everything to txsCh. Exits when the plan
// channel closes or hardKill fires.
func (w *worker) runBuilderTxProvider(txsCh chan<- *types.Transaction, header *types.Header, genParams *generateParams, interrupt *atomic.Bool) {
	const batchWindow = 2 * time.Millisecond

	planCh := genParams.builderPlanCh
	if planCh == nil {
		return
	}

	overflowHeap := w.buildOverflowHeap(header, interrupt)

	var extendedBudget uint64
	var gasFreedCh <-chan uint64 = genParams.builderGasFreedCh

	// sentThisPhase tracks hashes already forwarded on txsCh within
the builder + // phase. genParams.prefetchedTxHashes is only written after onSuccess fires, + // which trails the EVM execution window; a plan tx still in-flight could + // otherwise be re-emitted by scanOverflow from a fresh pool snapshot, wasting + // a second worker on the same tx. Local map is safe because this provider + // runs single-threaded. + sentThisPhase := make(map[common.Hash]struct{}) + + for { + batch, newGasFreedCh, delta, builderDone := collectPlanBatch( + planCh, gasFreedCh, batchWindow, genParams.prefetchedTxHashes, sentThisPhase, + ) + gasFreedCh = newGasFreedCh + extendedBudget += delta + + if extendedBudget > 0 { + // Mark the plan batch as in-flight before the overflow scan so + // scanOverflow won't re-emit the same tx within this iteration + // (collectPlanBatch returns before forwardTxs records hashes). + for _, tx := range batch { + sentThisPhase[tx.Hash()] = struct{}{} + } + var bonus []*types.Transaction + bonus, extendedBudget = scanOverflow(overflowHeap, extendedBudget, genParams.prefetchedTxHashes, sentThisPhase) + batch = append(batch, bonus...) } - block := types.NewBlock(header, &types.Body{Transactions: transactions}, nil, trie.NewStackTrie(nil)) - result := prefetcher.Prefetch(block, throwaway, w.vmConfig(), true, interruptPrefetch) + forwardTxs(txsCh, batch, sentThisPhase) - // Use the actual gas used from prefetch result and mark successful transactions - if result != nil { - totalGasPool.SubGas(result.TotalGasUsed) - for _, txHash := range result.SuccessfulTxs { - txsAlreadyPrefetched[txHash] = struct{}{} - // Store in shared map for coverage metrics - if genParams.prefetchedTxHashes != nil { - genParams.prefetchedTxHashes.Store(txHash, struct{}{}) - } + if builderDone || interrupt.Load() { + return + } + } +} + +// forwardTxs does a non-blocking send of each tx to ch. Drops silently if the +// buffer is full — prefetch is best-effort. 
Tracks each forwarded hash in +// sentThisPhase so follow-up overflow scans don't re-emit in-flight txs. +func forwardTxs(ch chan<- *types.Transaction, txs []*types.Transaction, sentThisPhase map[common.Hash]struct{}) { + for _, tx := range txs { + select { + case ch <- tx: + if sentThisPhase != nil { + sentThisPhase[tx.Hash()] = struct{}{} } + default: } - // Calculate elapsed time and wait if necessary to ensure minimum 100ms interval - // Check interrupt flag every 10ms during wait for responsive shutdown - elapsed := time.Since(loopStart) - if elapsed < minLoopInterval { - checkInterval := 10 * time.Millisecond + } +} - for remaining := minLoopInterval - elapsed; remaining > 0; remaining = minLoopInterval - time.Since(loopStart) { - if interruptPrefetch.Load() { - return - } +// buildOverflowHeap takes a snapshot of the pending plain-tx pool ordered by gas price. +// The prefetcher uses it to warm bonus txs that fit in the block due to freed gas +// (declared > actual usage). It reuses the same filter as fillTransactions so the +// view is consistent with what the builder sees. +func (w *worker) buildOverflowHeap(header *types.Header, interrupt *atomic.Bool) *transactionsByPriceAndNonce { + filter := w.buildDefaultFilter(header.BaseFee, header.Number) + filter.BlobTxs = false + signer := types.MakeSigner(w.chainConfig, header.Number, header.Time) + pending := w.eth.TxPool().Pending(filter, interrupt) + return newTransactionsByPriceAndNonce(signer, pending, header.BaseFee, interrupt) +} - sleepDuration := checkInterval - if remaining < checkInterval { - sleepDuration = remaining +// collectPlanBatch runs a single batch-collection window. It reads from the plan +// channel into batch (skipping already-prefetched txs and any tx already forwarded +// earlier in this builder phase), accumulates freed-gas signals into budgetDelta, +// and returns when the window timer fires or the plan channel closes. 
When +// gasFreedCh closes, it is disabled by returning a nil newGasFreedCh so the +// caller can stop selecting on it in subsequent calls. +// +// sentThisPhase closes the scanOverflow→plan cross-iteration edge of the dedup +// matrix: a tx emitted by scanOverflow in an earlier iteration and still +// executing on a worker is absent from prefetchedHashes (onSuccess trails +// multi-ms EVM) but present in sentThisPhase — without this check, a buffered +// copy of the same tx in planCh would get forwarded a second time. +func collectPlanBatch( + planCh <-chan *types.Transaction, + gasFreedCh <-chan uint64, + window time.Duration, + prefetchedHashes *sync.Map, + sentThisPhase map[common.Hash]struct{}, +) (batch []*types.Transaction, newGasFreedCh <-chan uint64, budgetDelta uint64, builderDone bool) { + timer := time.NewTimer(window) + defer timer.Stop() + newGasFreedCh = gasFreedCh + for { + select { + case tx, ok := <-planCh: + if !ok { + builderDone = true + return + } + if prefetchedHashes != nil { + if _, done := prefetchedHashes.Load(tx.Hash()); done { + continue } - time.Sleep(sleepDuration) } + if _, inflight := sentThisPhase[tx.Hash()]; inflight { + continue + } + batch = append(batch, tx) + case freed, ok := <-newGasFreedCh: + if !ok { + newGasFreedCh = nil + } else { + budgetDelta += freed + } + case <-timer.C: + return } } } @@ -2270,17 +2768,29 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti // Report prefetch coverage percentage if len(env.txs) > 0 && genParams != nil && genParams.prefetchedTxHashes != nil { prefetchedCount := 0 + builderAddedCount := 0 - // Count how many block transactions were prefetched for _, tx := range env.txs { if _, ok := genParams.prefetchedTxHashes.Load(tx.Hash()); ok { prefetchedCount++ } + if genParams.builderPrefetchedTxHashes != nil { + if _, ok := genParams.builderPrefetchedTxHashes.Load(tx.Hash()); ok { + builderAddedCount++ + } + } } - // Calculate miss rate (0-100): higher = worse + 
// Miss rate (0-100, higher = worse). missRate := int64((len(env.txs) - prefetchedCount) * 100 / len(env.txs)) prefetchMissRateHistogram.Update(missRate) + + // Builder-added share (0-100): block txs the builder phase prefetched on + // its own. Only emitted when the builder phase actually ran. + if genParams.builderPrefetchedTxHashes != nil { + builderAdded := int64(builderAddedCount * 100 / len(env.txs)) + prefetchBuilderAddedHistogram.Update(builderAdded) + } } } }() diff --git a/miner/worker_prefetch_unit_test.go b/miner/worker_prefetch_unit_test.go new file mode 100644 index 0000000000..30ab9eca36 --- /dev/null +++ b/miner/worker_prefetch_unit_test.go @@ -0,0 +1,470 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// Unit tests for the streaming-prefetch primitives (scanOverflow, forwardTxs, +// collectPlanBatch). These are pure functions exercised without a full worker +// setup; they cover the invariants that prior review rounds kept surfacing +// (within-iter dedup, heap preservation across budget growth, prefetched vs. +// in-flight skip semantics, accounting correctness). + +package miner + +import ( + "crypto/ecdsa" + "math/big" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/params" + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" +) + +// fakeLazyResolver returns pre-registered txs by hash so LazyTransaction.Resolve() +// works without a real pool. 
+type fakeLazyResolver struct { + txs map[common.Hash]*types.Transaction +} + +func (r *fakeLazyResolver) Get(h common.Hash) *types.Transaction { + return r.txs[h] +} + +// scanOverflowFixture signs len(gases) txs from distinct accounts (one each so +// every tx heads its own heap bucket) with the given gas limits and gasPrices. +// Returns the signer, a heap constructor, and the raw tx slice. +func scanOverflowFixture(t *testing.T, gases []uint64, gasPrices []int64) (*fakeLazyResolver, map[common.Address][]*txpool.LazyTransaction, types.Signer, []*types.Transaction) { + t.Helper() + require.Equal(t, len(gases), len(gasPrices), "gases and gasPrices must be same length") + + signer := types.LatestSigner(params.TestChainConfig) + resolver := &fakeLazyResolver{txs: make(map[common.Hash]*types.Transaction)} + txsByAcct := make(map[common.Address][]*txpool.LazyTransaction) + var rawTxs []*types.Transaction + + for i := range gases { + key, err := crypto.GenerateKey() + require.NoError(t, err) + to := common.BigToAddress(big.NewInt(int64(i + 1))) + tx := types.MustSignNewTx(key, signer, &types.LegacyTx{ + Nonce: 0, + To: &to, + Value: big.NewInt(0), + Gas: gases[i], + GasPrice: big.NewInt(gasPrices[i]), + Data: nil, + }) + rawTxs = append(rawTxs, tx) + resolver.txs[tx.Hash()] = tx + + from := crypto.PubkeyToAddress(*(key.Public().(*ecdsa.PublicKey))) + txsByAcct[from] = append(txsByAcct[from], &txpool.LazyTransaction{ + Pool: resolver, + Hash: tx.Hash(), + Tx: tx, + Time: time.Now(), + GasFeeCap: uint256.MustFromBig(big.NewInt(gasPrices[i])), + GasTipCap: uint256.MustFromBig(big.NewInt(gasPrices[i])), + Gas: gases[i], + }) + } + return resolver, txsByAcct, signer, rawTxs +} + +// newScanHeap constructs a fresh heap from a (cloned) txsByAcct map so repeated +// scanOverflow calls can start from the same state when needed. 
+func newScanHeap(signer types.Signer, txsByAcct map[common.Address][]*txpool.LazyTransaction) *transactionsByPriceAndNonce { + cloned := make(map[common.Address][]*txpool.LazyTransaction, len(txsByAcct)) + for k, v := range txsByAcct { + cp := make([]*txpool.LazyTransaction, len(v)) + copy(cp, v) + cloned[k] = cp + } + return newTransactionsByPriceAndNonce(signer, cloned, big.NewInt(0), new(atomic.Bool)) +} + +// TestScanOverflow_ZeroBudget: budget=0 leaves everything untouched. The first +// Peek's ltx.Gas is > 0, and our guard should break immediately. +func TestScanOverflow_ZeroBudget(t *testing.T) { + t.Parallel() + _, txsByAcct, signer, _ := scanOverflowFixture(t, + []uint64{21000, 42000}, + []int64{10, 20}, + ) + heap := newScanHeap(signer, txsByAcct) + + bonus, remaining := scanOverflow(heap, 0, nil, nil) + require.Empty(t, bonus, "zero budget must yield zero bonus txs") + require.Equal(t, uint64(0), remaining) + + // Heap must still surface the same top tx — nothing was consumed. + top, _ := heap.Peek() + require.NotNil(t, top, "heap must still contain accounts after a zero-budget scan") +} + +// TestScanOverflow_PreservesAccountsAcrossBudgetGrowth is the regression test for +// the h.Pop() bug: a high-gas top account must remain in the heap after a +// too-small-budget scan so a later larger-budget scan can include it. +func TestScanOverflow_PreservesAccountsAcrossBudgetGrowth(t *testing.T) { + t.Parallel() + // Top tx gas=500k at high price; second tx gas=21k at lower price so the + // heap orders the 500k tx first. + _, txsByAcct, signer, rawTxs := scanOverflowFixture(t, + []uint64{500_000, 21_000}, + []int64{1_000, 10}, + ) + heap := newScanHeap(signer, txsByAcct) + + // First call: budget 100k. Top tx (500k) doesn't fit; must break (not pop). 
+ bonus, remaining := scanOverflow(heap, 100_000, nil, nil) + require.Empty(t, bonus, "100k budget cannot accommodate 500k top tx") + require.Equal(t, uint64(100_000), remaining, "budget must be untouched on break") + + // Second call on the same heap: budget 600k. Top account must still be + // present and selectable now that the budget has grown. + bonus, remaining = scanOverflow(heap, 600_000, nil, nil) + require.Len(t, bonus, 2, "both accounts should now be drained (500k + 21k)") + seen := map[common.Hash]struct{}{bonus[0].Hash(): {}, bonus[1].Hash(): {}} + require.Contains(t, seen, rawTxs[0].Hash(), + "the previously-skipped 500k tx must re-appear once budget grows") + require.Equal(t, uint64(600_000-500_000-21_000), remaining) +} + +// TestScanOverflow_SkipsInflight: a tx already in sentThisPhase must be skipped +// without consuming budget, so other accounts get a fair scan. +func TestScanOverflow_SkipsInflight(t *testing.T) { + t.Parallel() + // High-priced tx first; low-priced second. Mark the high-priced as + // in-flight; expect the low-priced to be selected with budget intact. + _, txsByAcct, signer, rawTxs := scanOverflowFixture(t, + []uint64{21_000, 21_000}, + []int64{1_000, 10}, + ) + heap := newScanHeap(signer, txsByAcct) + + sentThisPhase := map[common.Hash]struct{}{rawTxs[0].Hash(): {}} + + bonus, remaining := scanOverflow(heap, 100_000, nil, sentThisPhase) + require.Len(t, bonus, 1, "only the non-in-flight tx should be selected") + require.Equal(t, rawTxs[1].Hash(), bonus[0].Hash()) + require.Equal(t, uint64(100_000-21_000), remaining, + "in-flight skip must not consume budget") +} + +// TestScanOverflow_SkipsPrefetched: a tx already in prefetchedHashes must be +// skipped without consuming budget. 
func TestScanOverflow_SkipsPrefetched(t *testing.T) {
	t.Parallel()
	_, txsByAcct, signer, rawTxs := scanOverflowFixture(t,
		[]uint64{21_000, 21_000},
		[]int64{1_000, 10},
	)
	heap := newScanHeap(signer, txsByAcct)

	// Mark the high-priced tx as already prefetched; the scan must fall
	// through to the lower-priced one without spending budget on the skip.
	prefetched := &sync.Map{}
	prefetched.Store(rawTxs[0].Hash(), struct{}{})

	bonus, remaining := scanOverflow(heap, 100_000, prefetched, nil)
	require.Len(t, bonus, 1, "only the non-prefetched tx should be selected")
	require.Equal(t, rawTxs[1].Hash(), bonus[0].Hash())
	require.Equal(t, uint64(100_000-21_000), remaining,
		"prefetched skip must not consume budget")
}

// TestForwardTxs_RecordsSentHashes: every tx delivered to a roomy channel must
// end up in sentThisPhase.
func TestForwardTxs_RecordsSentHashes(t *testing.T) {
	t.Parallel()
	_, _, _, rawTxs := scanOverflowFixture(t,
		[]uint64{21_000, 21_000, 21_000},
		[]int64{1, 2, 3},
	)
	ch := make(chan *types.Transaction, len(rawTxs))
	sent := map[common.Hash]struct{}{}

	forwardTxs(ch, rawTxs, sent)

	require.Len(t, sent, len(rawTxs), "all forwarded txs must be recorded")
	for _, tx := range rawTxs {
		_, ok := sent[tx.Hash()]
		require.True(t, ok, "tx %s must be in sentThisPhase", tx.Hash())
	}
	require.Len(t, ch, len(rawTxs), "channel should have received every tx")
}

// TestForwardTxs_DropsOnFullChannelDoesNotRecord: if the channel is full and a
// send is dropped, sentThisPhase must NOT record that hash (otherwise the
// overflow scan would skip a tx that never made it to a worker).
func TestForwardTxs_DropsOnFullChannelDoesNotRecord(t *testing.T) {
	t.Parallel()
	_, _, _, rawTxs := scanOverflowFixture(t,
		[]uint64{21_000, 21_000, 21_000},
		[]int64{1, 2, 3},
	)
	// Channel capacity 1 → first tx lands, the rest are dropped by the
	// non-blocking select.
	ch := make(chan *types.Transaction, 1)
	sent := map[common.Hash]struct{}{}

	forwardTxs(ch, rawTxs, sent)

	require.Len(t, sent, 1, "only the single tx that actually landed should be recorded")
	require.Len(t, ch, 1)

	// The recorded hash must match the tx that's sitting in the channel.
	delivered := <-ch
	_, recorded := sent[delivered.Hash()]
	require.True(t, recorded, "the delivered tx's hash must be the one recorded")
}

// TestForwardTxs_NilSentMapIsSafe: backward-compat path (block-equivalence
// wrappers pass nil) must not panic.
func TestForwardTxs_NilSentMapIsSafe(t *testing.T) {
	t.Parallel()
	_, _, _, rawTxs := scanOverflowFixture(t,
		[]uint64{21_000},
		[]int64{1},
	)
	ch := make(chan *types.Transaction, 1)
	require.NotPanics(t, func() {
		forwardTxs(ch, rawTxs, nil)
	})
	require.Len(t, ch, 1)
}

// TestCollectPlanBatch_ClosedPlanCh: closing planCh returns builderDone=true
// with any already-buffered txs.
func TestCollectPlanBatch_ClosedPlanCh(t *testing.T) {
	t.Parallel()
	_, _, _, rawTxs := scanOverflowFixture(t,
		[]uint64{21_000, 21_000},
		[]int64{1, 2},
	)
	planCh := make(chan *types.Transaction, len(rawTxs))
	for _, tx := range rawTxs {
		planCh <- tx
	}
	close(planCh)

	batch, newGasCh, delta, done := collectPlanBatch(planCh, nil, 50*time.Millisecond, nil, nil)
	require.True(t, done, "closed planCh must surface builderDone=true")
	require.Len(t, batch, len(rawTxs), "buffered txs must be drained into batch")
	require.Equal(t, uint64(0), delta)
	require.Nil(t, newGasCh)
}

// TestCollectPlanBatch_TimerFiresOnEmptyInput: if no signal arrives within the
// window, the batch must return cleanly with empty state and builderDone=false.
func TestCollectPlanBatch_TimerFiresOnEmptyInput(t *testing.T) {
	t.Parallel()
	planCh := make(chan *types.Transaction)
	gasCh := make(chan uint64)

	start := time.Now()
	batch, newGasCh, delta, done := collectPlanBatch(planCh, gasCh, 25*time.Millisecond, nil, nil)
	elapsed := time.Since(start)

	require.False(t, done, "timer expiry must not mark the builder done")
	require.Empty(t, batch)
	require.Equal(t, uint64(0), delta)
	require.Equal(t, (<-chan uint64)(gasCh), newGasCh, "idle gasCh must pass through unchanged")
	// Loose elapsed-time bounds: tight bounds flake under CI scheduler jitter.
	require.GreaterOrEqual(t, elapsed, 20*time.Millisecond,
		"timer must block for ~window before returning")
	require.Less(t, elapsed, 200*time.Millisecond,
		"timer must not drag far past the window")
}

// TestCollectPlanBatch_FreedGasAccumulates: multiple freed-gas values must sum
// into budgetDelta within a single window.
func TestCollectPlanBatch_FreedGasAccumulates(t *testing.T) {
	t.Parallel()
	planCh := make(chan *types.Transaction)
	gasCh := make(chan uint64, 3)
	gasCh <- 1_000
	gasCh <- 2_500
	gasCh <- 500

	_, _, delta, done := collectPlanBatch(planCh, gasCh, 25*time.Millisecond, nil, nil)
	require.False(t, done)
	require.Equal(t, uint64(4_000), delta,
		"budgetDelta must sum all freed-gas values received within the window")
}

// TestCollectPlanBatch_SkipsPrefetched: a tx whose hash is in prefetchedHashes
// must be dropped on the way in (not forwarded in batch).
// NOTE(review): the body of this test lies beyond the visible end of this chunk.
+func TestCollectPlanBatch_SkipsPrefetched(t *testing.T) { + t.Parallel() + _, _, _, rawTxs := scanOverflowFixture(t, + []uint64{21_000, 21_000}, + []int64{1, 2}, + ) + planCh := make(chan *types.Transaction, len(rawTxs)) + for _, tx := range rawTxs { + planCh <- tx + } + close(planCh) + + prefetched := &sync.Map{} + prefetched.Store(rawTxs[0].Hash(), struct{}{}) + + batch, _, _, done := collectPlanBatch(planCh, nil, 25*time.Millisecond, prefetched, nil) + require.True(t, done) + require.Len(t, batch, 1, "prefetched tx must be filtered out") + require.Equal(t, rawTxs[1].Hash(), batch[0].Hash()) +} + +// TestBuilderTxProvider_NoDuplicateForwards is the regression test for the +// within-iteration dedup bug (PR #2192 review): a tx that arrives via planCh +// must not also be forwarded via scanOverflow within the same 2ms batch window +// when freed-gas signals arrive in that same window. Covers both the within- +// iteration and cross-iteration dedup invariants. +// +// Setup: a worker with a real txpool (so buildOverflowHeap can draw a +// price-ordered snapshot), no pre-prefetched hashes (so every pool tx is a +// valid overflow candidate), and a planCh that the test drives with each pool +// tx individually, interleaved with freed-gas signals. The fix pre-populates +// sentThisPhase with the plan batch before scanOverflow; without it, the top +// plan tx would re-surface via the overflow heap and get forwarded twice. 
+func TestBuilderTxProvider_NoDuplicateForwards(t *testing.T) { + t.Parallel() + + w, b, engine, ctrl := setupBorWorkerWithPrefetch(t, 100, 2*time.Second) + defer engine.Close() + defer ctrl.Finish() + defer w.close() + + const totalTxs = 40 + addTransactionBatch(b, totalTxs, false) + time.Sleep(200 * time.Millisecond) + + pending := b.txPool.Pending(txpool.PendingFilter{}, nil) + require.NotEmpty(t, pending) + + var poolTxs []*types.Transaction + for _, lazyTxs := range pending { + for _, ltx := range lazyTxs { + if tx := ltx.Resolve(); tx != nil { + poolTxs = append(poolTxs, tx) + } + } + } + require.GreaterOrEqual(t, len(poolTxs), 10) + + parent := w.chain.CurrentBlock() + _, stateDB, prefetchReader, processReader, err := w.chain.StateAtWithReaders(parent.Root) + require.NoError(t, err) + + w.mu.RLock() + header, _, err := w.makeHeader(&generateParams{ + timestamp: uint64(time.Now().Unix()), + coinbase: testBankAddress, + parentHash: parent.Hash(), + statedb: stateDB, + prefetchReader: prefetchReader, + processReader: processReader, + }, false) + w.mu.RUnlock() + require.NoError(t, err) + + planCh := make(chan *types.Transaction, len(poolTxs)) + gasFreedCh := make(chan uint64, len(poolTxs)) + streamCh := make(chan *types.Transaction, len(poolTxs)*4) + + genParams := &generateParams{ + prefetchedTxHashes: &sync.Map{}, // empty: every pool tx is an overflow candidate + builderStarted: new(atomic.Bool), + builderPlanCh: planCh, + builderGasFreedCh: gasFreedCh, + } + genParams.builderStarted.Store(true) + + var interrupt atomic.Bool + done := make(chan struct{}) + go func() { + defer close(done) + w.runBuilderTxProvider(streamCh, header, genParams, &interrupt) + }() + + // Drive the provider: for each pool tx, push it onto planCh and simultaneously + // deliver a generous freed-gas signal. Both arrive within the same 2ms window, + // so collectPlanBatch returns with the tx in `batch` AND a non-zero budgetDelta. 
+ // Without the within-iteration dedup guard, scanOverflow would re-emit the same + // tx — it still heads the overflow heap because it's not yet in sentThisPhase + // (forwardTxs hasn't run) and not yet in prefetchedHashes (onSuccess hasn't + // fired). + for _, tx := range poolTxs { + planCh <- tx + gasFreedCh <- 500_000 // plenty of budget for any bonus candidate + } + close(gasFreedCh) + time.Sleep(50 * time.Millisecond) // let the provider drain both channels + close(planCh) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("runBuilderTxProvider did not exit") + } + close(streamCh) + + counts := make(map[common.Hash]int) + for tx := range streamCh { + counts[tx.Hash()]++ + } + + for h, n := range counts { + require.LessOrEqual(t, n, 1, "tx %s forwarded %d times — dedup failed", h, n) + } + t.Logf("runBuilderTxProvider forwarded %d unique hashes across %d pool txs with interleaved freed-gas; no duplicates.", + len(counts), len(poolTxs)) +} + +// TestCollectPlanBatch_SkipsInflight is the regression test for the +// scanOverflow→plan cross-iteration dedup edge: a tx already in sentThisPhase +// (from a prior scanOverflow emission whose worker is still mid-EVM and hasn't +// populated prefetchedHashes yet) arriving via planCh must be dropped, not +// forwarded a second time. +func TestCollectPlanBatch_SkipsInflight(t *testing.T) { + t.Parallel() + _, _, _, rawTxs := scanOverflowFixture(t, + []uint64{21_000, 21_000, 21_000}, + []int64{1, 2, 3}, + ) + planCh := make(chan *types.Transaction, len(rawTxs)) + for _, tx := range rawTxs { + planCh <- tx + } + close(planCh) + + // Simulate: rawTxs[0] was already forwarded by a prior scanOverflow + // iteration; its worker is still executing so prefetchedHashes is empty + // but sentThisPhase has the hash. 
+ sentThisPhase := map[common.Hash]struct{}{rawTxs[0].Hash(): {}} + + batch, _, _, done := collectPlanBatch(planCh, nil, 25*time.Millisecond, nil, sentThisPhase) + require.True(t, done) + require.Len(t, batch, 2, "in-flight tx must be filtered out") + for _, tx := range batch { + require.NotEqual(t, rawTxs[0].Hash(), tx.Hash(), + "tx already in sentThisPhase must not appear in batch") + } +} + +// TestCollectPlanBatch_ClosedGasChPassesThrough: once gasCh is closed, the +// returned channel must be nil so subsequent iterations stop selecting on it. +func TestCollectPlanBatch_ClosedGasChPassesThrough(t *testing.T) { + t.Parallel() + planCh := make(chan *types.Transaction) + gasCh := make(chan uint64) + close(gasCh) + + _, newGasCh, _, done := collectPlanBatch(planCh, gasCh, 25*time.Millisecond, nil, nil) + require.False(t, done) + require.Nil(t, newGasCh, "closed gasCh must be nilled out so the next iteration ignores it") +} diff --git a/miner/worker_test.go b/miner/worker_test.go index 8da774b8e4..843deb0a44 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -53,6 +53,7 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/tests/bor/mocks" + "github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/triedb" "github.com/ethereum/go-ethereum/consensus/bor/heimdall/milestone" @@ -2681,10 +2682,17 @@ func TestPrefetchMultiBlock(t *testing.T) { memStatsAfter.HeapAlloc/(1024*1024), heapDelta/(1024*1024)) - // Check for goroutine leaks - // Allow for some variance due to runtime internals - if goroutineDelta > 10 { - t.Errorf("Goroutine leak detected: delta=%d", goroutineDelta) + // Check for goroutine leaks. + // The streaming prefetcher keeps ~N=4*NumCPU/5 worker goroutines alive for the + // whole duration of each block's prefetch (vs short-lived per-batch goroutines in + // the old batched prefetch). 
If one or two commitWork goroutines are still + // blocked in bor.Prepare at measurement time, their prefetchers keep N+2 + // goroutines alive. We allow for two in-flight blocks to be mid-production: + // 2 * (N + 2) + 10 (runtime/test noise). A genuine leak scales with block count + // and would exceed this easily. + maxAllowed := 2*(4*runtime.NumCPU()/5+2) + 10 + if goroutineDelta > maxAllowed { + t.Errorf("Goroutine leak detected: delta=%d (max allowed=%d)", goroutineDelta, maxAllowed) } // Check for excessive memory growth @@ -2967,3 +2975,565 @@ func TestDelayFlagOffByOne(t *testing.T) { require.True(t, buggyDelayFlag(), "bug: last tx skipped, DAG hint incorrectly embedded") require.False(t, fixedDelayFlag(), "fix: last tx detected, DAG hint suppressed") } + +// TestPrefetchFromPool_BuilderModeSwitch verifies that when builderStarted is signaled +// the prefetch goroutine transitions from speculative idle mode to a single targeted builder +// pass and then exits cleanly. +// +// Key properties verified: +// 1. Blocks are mined successfully across multiple cycles with the new mechanism. +// 2. The prefetch goroutine exits after each block cycle (no goroutine leak). +// 3. The mode switch does not cause panics or deadlocks. +// +// Run with -race to verify no data races on prefetchedTxHashes or throwaway state. +func TestPrefetchFromPool_BuilderModeSwitch(t *testing.T) { + // Not parallel: measures goroutine counts and must run without interference. + + const ( + numBlocks = 8 + txCount = 300 + blockInterval = 400 * time.Millisecond // time between newWorkCh signals + settleTime = 2 * time.Second // time allowed for goroutines to exit after stop + ) + + w, b, engine, ctrl := setupBorWorkerWithPrefetch(t, 100, 1*time.Second) + defer engine.Close() + defer ctrl.Finish() + + // Populate the pool so idle and builder-mode prefetch both have work to do. 
+ addTransactionBatch(b, txCount, false) + time.Sleep(200 * time.Millisecond) // let pool promote txs to pending + + w.start() + defer w.stop() + + // Baseline goroutine count after the worker is up. + runtime.GC() + goroutinesBefore := runtime.NumGoroutine() + + // Drive multiple block cycles. Each cycle exercises the full path: + // idle prefetch → builderStarted.Store(true) → builder-mode single pass → goroutine exits. + for i := 0; i < numBlocks; i++ { + w.newWorkCh <- &newWorkReq{ + interrupt: new(atomic.Int32), + noempty: false, + timestamp: time.Now().Unix() + int64(i*2), + } + time.Sleep(blockInterval) + } + + t.Logf("Triggered %d block cycles with %d transactions in the pool", numBlocks, txCount) + + // Allow all prefetch goroutines to settle after the last block cycle. + w.stop() + time.Sleep(settleTime) + runtime.GC() + + goroutinesAfter := runtime.NumGoroutine() + + // Goroutine count after settling must not have grown significantly. + // Delta ≤15 accounts for race-detector overhead and general test infrastructure noise. + // A genuine leak would add one goroutine per block cycle (8 here) exceeding this easily. + delta := goroutinesAfter - goroutinesBefore + require.LessOrEqual(t, delta, 15, + "goroutine leak detected: before=%d after=%d delta=%d — prefetch goroutine may not be exiting after builder-mode pass", + goroutinesBefore, goroutinesAfter, delta) + + t.Logf("Goroutine count: before=%d after=%d delta=%d", goroutinesBefore, goroutinesAfter, delta) +} + +// TestBuilderTxProvider_FiltersAlreadyPrefetched verifies that runBuilderTxProvider +// only forwards plan-channel transactions that are NOT already in prefetchedTxHashes. +// +// It pre-populates prefetchedTxHashes with half of the pool transactions, sends all +// pool transactions over builderPlanCh, closes it, and asserts that only the un- +// prefetched half reached the downstream stream channel. 
+func TestBuilderTxProvider_FiltersAlreadyPrefetched(t *testing.T) { + t.Parallel() + + w, b, engine, ctrl := setupBorWorkerWithPrefetch(t, 100, 2*time.Second) + defer engine.Close() + defer ctrl.Finish() + defer w.close() + + addTransactionBatch(b, 50, false) + time.Sleep(200 * time.Millisecond) + + pending := b.txPool.Pending(txpool.PendingFilter{}, nil) + require.NotEmpty(t, pending, "pool should have pending txs") + + var allTxs []*types.Transaction + for _, lazyTxs := range pending { + for _, ltx := range lazyTxs { + if tx := ltx.Resolve(); tx != nil { + allTxs = append(allTxs, tx) + } + } + } + require.NotEmpty(t, allTxs, "should have resolved transactions") + + // Mark the first half as already prefetched. + prefetchedHashes := &sync.Map{} + prePrefetchedCount := len(allTxs) / 2 + for _, tx := range allTxs[:prePrefetchedCount] { + prefetchedHashes.Store(tx.Hash(), struct{}{}) + } + t.Logf("Pre-prefetched %d/%d transactions (simulating idle coverage)", prePrefetchedCount, len(allTxs)) + + // Build the header used by the provider. + parent := w.chain.CurrentBlock() + state, _, prefetchReader, processReader, err := w.chain.StateAtWithReaders(parent.Root) + require.NoError(t, err) + + w.mu.RLock() + genParamsForHeader := &generateParams{ + timestamp: uint64(time.Now().Unix()), + coinbase: testBankAddress, + parentHash: parent.Hash(), + statedb: state, + prefetchReader: prefetchReader, + processReader: processReader, + } + header, _, err := w.makeHeader(genParamsForHeader, false) + w.mu.RUnlock() + require.NoError(t, err) + + // Set up plan channel (builder → provider) and the downstream stream channel + // (provider → prefetcher, consumed directly by the test). 
+ planCh := make(chan *types.Transaction, len(allTxs)) + for _, tx := range allTxs { + planCh <- tx + } + close(planCh) + + streamCh := make(chan *types.Transaction, len(allTxs)*2) + + genParams := &generateParams{ + prefetchedTxHashes: prefetchedHashes, + builderStarted: new(atomic.Bool), + builderPlanCh: planCh, + } + genParams.builderStarted.Store(true) + + var interrupt atomic.Bool + + // Run the provider. It reads from planCh, filters via prefetchedHashes, sends to streamCh. + w.runBuilderTxProvider(streamCh, header, genParams, &interrupt) + + // Collect everything the provider forwarded. + close(streamCh) + seen := make(map[common.Hash]struct{}) + for tx := range streamCh { + seen[tx.Hash()] = struct{}{} + } + + // Expected: only the non-prefetched half (allTxs[prePrefetchedCount:]) should have been forwarded. + forwarded := len(seen) + for _, tx := range allTxs[:prePrefetchedCount] { + _, found := seen[tx.Hash()] + require.False(t, found, "already-prefetched tx %s should not be forwarded", tx.Hash()) + } + require.Equal(t, len(allTxs)-prePrefetchedCount, forwarded, + "provider should forward exactly the un-prefetched txs (pool=%d pre-prefetched=%d forwarded=%d)", + len(allTxs), prePrefetchedCount, forwarded) + + t.Logf("runBuilderTxProvider forwarded %d/%d txs (skipping %d pre-prefetched)", + forwarded, len(allTxs), prePrefetchedCount) +} + +// TestBuildTxPlan verifies that buildTxPlan: +// - does NOT consume the original heap (heap remains usable after the call) +// - excludes transactions already present in prefetchedHashes +// - still accounts for the gas of excluded transactions (so the plan respects gas budget) +// - returns a non-empty plan when un-prefetched transactions are available +func TestBuildTxPlan(t *testing.T) { + t.Parallel() + + w, b, engine, ctrl := setupBorWorkerWithPrefetch(t, 100, 2*time.Second) + defer engine.Close() + defer ctrl.Finish() + defer w.close() + + addTransactionBatch(b, 60, false) + time.Sleep(200 * time.Millisecond) // 
let pool promote + + pending := b.txPool.Pending(txpool.PendingFilter{}, nil) + require.NotEmpty(t, pending, "pool should have pending txs") + + parent := w.chain.CurrentBlock() + baseFee := parent.BaseFee + signer := types.MakeSigner(w.chainConfig, parent.Number, parent.Time) + + // Split pending into priority and normal (no priority accounts here, so all normal). + txsMap := make(map[common.Address][]*txpool.LazyTransaction) + var allHashes []common.Hash + for addr, lazyTxs := range pending { + txsMap[addr] = lazyTxs + for _, ltx := range lazyTxs { + allHashes = append(allHashes, ltx.Hash) + } + } + require.Greater(t, len(allHashes), 1, "need at least 2 txs") + + heap := newTransactionsByPriceAndNonce(signer, txsMap, baseFee, new(atomic.Bool)) + + // Count how many entries the heap exposes before calling buildTxPlan. + countHeap := func(h *transactionsByPriceAndNonce) int { + c := h.clone() + n := 0 + for { + ltx, _ := c.Peek() + if ltx == nil { + break + } + n++ + c.Shift() + } + return n + } + heapSizeBefore := countHeap(heap) + require.Greater(t, heapSizeBefore, 0) + + // Mark the first half of tx hashes as already prefetched. + prefetched := &sync.Map{} + halfIdx := len(allHashes) / 2 + for _, h := range allHashes[:halfIdx] { + prefetched.Store(h, struct{}{}) + } + + // buildTxPlan consumes what it receives; pass a clone to preserve the original heap. + plan := buildTxPlan(heap.clone(), parent.GasLimit, prefetched) + + // Heap must NOT be consumed — clone was passed, not the original. + heapSizeAfter := countHeap(heap) + require.Equal(t, heapSizeBefore, heapSizeAfter, + "original heap must be intact after buildTxPlan (clone was passed)") + + // Plan must be non-empty (some txs were not pre-prefetched). + require.NotEmpty(t, plan, "plan should contain un-prefetched transactions") + + // Plan must not contain any already-prefetched tx hash. 
+ prefetchedCount := 0 + for _, tx := range plan { + _, alreadyDone := prefetched.Load(tx.Hash()) + require.False(t, alreadyDone, + "plan must not include already-prefetched tx %s", tx.Hash()) + prefetchedCount++ + } + t.Logf("buildTxPlan: heap=%d txs, pre-prefetched=%d, plan=%d residual", + heapSizeBefore, halfIdx, len(plan)) +} + +// TestBuilderTxProvider_FreedGasFeedback verifies the freed-gas feedback loop at +// the provider layer: when the builder reports freed gas via builderGasFreedCh, +// runBuilderTxProvider scans the overflow heap and forwards bonus txs beyond +// what was already prefetched. +// +// Setup: 50 txs in pool, first 80% already marked as prefetched. Freed-gas channel +// delivers 5 × 21000 gas BEFORE plan channel closes so the collect loop timer fires +// and the overflow scan runs with a non-zero budget. Expected: the downstream +// stream channel receives bonus txs from the unprefetched tail. +func TestBuilderTxProvider_FreedGasFeedback(t *testing.T) { + t.Parallel() + + w, b, engine, ctrl := setupBorWorkerWithPrefetch(t, 100, 2*time.Second) + defer engine.Close() + defer ctrl.Finish() + defer w.close() + + const totalTxs = 50 + addTransactionBatch(b, totalTxs, false) + time.Sleep(200 * time.Millisecond) + + pending := b.txPool.Pending(txpool.PendingFilter{}, nil) + require.NotEmpty(t, pending, "pool should have pending txs") + + var allTxs []*types.Transaction + for _, lazyTxs := range pending { + for _, ltx := range lazyTxs { + if tx := ltx.Resolve(); tx != nil { + allTxs = append(allTxs, tx) + } + } + } + require.GreaterOrEqual(t, len(allTxs), 10, "need at least 10 txs") + + prePrefetchedCount := len(allTxs) * 80 / 100 + prefetchedHashes := &sync.Map{} + for _, tx := range allTxs[:prePrefetchedCount] { + prefetchedHashes.Store(tx.Hash(), struct{}{}) + } + t.Logf("Pre-prefetched %d/%d transactions; overflow pool has %d remaining", + prePrefetchedCount, len(allTxs), len(allTxs)-prePrefetchedCount) + + parent := w.chain.CurrentBlock() + 
state, _, prefetchReader, processReader, err := w.chain.StateAtWithReaders(parent.Root) + require.NoError(t, err) + + w.mu.RLock() + genParamsForHeader := &generateParams{ + timestamp: uint64(time.Now().Unix()), + coinbase: testBankAddress, + parentHash: parent.Hash(), + statedb: state, + prefetchReader: prefetchReader, + processReader: processReader, + } + header, _, err := w.makeHeader(genParamsForHeader, false) + w.mu.RUnlock() + require.NoError(t, err) + + // Plan channel stays open until after we've sent freed-gas signals. + planCh := make(chan *types.Transaction, 1) + gasFreedCh := make(chan uint64, 16) + streamCh := make(chan *types.Transaction, len(allTxs)*2) + + genParams := &generateParams{ + prefetchedTxHashes: prefetchedHashes, + builderStarted: new(atomic.Bool), + builderPlanCh: planCh, + builderGasFreedCh: gasFreedCh, + } + genParams.builderStarted.Store(true) + + var interrupt atomic.Bool + done := make(chan struct{}) + go func() { + defer close(done) + w.runBuilderTxProvider(streamCh, header, genParams, &interrupt) + }() + + // Send 5 × 21000 freed gas, close the gas freed channel. + const freedPerSignal = uint64(21000) + const freedSignals = 5 + for i := 0; i < freedSignals; i++ { + gasFreedCh <- freedPerSignal + } + close(gasFreedCh) + + // Allow the 2ms timer to fire and the overflow scan to run. Then close planCh. + time.Sleep(20 * time.Millisecond) + close(planCh) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("runBuilderTxProvider did not exit within timeout") + } + + close(streamCh) + seen := make(map[common.Hash]struct{}) + for tx := range streamCh { + seen[tx.Hash()] = struct{}{} + } + + forwarded := len(seen) + t.Logf("streamCh received %d bonus txs (freed budget=%d gas, expected up to %d)", + forwarded, freedSignals*freedPerSignal, freedSignals) + + // At least one bonus tx should reach the stream — the overflow scan found + // non-prefetched txs that fit within the freed-gas budget. 
+ require.Greater(t, forwarded, 0, + "overflow scan should have forwarded bonus txs beyond the plan") + // None of the forwarded txs should be pre-prefetched. + for _, tx := range allTxs[:prePrefetchedCount] { + _, found := seen[tx.Hash()] + require.False(t, found, "already-prefetched tx %s must not be forwarded", tx.Hash()) + } +} + +// streamTestFixture produces a throwaway state, a header, and a real StatePrefetcher +// suitable for driving PrefetchStream directly with a set of transactions. Returns +// the pool txs so tests can feed them into the stream. +func streamTestFixture(t *testing.T) (*worker, *testWorkerBackend, *types.Header, *state.StateDB, []*types.Transaction) { + t.Helper() + w, b, engine, ctrl := setupBorWorkerWithPrefetch(t, 100, 2*time.Second) + t.Cleanup(func() { + ctrl.Finish() + engine.Close() + w.close() + }) + + addTransactionBatch(b, 20, false) + time.Sleep(200 * time.Millisecond) + + pending := b.txPool.Pending(txpool.PendingFilter{}, nil) + require.NotEmpty(t, pending, "pool should have pending txs") + + var allTxs []*types.Transaction + for _, lazyTxs := range pending { + for _, ltx := range lazyTxs { + if tx := ltx.Resolve(); tx != nil { + allTxs = append(allTxs, tx) + } + } + } + require.NotEmpty(t, allTxs) + + parent := w.chain.CurrentBlock() + _, throwaway, prefetchReader, processReader, err := w.chain.StateAtWithReaders(parent.Root) + require.NoError(t, err) + + w.mu.RLock() + header, _, err := w.makeHeader(&generateParams{ + timestamp: uint64(time.Now().Unix()), + coinbase: testBankAddress, + parentHash: parent.Hash(), + prefetchReader: prefetchReader, + processReader: processReader, + }, false) + w.mu.RUnlock() + require.NoError(t, err) + + return w, b, header, throwaway, allTxs +} + +// TestPrefetchStream_HardKillExits verifies that setting hardKill + closing txsCh +// exits the stream quickly and skips buffered-but-unprocessed txs. 
The production +// shutdown path pairs these two signals (runPrefetcher does the same), so this is +// the realistic scenario. +func TestPrefetchStream_HardKillExits(t *testing.T) { + t.Parallel() + w, _, header, throwaway, allTxs := streamTestFixture(t) + require.GreaterOrEqual(t, len(allTxs), 10, "need >=10 txs so some remain unprocessed") + + prefetcher := core.NewStatePrefetcher(w.chainConfig, w.chain.HeaderChain()) + + txsCh := make(chan *types.Transaction, len(allTxs)) + for _, tx := range allTxs { + txsCh <- tx + } + + hardKill := new(atomic.Bool) + done := make(chan *core.PrefetchResult, 1) + go func() { + done <- prefetcher.PrefetchStream(header, throwaway, w.vmConfig(), true, + hardKill, nil, txsCh, nil) + }() + + // Immediately set hardKill + close. Workers should see hardKill on the next + // iteration and exit before processing the full buffer. + hardKill.Store(true) + close(txsCh) + + select { + case result := <-done: + // Exit happened. Depending on timing, workers may have processed 0 or a few + // txs before the hardKill check caught up. The key guarantee is that we + // exited promptly and did NOT process all buffered txs blindly. + require.Less(t, len(result.SuccessfulTxs), len(allTxs), + "hardKill should have cut processing short (processed=%d, buffered=%d)", + len(result.SuccessfulTxs), len(allTxs)) + case <-time.After(3 * time.Second): + t.Fatal("PrefetchStream did not exit within 3s after hardKill+close") + } +} + +// TestPrefetchStream_EvmAbortSkipsAndResumes exercises the phase-handoff core contract: +// while evmAbort=true, workers skip txs without processing; after reset, they resume. 
+func TestPrefetchStream_EvmAbortSkipsAndResumes(t *testing.T) { + t.Parallel() + w, _, header, throwaway, allTxs := streamTestFixture(t) + require.GreaterOrEqual(t, len(allTxs), 8, "need at least 8 txs") + + prefetcher := core.NewStatePrefetcher(w.chainConfig, w.chain.HeaderChain()) + + txsCh := make(chan *types.Transaction, len(allTxs)) + hardKill := new(atomic.Bool) + evmAbort := new(atomic.Bool) + + // Start evmAbort=true so early txs are skipped. + evmAbort.Store(true) + + var processedMu sync.Mutex + var processed []common.Hash + onSuccess := func(h common.Hash, _ uint64) { + processedMu.Lock() + processed = append(processed, h) + processedMu.Unlock() + } + + streamDone := make(chan struct{}) + go func() { + defer close(streamDone) + prefetcher.PrefetchStream(header, throwaway, w.vmConfig(), true, + hardKill, evmAbort, txsCh, onSuccess) + }() + + // Send first 4 txs while evmAbort=true — they should all be skipped. + for i := 0; i < 4; i++ { + txsCh <- allTxs[i] + } + + // Let workers drain the skipped batch. + time.Sleep(30 * time.Millisecond) + + // Reset evmAbort; subsequent sends should be processed. + evmAbort.Store(false) + + // Send next batch — these should reach onSuccess. + for i := 4; i < len(allTxs); i++ { + txsCh <- allTxs[i] + } + + close(txsCh) + <-streamDone + + // Only txs sent AFTER evmAbort=false should appear in processed. + processedMu.Lock() + defer processedMu.Unlock() + + skippedHashes := make(map[common.Hash]struct{}) + for _, tx := range allTxs[:4] { + skippedHashes[tx.Hash()] = struct{}{} + } + for _, h := range processed { + _, wasSkipped := skippedHashes[h] + require.False(t, wasSkipped, "tx %s was sent during evmAbort=true and must not have been processed", h) + } + + t.Logf("evmAbort phase: 4 skipped; resume phase: %d/%d processed", len(processed), len(allTxs)-4) +} + +// TestPrefetchStream_BlockEquivalence confirms the refactored Prefetch(block, ...) 
+// wrapper produces the same PrefetchResult as feeding the same tx set through +// PrefetchStream directly. Guards against regressions on the blockchain.go path. +func TestPrefetchStream_BlockEquivalence(t *testing.T) { + t.Parallel() + w, _, header, throwaway, allTxs := streamTestFixture(t) + + prefetcher := core.NewStatePrefetcher(w.chainConfig, w.chain.HeaderChain()) + + // Path A: block-oriented Prefetch. + block := types.NewBlock(header, &types.Body{Transactions: allTxs}, nil, trie.NewStackTrie(nil)) + resultA := prefetcher.Prefetch(block, throwaway, w.vmConfig(), true, nil) + require.NotNil(t, resultA) + + // Path B: streaming PrefetchStream over the same txs on a fresh throwaway state. + _, throwawayB, _, _, err := w.chain.StateAtWithReaders(w.chain.CurrentBlock().Root) + require.NoError(t, err) + + ch := make(chan *types.Transaction, len(allTxs)) + for _, tx := range allTxs { + ch <- tx + } + close(ch) + resultB := prefetcher.PrefetchStream(header, throwawayB, w.vmConfig(), true, + nil, nil, ch, nil) + require.NotNil(t, resultB) + + // Successful tx hashes must match modulo ordering (parallel workers don't preserve order). + setA := make(map[common.Hash]struct{}, len(resultA.SuccessfulTxs)) + for _, h := range resultA.SuccessfulTxs { + setA[h] = struct{}{} + } + setB := make(map[common.Hash]struct{}, len(resultB.SuccessfulTxs)) + for _, h := range resultB.SuccessfulTxs { + setB[h] = struct{}{} + } + require.Equal(t, setA, setB, "SuccessfulTxs must be identical between Prefetch and PrefetchStream paths") + require.Equal(t, resultA.TotalGasUsed, resultB.TotalGasUsed, "TotalGasUsed must be identical") + + t.Logf("block-equivalence: %d txs, TotalGasUsed=%d (matched across both paths)", + len(resultA.SuccessfulTxs), resultA.TotalGasUsed) +}