Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,11 @@ var (
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)

blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil)
blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil)
blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil)
blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil)
blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil)
blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil)
blockPrefetchWorkerPanicMeter = metrics.NewRegisteredMeter("chain/prefetch/worker/panic", nil)

// Witness and write-path metrics for block production observability.
// These track the time spent in each phase of writeBlockWithState, which runs
Expand Down
314 changes: 228 additions & 86 deletions core/state_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)

Expand Down Expand Up @@ -57,93 +56,236 @@
// Prefetch processes the state changes according to the Ethereum rules by running
// the transaction messages using the statedb, but any changes are discarded. The
// only goal is to warm the state caches.
//
// This is a thin wrapper over PrefetchStream: it feeds the block's transactions
// into a channel, closes it, and runs the stream to completion. Behavior is
// identical to the pre-stream implementation.
func (p *StatePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, cfg vm.Config, intermediateRootPrefetch bool, interrupt *atomic.Bool) *PrefetchResult {
var (
fails atomic.Int64
totalGasUsed atomic.Uint64
successfulTxs []common.Hash
txsMutex sync.Mutex
header = block.Header()
signer = types.MakeSigner(p.config, header.Number, header.Time)
workers errgroup.Group
reader = statedb.Reader()
)
workers.SetLimit(max(1, 4*runtime.NumCPU()/5)) // Aggressively run the prefetching

// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
stateCpy := statedb.Copy() // closure
workers.Go(func() error {
// If block precaching was interrupted, abort
if interrupt != nil && interrupt.Load() {
return nil
}
// Preload the touched accounts and storage slots in advance
sender, err := types.Sender(signer, tx)
if err != nil {
fails.Add(1)
return nil
}
reader.Account(sender)

if tx.To() != nil {
account, _ := reader.Account(*tx.To())

// Preload the contract code if the destination has non-empty code
if account != nil && !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) {
reader.Code(*tx.To(), common.BytesToHash(account.CodeHash))
}
}
for _, list := range tx.AccessList() {
reader.Account(list.Address)
if len(list.StorageKeys) > 0 {
for _, slot := range list.StorageKeys {
reader.Storage(list.Address, slot)
}
txs := block.Transactions()
ch := make(chan *types.Transaction, len(txs))
for _, tx := range txs {
ch <- tx
}
close(ch)
return p.PrefetchStream(block.Header(), statedb, cfg, intermediateRootPrefetch, interrupt, nil, ch, nil)
}

// PrefetchStream warms state caches by executing transactions read from txsCh
// in parallel. It spins up a fixed worker pool once and keeps it alive for the
// whole call; workers exit when txsCh is closed or when hardKill is set.
//
// header — block header used for EVM context.
// statedb — parent state snapshot; each worker makes a per-tx Copy.
// cfg — VM config (no tracer recommended).
// intermediateRootPrefetch — if true, compute IntermediateRoot after each tx.
// hardKill — set by the caller to exit the stream permanently;
// workers return at loop entry.
// evmAbort — soft, repeatable interrupt. When set, aborts in-flight
// EVM work and causes workers to skip (not consume)
// subsequent txs until the caller resets it. Lets the
// caller implement phase transitions without tearing
// down the worker pool. May be nil.
// txsCh — transaction source; stream exits when this closes.
// onSuccess — called from worker goroutines on each successful tx.
// Must be safe for concurrent invocation. May be nil.
func (p *StatePrefetcher) PrefetchStream(

Check warning on line 91 in core/state_prefetcher.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 8 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ2sI33EFriE7xnUx-LV&open=AZ2sI33EFriE7xnUx-LV&pullRequest=2192
header *types.Header,
statedb *state.StateDB,
cfg vm.Config,
intermediateRootPrefetch bool,
hardKill *atomic.Bool,
evmAbort *atomic.Bool,
txsCh <-chan *types.Transaction,
onSuccess func(hash common.Hash, gasUsed uint64),
) *PrefetchResult {
// Prefer evmAbort as the EVM interrupt (soft, per-phase); fall back to hardKill.
evmInterrupt := evmAbort
if evmInterrupt == nil {
evmInterrupt = hardKill
}

ctx := &streamCtx{
p: p,
header: header,
statedb: statedb,
reader: statedb.Reader(),
signer: types.MakeSigner(p.config, header.Number, header.Time),
cfg: cfg,
intermediateRootPrefetch: intermediateRootPrefetch,
hardKill: hardKill,
evmAbort: evmAbort,
evmInterrupt: evmInterrupt,
txsCh: txsCh,
onSuccess: onSuccess,
}

workers := max(1, 4*runtime.NumCPU()/5)
var pool sync.WaitGroup
pool.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer pool.Done()
// Isolate worker panics: prefetching is best-effort and operates on a
// throwaway state copy, so a panic here must never take down the node.
// The parent runPrefetcher goroutine has its own recover but Go's
// recover only catches panics in its own goroutine, not children.
defer func() {
if r := recover(); r != nil {
// processTx already incremented txIndex before dispatching
// to prefetchOneTx, so a panicked tx must count as a
// failure to keep valid+invalid meters consistent.
ctx.fails.Add(1)
blockPrefetchWorkerPanicMeter.Mark(1)
log.Error("prefetch worker panicked", "err", r)
}
}
// Execute the message to preload the implicit touched states
evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), stateCpy, p.config, cfg)

// Convert the transaction into an executable message and pre-cache its sender
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
fails.Add(1)
return nil // Also invalid block, bail out
}
// Disable the nonce check
msg.SkipNonceChecks = true

stateCpy.SetTxContext(tx.Hash(), i)

// We attempt to apply a transaction. The goal is not to execute
// the transaction successfully, rather to warm up touched data slots.
evm.SetInterrupt(interrupt)
result, err := ApplyMessage(evm, msg, new(GasPool).AddGas(block.GasLimit()))
if err != nil {
fails.Add(1)
return nil // Ugh, something went horribly wrong, bail out
}

if intermediateRootPrefetch {
stateCpy.IntermediateRoot(true)
}

// Track gas used and successful transaction
totalGasUsed.Add(result.UsedGas)
txsMutex.Lock()
successfulTxs = append(successfulTxs, tx.Hash())
txsMutex.Unlock()
return nil
})
}
workers.Wait()

