Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,11 @@ var (
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)

blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil)
blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil)
blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil)
blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil)
blockPrefetchWorkerPanicMeter = metrics.NewRegisteredMeter("chain/prefetch/worker/panic", nil)

// Witness and write-path metrics for block production observability.
// These track the time spent in each phase of writeBlockWithState, which runs
Expand Down
314 changes: 228 additions & 86 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)

Expand Down Expand Up @@ -57,93 +56,236 @@
// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to warm the state caches.
//
// This is a thin wrapper over PrefetchStream: it feeds the block's transactions
// into a channel, closes it, and runs the stream to completion. Behavior is
// identical to the pre-stream implementation.
func (p *StatePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, intermediateRootPrefetch bool, interrupt *atomic.Bool) *PrefetchResult {
var (
fails atomic.Int64
totalGasUsed atomic.Uint64
successfulTxs []common.Hash
txsMutex sync.Mutex
header = block.Header()
signer = types.MakeSigner(p.config, header.Number, header.Time)
workers errgroup.Group
reader = statedb.Reader()
)
workers.SetLimit(max(1, 4*runtime.NumCPU()/5)) // Aggressively run the prefetching

// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
stateCpy := statedb.Copy() // closure
workers.Go(func() error {
// If block precaching was interrupted, abort
if interrupt != nil && interrupt.Load() {
return nil
}
// Preload the touched accounts and storage slots in advance
sender, err := types.Sender(signer, tx)
if err != nil {
fails.Add(1)
return nil
}
reader.Account(sender)

if tx.To() != nil {
account, _ := reader.Account(*tx.To())

// Preload the contract code if the destination has non-empty code
if account != nil && !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) {
reader.Code(*tx.To(), common.BytesToHash(account.CodeHash))
}
}
for _, list := range tx.AccessList() {
reader.Account(list.Address)
if len(list.StorageKeys) > 0 {
for _, slot := range list.StorageKeys {
reader.Storage(list.Address, slot)
}
txs := block.Transactions()
ch := make(chan *types.Transaction, len(txs))
for _, tx := range txs {
ch <- tx
}
close(ch)
return p.PrefetchStream(block.Header(), statedb, cfg, intermediateRootPrefetch, interrupt, nil, ch, nil)
}

// PrefetchStream warms state caches by executing transactions read from txsCh
// in parallel. It spins up a fixed worker pool once and keeps it alive for the
// whole call; workers exit when txsCh is closed or when hardKill is set.
//
// header — block header used for EVM context.
// statedb — parent state snapshot; each worker makes a per-tx Copy.
// cfg — VM config (no tracer recommended).
// intermediateRootPrefetch — if true, compute IntermediateRoot after each tx.
// hardKill — set by the caller to exit the stream permanently;
// workers return at loop entry.
// evmAbort — soft, repeatable interrupt. When set, aborts in-flight
// EVM work and causes workers to skip (not consume)
// subsequent txs until the caller resets it. Lets the
// caller implement phase transitions without tearing
// down the worker pool. May be nil.
// txsCh — transaction source; stream exits when this closes.
// onSuccess — called from worker goroutines on each successful tx.
// Must be safe for concurrent invocation. May be nil.
func (p *StatePrefetcher) PrefetchStream(

Check warning on line 91 in core/state_prefetcher.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 8 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ2sI33EFriE7xnUx-LV&open=AZ2sI33EFriE7xnUx-LV&pullRequest=2192
header *types.Header,
statedb *state.StateDB,
cfg vm.Config,
intermediateRootPrefetch bool,
hardKill *atomic.Bool,
evmAbort *atomic.Bool,
txsCh <-chan *types.Transaction,
onSuccess func(hash common.Hash, gasUsed uint64),
) *PrefetchResult {
// Prefer evmAbort as the EVM interrupt (soft, per-phase); fall back to hardKill.
evmInterrupt := evmAbort
if evmInterrupt == nil {
evmInterrupt = hardKill
}

ctx := &streamCtx{
p: p,
header: header,
statedb: statedb,
reader: statedb.Reader(),
signer: types.MakeSigner(p.config, header.Number, header.Time),
cfg: cfg,
intermediateRootPrefetch: intermediateRootPrefetch,
hardKill: hardKill,
evmAbort: evmAbort,
evmInterrupt: evmInterrupt,
txsCh: txsCh,
onSuccess: onSuccess,
}

workers := max(1, 4*runtime.NumCPU()/5)
var pool sync.WaitGroup
pool.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer pool.Done()
// Isolate worker panics: prefetching is best-effort and operates on a
// throwaway state copy, so a panic here must never take down the node.
// The parent runPrefetcher goroutine has its own recover but Go's
// recover only catches panics in its own goroutine, not children.
defer func() {
if r := recover(); r != nil {
// processTx already incremented txIndex before dispatching
// to prefetchOneTx, so a panicked tx must count as a
// failure to keep valid+invalid meters consistent.
ctx.fails.Add(1)
blockPrefetchWorkerPanicMeter.Mark(1)
log.Error("prefetch worker panicked", "err", r)
}
}
// Execute the message to preload the implicit touched states
evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), stateCpy, p.config, cfg)

// Convert the transaction into an executable message and pre-cache its sender
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
fails.Add(1)
return nil // Also invalid block, bail out
}
// Disable the nonce check
msg.SkipNonceChecks = true

stateCpy.SetTxContext(tx.Hash(), i)

// We attempt to apply a transaction. The goal is not to execute
// the transaction successfully, rather to warm up touched data slots.
evm.SetInterrupt(interrupt)
result, err := ApplyMessage(evm, msg, new(GasPool).AddGas(block.GasLimit()))
if err != nil {
fails.Add(1)
return nil // Ugh, something went horribly wrong, bail out
}

if intermediateRootPrefetch {
stateCpy.IntermediateRoot(true)
}

// Track gas used and successful transaction
totalGasUsed.Add(result.UsedGas)
txsMutex.Lock()
successfulTxs = append(successfulTxs, tx.Hash())
txsMutex.Unlock()
return nil
})
}
workers.Wait()