blockPrefetchTxsValidMeter.Mark(int64(len(block.Transactions())) - fails.Load())
blockPrefetchTxsInvalidMeter.Mark(fails.Load())
}()
ctx.runWorker()
}()
}
Comment thread
claude[bot] marked this conversation as resolved.
pool.Wait()

processed := ctx.txIndex.Load()
blockPrefetchTxsValidMeter.Mark(processed - ctx.fails.Load())
blockPrefetchTxsInvalidMeter.Mark(ctx.fails.Load())

return &PrefetchResult{
TotalGasUsed: totalGasUsed.Load(),
SuccessfulTxs: successfulTxs,
TotalGasUsed: ctx.totalGasUsed.Load(),
SuccessfulTxs: ctx.successfulTxs,
}
}

// streamCtx bundles the per-call state shared by all workers in one PrefetchStream
// invocation. Workers call runWorker to pull from txsCh and warm state caches.
type streamCtx struct {
p *StatePrefetcher
header *types.Header
statedb *state.StateDB
reader state.Reader
signer types.Signer
cfg vm.Config
intermediateRootPrefetch bool
hardKill *atomic.Bool
evmAbort *atomic.Bool
evmInterrupt *atomic.Bool
txsCh <-chan *types.Transaction
onSuccess func(common.Hash, uint64)

fails atomic.Int64
totalGasUsed atomic.Uint64
txIndex atomic.Int64
txsMutex sync.Mutex
successfulTxs []common.Hash
}

// runWorker pulls transactions from the stream and processes each one until the
// channel closes or hardKill fires. While evmAbort is set, buffered transactions
// are consumed but skipped (the pool stays alive for subsequent phases).
func (s *streamCtx) runWorker() {
for tx := range s.txsCh {
if s.hardKill != nil && s.hardKill.Load() {
return
}
if s.evmAbort != nil && s.evmAbort.Load() {
continue
}
s.processTx(tx)
}
}

func (s *streamCtx) processTx(tx *types.Transaction) {
idx := int(s.txIndex.Add(1) - 1)
gasUsed, ok := s.p.prefetchOneTx(
tx, idx, s.header, s.statedb, s.reader, s.signer, s.cfg,
Comment thread
claude[bot] marked this conversation as resolved.
s.intermediateRootPrefetch, s.evmInterrupt, &s.fails,
)
if !ok {
return
}
s.totalGasUsed.Add(gasUsed)
s.txsMutex.Lock()
s.successfulTxs = append(s.successfulTxs, tx.Hash())
s.txsMutex.Unlock()
if s.onSuccess != nil {
s.onSuccess(tx.Hash(), gasUsed)
}
}