blockPrefetchTxsValidMeter.Mark(int64(len(block.Transactions())) - fails.Load())
blockPrefetchTxsInvalidMeter.Mark(fails.Load())
}()
ctx.runWorker()
}()
}
Comment thread
claude[bot] marked this conversation as resolved.
pool.Wait()

processed := ctx.txIndex.Load()
blockPrefetchTxsValidMeter.Mark(processed - ctx.fails.Load())
blockPrefetchTxsInvalidMeter.Mark(ctx.fails.Load())

return &PrefetchResult{
TotalGasUsed: totalGasUsed.Load(),
SuccessfulTxs: successfulTxs,
TotalGasUsed: ctx.totalGasUsed.Load(),
SuccessfulTxs: ctx.successfulTxs,
}
}

// streamCtx bundles the per-call state shared by all workers in one PrefetchStream
// invocation. Workers call runWorker to pull from txsCh and warm state caches.
type streamCtx struct {
	// Immutable per-call configuration, captured once by PrefetchStream.
	p                        *StatePrefetcher
	header                   *types.Header
	statedb                  *state.StateDB // parent snapshot; each tx executes on a Copy
	reader                   state.Reader
	signer                   types.Signer
	cfg                      vm.Config
	intermediateRootPrefetch bool
	hardKill                 *atomic.Bool // permanent stop; workers exit when set (may be nil)
	evmAbort                 *atomic.Bool // soft per-phase abort; txs are consumed but skipped while set (may be nil)
	evmInterrupt             *atomic.Bool // wired into the EVM: evmAbort when non-nil, else hardKill
	txsCh                    <-chan *types.Transaction
	onSuccess                func(common.Hash, uint64) // per-tx success callback; must be concurrency-safe (may be nil)

	// Mutable bookkeeping, shared across all workers.
	fails        atomic.Int64  // txs that were interrupted, errored, or panicked
	totalGasUsed atomic.Uint64
	txIndex      atomic.Int64 // next tx index to assign; doubles as the dispatched-tx count
	txsMutex     sync.Mutex   // guards successfulTxs
	successfulTxs []common.Hash
}

// runWorker drains the transaction stream until it is closed. A set hardKill
// flag terminates the worker permanently (checked after each receive, so a
// worker blocked on an empty channel still needs the close to observe it).
// While evmAbort is set, buffered transactions are consumed but skipped,
// keeping the pool alive for subsequent phases.
func (s *streamCtx) runWorker() {
	for {
		tx, open := <-s.txsCh
		if !open {
			return
		}
		if s.hardKill != nil && s.hardKill.Load() {
			return
		}
		if s.evmAbort == nil || !s.evmAbort.Load() {
			s.processTx(tx)
		}
	}
}

func (s *streamCtx) processTx(tx *types.Transaction) {
idx := int(s.txIndex.Add(1) - 1)
gasUsed, ok := s.p.prefetchOneTx(
tx, idx, s.header, s.statedb, s.reader, s.signer, s.cfg,
Comment thread
claude[bot] marked this conversation as resolved.
s.intermediateRootPrefetch, s.evmInterrupt, &s.fails,
)
if !ok {
return
}
s.totalGasUsed.Add(gasUsed)
s.txsMutex.Lock()
s.successfulTxs = append(s.successfulTxs, tx.Hash())
s.txsMutex.Unlock()
if s.onSuccess != nil {
s.onSuccess(tx.Hash(), gasUsed)
}
}

// prefetchOneTx executes a single transaction on a copy of statedb to warm caches.
// Shared worker body used by both Prefetch (block-oriented) and PrefetchStream
// (streaming). Returns the execution gas used and a success flag. On any EVM
// interrupt or unrecoverable error it returns (0, false) and increments fails.
func (p *StatePrefetcher) prefetchOneTx(

Check warning on line 217 in core/state_prefetcher.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 10 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ2sI33EFriE7xnUx-LW&open=AZ2sI33EFriE7xnUx-LW&pullRequest=2192
tx *types.Transaction,
txIdx int,
header *types.Header,
statedb *state.StateDB,
reader state.Reader,
signer types.Signer,
cfg vm.Config,
intermediateRootPrefetch bool,
interrupt *atomic.Bool,
fails *atomic.Int64,
) (uint64, bool) {
if interrupt != nil && interrupt.Load() {
// Match every other failure path in this function — the docstring
// promises fails is incremented on every (0,false) return so the
// {valid,invalid} meters add up to the txIndex without double-counting.
fails.Add(1)
return 0, false
}

sender, err := preloadReaderForTx(reader, tx, signer)
if err != nil {
fails.Add(1)
return 0, false
}
_ = sender // sender is pre-cached via reader.Account

stateCpy := statedb.Copy()
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
fails.Add(1)
return 0, false
}
msg.SkipNonceChecks = true // stream order may diverge from nonce order
stateCpy.SetTxContext(tx.Hash(), txIdx)

evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), stateCpy, p.config, cfg)
evm.SetInterrupt(interrupt)

result, err := ApplyMessage(evm, msg, new(GasPool).AddGas(header.GasLimit))
if err != nil {
fails.Add(1)
return 0, false
}
if intermediateRootPrefetch {
stateCpy.IntermediateRoot(true)
}
return result.UsedGas, true
}

// preloadReaderForTx issues non-blocking reads against the state reader for the
// accounts, code, and access-list slots the tx is likely to touch, warming the
// caches ahead of EVM execution. The recovered sender is returned so callers
// can avoid a second signature-recovery pass.
func preloadReaderForTx(reader state.Reader, tx *types.Transaction, signer types.Signer) (common.Address, error) {
	from, err := types.Sender(signer, tx)
	if err != nil {
		return common.Address{}, err
	}
	reader.Account(from)

	// Warm the destination account and, for contract targets, its code.
	dest := tx.To()
	if dest != nil {
		account, _ := reader.Account(*dest)
		if account != nil && !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) {
			reader.Code(*dest, common.BytesToHash(account.CodeHash))
		}
	}

	// Warm every account and storage slot declared in the access list.
	for _, entry := range tx.AccessList() {
		reader.Account(entry.Address)
		for _, key := range entry.StorageKeys {
			reader.Storage(entry.Address, key)
		}
	}
	return from, nil
}
21 changes: 21 additions & 0 deletions miner/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,24 @@ func (t *transactionsByPriceAndNonce) Empty() bool {
func (t *transactionsByPriceAndNonce) Clear() {
t.heads, t.txs = nil, nil
}

// clone returns a shallow copy of the heap suitable for non-destructive scanning.
// LazyTransaction pointers are shared with the original; only the per-account queue
// slices and the heads slice are newly allocated. Because the heads slice is
// duplicated in its current heap-ordered state, the copy preserves the heap
// invariant.
func (t *transactionsByPriceAndNonce) clone() *transactionsByPriceAndNonce {
	txsCopy := make(map[common.Address][]*txpool.LazyTransaction, len(t.txs))
	for sender, pending := range t.txs {
		txsCopy[sender] = append([]*txpool.LazyTransaction(nil), pending...)
	}
	headsCopy := make(txByPriceAndTime, len(t.heads))
	copy(headsCopy, t.heads)

	return &transactionsByPriceAndNonce{
		txs:     txsCopy,
		heads:   headsCopy,
		signer:  t.signer,
		baseFee: t.baseFee,
	}
}
Loading
Loading