// prefetchOneTx executes a single transaction on a copy of statedb to warm caches.
// Shared worker body used by both Prefetch (block-oriented) and PrefetchStream
// (streaming). Returns the execution gas used and a success flag. On any EVM
// interrupt or unrecoverable error it returns (0, false) and increments fails.
func (p *StatePrefetcher) prefetchOneTx(

Check warning on line 217 in core/state_prefetcher.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This function has 10 parameters, which is greater than the 7 authorized.

See more on https://sonarcloud.io/project/issues?id=0xPolygon_bor&issues=AZ2sI33EFriE7xnUx-LW&open=AZ2sI33EFriE7xnUx-LW&pullRequest=2192
tx *types.Transaction,
txIdx int,
header *types.Header,
statedb *state.StateDB,
reader state.Reader,
signer types.Signer,
cfg vm.Config,
intermediateRootPrefetch bool,
interrupt *atomic.Bool,
fails *atomic.Int64,
) (uint64, bool) {
if interrupt != nil && interrupt.Load() {
// Match every other failure path in this function — the docstring
// promises fails is incremented on every (0,false) return so the
// {valid,invalid} meters add up to the txIndex without double-counting.
fails.Add(1)
return 0, false
}

sender, err := preloadReaderForTx(reader, tx, signer)
if err != nil {
fails.Add(1)
return 0, false
}
_ = sender // sender is pre-cached via reader.Account

stateCpy := statedb.Copy()
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
fails.Add(1)
return 0, false
}
msg.SkipNonceChecks = true // stream order may diverge from nonce order
stateCpy.SetTxContext(tx.Hash(), txIdx)

evm := vm.NewEVM(NewEVMBlockContext(header, p.chain, nil), stateCpy, p.config, cfg)
evm.SetInterrupt(interrupt)

result, err := ApplyMessage(evm, msg, new(GasPool).AddGas(header.GasLimit))
if err != nil {
fails.Add(1)
return 0, false
}
if intermediateRootPrefetch {
stateCpy.IntermediateRoot(true)
}
return result.UsedGas, true
}

// preloadReaderForTx issues non-blocking reads against the state reader for the
// accounts, code, and access-list slots the tx is likely to touch. This warms
// the caches before EVM execution. Returns the recovered sender so callers can
// avoid a second signature-recovery pass.
func preloadReaderForTx(reader state.Reader, tx *types.Transaction, signer types.Signer) (common.Address, error) {
sender, err := types.Sender(signer, tx)
if err != nil {
return common.Address{}, err
}
reader.Account(sender)

if to := tx.To(); to != nil {
if account, _ := reader.Account(*to); account != nil &&
!bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) {
reader.Code(*to, common.BytesToHash(account.CodeHash))
}
}
for _, list := range tx.AccessList() {
reader.Account(list.Address)
for _, slot := range list.StorageKeys {
reader.Storage(list.Address, slot)
}
}
return sender, nil
}
21 changes: 21 additions & 0 deletions miner/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,24 @@ func (t *transactionsByPriceAndNonce) Empty() bool {
func (t *transactionsByPriceAndNonce) Clear() {
t.heads, t.txs = nil, nil
}

// clone returns a shallow copy of the heap suitable for non-destructive scanning.
// LazyTransaction pointers are shared with the original; only the per-account queue
// slices and the heads slice are newly allocated. The heap invariant is preserved in
// the copy because we duplicate the slice in its current heap-ordered state.
func (t *transactionsByPriceAndNonce) clone() *transactionsByPriceAndNonce {
clonedTxs := make(map[common.Address][]*txpool.LazyTransaction, len(t.txs))
for addr, queue := range t.txs {
c := make([]*txpool.LazyTransaction, len(queue))
copy(c, queue)
clonedTxs[addr] = c
}
clonedHeads := make(txByPriceAndTime, len(t.heads))
copy(clonedHeads, t.heads)
return &transactionsByPriceAndNonce{
txs: clonedTxs,
heads: clonedHeads,
signer: t.signer,
baseFee: t.baseFee,
}
}
Loading
Loading