diff --git a/Cargo.lock b/Cargo.lock index f8927525c7e..3f0e62284ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4338,6 +4338,7 @@ dependencies = [ "fuel-core", "fuel-core-chain-config", "fuel-core-database", + "fuel-core-metrics", "fuel-core-services", "fuel-core-storage", "fuel-core-sync", @@ -4713,6 +4714,7 @@ dependencies = [ name = "fuel-core-metrics" version = "0.48.0" dependencies = [ + "fuel-core-types 0.48.0", "once_cell", "parking_lot", "pin-project-lite", @@ -4762,12 +4764,12 @@ name = "fuel-core-parallel-executor" version = "0.48.0" dependencies = [ "anyhow", - "derive_more 0.99.20", + "derive_more 2.1.1", "fuel-core-executor", + "fuel-core-metrics", "fuel-core-storage", "fuel-core-trace", "fuel-core-types 0.48.0", - "fuel-core-upgradable-executor", "futures", "fxhash", "parking_lot", diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 3ff67854974..7745ee0e3f3 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -13,6 +13,7 @@ default = [ "fuel-core/u32-tx-count", "fuel-core-chain-config/u32-tx-count", "fuel-core-types/u32-tx-pointer", + "parallel-executor" ] fault-proving = [ "fuel-core-types/fault-proving", @@ -41,6 +42,7 @@ ethnum = "1.3" fuel-core = { path = "../crates/fuel-core", default-features = false, features = ["rocksdb-production"] } fuel-core-chain-config = { workspace = true } fuel-core-database = { path = "./../crates/database" } +fuel-core-metrics = { workspace = true } fuel-core-services = { path = "./../crates/services" } fuel-core-storage = { path = "./../crates/storage" } fuel-core-sync = { path = "./../crates/services/sync", features = ["benchmarking"] } diff --git a/benches/src/bin/tps_bench.rs b/benches/src/bin/tps_bench.rs index d0ce9c22eab..4d0864b7fc9 100644 --- a/benches/src/bin/tps_bench.rs +++ b/benches/src/bin/tps_bench.rs @@ -1,11 +1,15 @@ // Define arguments -use fuel_core::service::config::Trigger; +use fuel_core::service::config::{ + ExecutorMode, + Trigger, +}; use fuel_core_chain_config::{ ChainConfig, CoinConfig, SnapshotMetadata, }; +use fuel_core_metrics::encode_metrics; use fuel_core_storage::transactional::AtomicView; use fuel_core_types::{ blockchain::transaction::TransactionExt, @@ -54,7 +58,7 @@ use test_helpers::builder::{ TestSetupBuilder, }; -const PATH_SNAPSHOT: &str = "/Users/green/fuel/fuel-core-2/benches/local-testnet"; +const PATH_SNAPSHOT: &str = "./local-testnet"; fn checked_parameters() -> CheckPredicateParams { let metadata = SnapshotMetadata::read(PATH_SNAPSHOT).unwrap(); @@ -70,6 +74,16 @@ struct Args { pub number_of_cores: usize, #[clap(short = 't', long, default_value = "150000")] pub number_of_transactions: u64, + #[clap(long, default_value = "0")] + pub txpool_verification_threads: usize, + #[clap(long, default_value = "0")] + pub txpool_verification_queue_size: usize, + #[clap(long, default_value = "0")] + pub txpool_p2p_sync_threads: usize, + #[clap(long, default_value = "0")] + pub txpool_p2p_sync_queue_size: usize, + #[clap(long, default_value = "0")] + pub executor_parallel_worker_count: usize, } fn generate_transactions(nb_txs: u64, rng: &mut StdRng) -> Vec { @@ -135,6 +149,9 @@ fn generate_transactions(nb_txs: u64, rng: &mut StdRng) -> Vec { } fn main() { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::WARN) + .try_init(); let args = Args::parse(); let mut rng = StdRng::seed_from_u64(2322u64); @@ -200,9 +217,29 @@ fn main() { Some(tx.max_gas(&chain_conf.consensus_parameters).unwrap()) }) .sum(); - test_builder.gas_limit = Some(gas_limit * 4); + test_builder.gas_limit = Some(gas_limit); test_builder.block_size_limit = Some(u64::MAX); test_builder.number_threads_pool_verif = args.number_of_cores; + if args.txpool_verification_threads != 0 { + test_builder.txpool_verification_threads = args.txpool_verification_threads; + } + if args.txpool_verification_queue_size != 0 { + test_builder.txpool_verification_queue_size = args.txpool_verification_queue_size; + } + if args.txpool_p2p_sync_threads != 0 { + test_builder.txpool_p2p_sync_threads = args.txpool_p2p_sync_threads; + } + if args.txpool_p2p_sync_queue_size != 0 { + test_builder.txpool_p2p_sync_queue_size = args.txpool_p2p_sync_queue_size; + } + if args.executor_parallel_worker_count != 0 { + test_builder.executor_parallel_worker_count = args.executor_parallel_worker_count; + } + #[cfg(feature = "parallel-executor")] + { + test_builder.executor_mode = ExecutorMode::Parallel; + test_builder.executor_metrics = true; + } test_builder.max_txs = transactions.len(); // spin up node let rt = tokio::runtime::Builder::new_multi_thread() @@ -241,10 +278,19 @@ fn main() { .unwrap() .unwrap(); assert_eq!(block.entity.transactions().len(), transactions.len() + 1); + println!("transaction count: {}", block.entity.transactions().len()); block } }); + if let Ok(metrics) = encode_metrics() { + for line in metrics.lines().filter(|line| { + line.starts_with("parallel_executor_") && !line.starts_with('#') + }) { + println!("metrics: {line}"); + } + } + // rt.block_on(async move { // test_builder.set_chain_config(chain_conf.clone()); // let TestContext { srv, .. } = test_builder.finalize().await; diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index 0d5cbbafa28..049faa569af 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -32,6 +32,7 @@ use fuel_core::{ RelayerConsensusConfig, config::{ DaCompressionMode, + ExecutorConfig, Trigger, }, genesis::NotifyCancel, @@ -65,10 +66,7 @@ use fuel_core_metrics::config::{ DisableConfig, Module, }; -use fuel_core_types::{ - blockchain::header::StateTransitionBytecodeVersion, - signer::SignMode, -}; +use fuel_core_types::signer::SignMode; use pyroscope::{ PyroscopeAgent, pyroscope::PyroscopeAgentRunning, @@ -104,9 +102,6 @@ use fuel_core::state::historical_rocksdb::StateRewindPolicy; use crate::cli::run::gas_price::GasPriceArgs; use fuel_core::service::config::GasPriceConfig; -#[cfg(feature = "parallel-executor")] -use std::num::NonZeroUsize; - #[cfg(feature = "p2p")] mod p2p; @@ -117,6 +112,7 @@ mod rpc; mod shared_sequencer; mod consensus; +mod executor; mod gas_price; mod graphql; #[cfg(feature = "p2p")] @@ -127,6 +123,10 @@ mod relayer; mod tx_pool; mod tx_status_manager; +use crate::cli::run::executor::ExecutorArgs; +#[cfg(feature = "parallel-executor")] +use fuel_core::service::config::ParallelExecutorConfig; + /// Run the Fuel client node locally. #[derive(Debug, Clone, Parser)] pub struct Command { @@ -229,13 +229,8 @@ pub struct Command { pub utxo_validation: bool, /// Overrides the version of the native executor. - #[arg(long = "native-executor-version", env)] - pub native_executor_version: Option, - - /// Number of cores to use for the parallel executor. - #[cfg(feature = "parallel-executor")] - #[arg(long = "executor-number-of-cores", env, default_value = "1")] - pub executor_number_of_cores: NonZeroUsize, + #[clap(flatten)] + pub executor: ExecutorArgs, /// All the configurations for the gas price service. #[clap(flatten)] @@ -363,9 +358,7 @@ impl Command { allow_syscall, expensive_subscriptions, utxo_validation, - native_executor_version, - #[cfg(feature = "parallel-executor")] - executor_number_of_cores, + executor, gas_price, consensus_key, #[cfg(feature = "aws-kms")] @@ -418,6 +411,17 @@ impl Command { anyhow::bail!("`--allow-syscall` is only allowed in debug mode"); } + let ExecutorArgs { + executor_mode, + native_executor_version, + #[cfg(feature = "parallel-executor")] + executor_number_of_cores, + #[cfg(feature = "parallel-executor")] + executor_metrics, + #[cfg(feature = "parallel-executor")] + executor_worker_count_policy, + } = executor; + let enabled_metrics = metrics.list_of_enabled(); if !enabled_metrics.is_empty() { @@ -740,12 +744,19 @@ impl Command { debug, historical_execution, expensive_subscriptions, - native_executor_version, + executor: ExecutorConfig { + mode: executor_mode, + native_executor_version, + #[cfg(feature = "parallel-executor")] + parallel: ParallelExecutorConfig { + worker_count: executor_number_of_cores, + worker_count_policy: executor_worker_count_policy, + metrics: executor_metrics, + }, + }, continue_on_error, allow_syscall, utxo_validation, - #[cfg(feature = "parallel-executor")] - executor_number_of_cores, block_production: trigger, leader_lock, predefined_blocks_path, @@ -762,6 +773,7 @@ impl Command { pending_pool_tx_ttl: tx_pending_pool_ttl.into(), max_pending_pool_size_percentage: tx_pending_pool_size_percentage, metrics: metrics.is_enabled(Module::TxPool), + eagerly_include_tx_dependency_graphs: false, }, block_producer: ProducerConfig { coinbase_recipient, diff --git a/bin/fuel-core/src/cli/run/executor.rs b/bin/fuel-core/src/cli/run/executor.rs new file mode 100644 index 00000000000..08bc36cd811 --- /dev/null +++ b/bin/fuel-core/src/cli/run/executor.rs @@ -0,0 +1,44 @@ +use clap::Args; +use fuel_core::service::config::ExecutorMode; +#[cfg(feature = "parallel-executor")] +use fuel_core::service::config::ParallelExecutorWorkerCountPolicy; +use fuel_core_types::blockchain::header::StateTransitionBytecodeVersion; + +#[cfg(feature = "parallel-executor")] +use std::num::NonZeroUsize; + +#[derive(Debug, Clone, Args)] +pub struct ExecutorArgs { + /// Overrides the version of the native executor. + #[arg(long = "native-executor-version", env)] + pub native_executor_version: Option, + + /// Executor mode to use. + #[arg(long = "executor-mode", value_enum, default_value = "normal", env)] + pub executor_mode: ExecutorMode, + + /// Number of cores to use for the parallel executor. + #[cfg(feature = "parallel-executor")] + #[arg( + long = "executor-number-of-cores", + env, + default_value = "1", + alias = "executor-worker-count" + )] + pub executor_number_of_cores: NonZeroUsize, + + /// Enable metrics for the parallel executor. + #[cfg(feature = "parallel-executor")] + #[arg(long = "executor-metrics", env)] + pub executor_metrics: bool, + + /// Worker count policy for tx selection in parallel scheduler. + #[cfg(feature = "parallel-executor")] + #[arg( + long = "executor-worker-count-policy", + value_enum, + default_value = "static-max", + env + )] + pub executor_worker_count_policy: ParallelExecutorWorkerCountPolicy, +} diff --git a/crates/fuel-core/Cargo.toml b/crates/fuel-core/Cargo.toml index 4756061d78e..06f9e34ee83 100644 --- a/crates/fuel-core/Cargo.toml +++ b/crates/fuel-core/Cargo.toml @@ -133,7 +133,7 @@ uuid = { version = "1.1", features = ["v4"] } [dev-dependencies] assert_matches = "1.5" fuel-core = { path = ".", features = ["test-helpers"] } -fuel-core-executor = { workspace = true, features = ["std", "test-helpers", "limited-tx-count"] } +fuel-core-executor = { workspace = true, features = ["std", "test-helpers", "u32-tx-count"] } fuel-core-services = { path = "./../services", features = ["test-helpers"] } fuel-core-storage = { path = "./../storage", features = ["test-helpers"] } fuel-core-trace = { path = "./../trace" } diff --git a/crates/fuel-core/src/graphql_api/worker_service.rs b/crates/fuel-core/src/graphql_api/worker_service.rs index e21eff98f90..338c8e05729 100644 --- a/crates/fuel-core/src/graphql_api/worker_service.rs +++ b/crates/fuel-core/src/graphql_api/worker_service.rs @@ -162,6 +162,7 @@ where D: ports::worker::OffChainDatabase, { fn process_block(&mut self, result: SharedImportResult) -> anyhow::Result<()> { + let instant = tokio::time::Instant::now(); let block = &result.sealed_block.entity; let mut transaction = self.database.transaction(); // save the status for every transaction using the finalized block id @@ -170,12 +171,21 @@ where self.asset_metadata_indexation_enabled, &mut transaction, )?; + tracing::warn!( + "persist_transaction_status elapsed time: {:?}", + instant.elapsed() + ); // save the associated owner for each transaction in the block index_tx_owners_for_block(block, &mut transaction, &self.chain_id)?; + tracing::warn!( + "index_tx_owners_for_block elapsed time: {:?}", + instant.elapsed() + ); // save the transaction related information process_transactions(block.transactions().iter(), &mut transaction)?; + tracing::warn!("process_transactions elapsed time: {:?}", instant.elapsed()); let height = block.header().height(); let block_id = block.id(); @@ -186,6 +196,7 @@ where let total_tx_count = transaction .increase_tx_count(block.transactions().len() as u64) .unwrap_or_default(); + tracing::warn!("increase_tx_count elapsed time: {:?}", instant.elapsed()); process_executor_events( result.events.iter().map(Cow::Borrowed), @@ -194,8 +205,13 @@ where self.coins_to_spend_indexation_enabled, &self.base_asset_id, )?; + tracing::warn!( + "process_executor_events elapsed time: {:?}", + instant.elapsed() + ); transaction.commit()?; + tracing::warn!("transaction.commit elapsed time: {:?}", instant.elapsed()); for status in result.tx_status.iter() { let tx_id = status.id; @@ -205,6 +221,11 @@ where .send_complete(tx_id, height, status.into()); } + tracing::warn!( + "process_block elapsed time: {:?} for block: {:?}", + instant.elapsed(), + height + ); // Notify subscribers and update last seen block height self.shared_state .block_height_subscription_handler diff --git a/crates/fuel-core/src/schema/block.rs b/crates/fuel-core/src/schema/block.rs index ef113f61216..2a66a3e3076 100644 --- a/crates/fuel-core/src/schema/block.rs +++ b/crates/fuel-core/src/schema/block.rs @@ -40,6 +40,7 @@ use async_graphql::{ EmptyFields, }, }; +use fuel_core_metrics::parallel_executor_metrics::record_block_production_time; use fuel_core_storage::{ Result as StorageResult, iter::IterDirection, @@ -400,6 +401,7 @@ impl BlockMutation { start_timestamp: Option, blocks_to_produce: U32, ) -> async_graphql::Result { + let instant = tokio::time::Instant::now(); let config = ctx.data_unchecked::().clone(); if !config.debug { @@ -417,11 +419,21 @@ impl BlockMutation { let on_chain_height = ctx.read_view()?.latest_block_height()?; let shared_state = ctx.data_unchecked::(); + tracing::warn!( + "elapsed before wait_for_block_height: {:?}", + instant.elapsed() + ); shared_state .block_height_subscription_handler .subscribe() .wait_for_block_height(on_chain_height) .await?; + tracing::warn!( + "Produced {} blocks in {:?}", + blocks_to_produce, + instant.elapsed() + ); + record_block_production_time(instant.elapsed()); Ok(on_chain_height.into()) } } diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index b1e3df8ff1c..645a9f6ccfb 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -22,6 +22,7 @@ use adapters::{ pub use config::{ Config, DbType, + ExecutorMode, RelayerConsensusConfig, }; use fuel_core_chain_config::{ diff --git a/crates/fuel-core/src/service/adapters.rs b/crates/fuel-core/src/service/adapters.rs index ded24e76d42..3758020238e 100644 --- a/crates/fuel-core/src/service/adapters.rs +++ b/crates/fuel-core/src/service/adapters.rs @@ -413,13 +413,21 @@ impl ExecutorAdapter { } } +#[cfg(feature = "parallel-executor")] +#[derive(Clone)] +pub(crate) enum ParallelExecutorAdapterInner { + Parallel { + executor: Arc< + Mutex, PreconfirmationSender>>, + >, + }, + Native(ExecutorAdapter), +} + #[cfg(feature = "parallel-executor")] #[derive(Clone)] pub struct ParallelExecutorAdapter { - pub executor: - Arc, PreconfirmationSender>>>, - pub new_txs_watcher: watch::Receiver<()>, - pub preconfirmation_sender: PreconfirmationSender, + pub(crate) inner: ParallelExecutorAdapterInner, } #[cfg(feature = "parallel-executor")] @@ -428,7 +436,6 @@ impl ParallelExecutorAdapter { database: Database, relayer_database: Database, config: fuel_core_parallel_executor::config::Config, - new_txs_watcher: watch::Receiver<()>, preconfirmation_sender: PreconfirmationSender, ) -> anyhow::Result { let executor = ParallelExecutor::new( @@ -438,11 +445,17 @@ impl ParallelExecutorAdapter { config, )?; Ok(Self { - executor: Arc::new(Mutex::new(executor)), - new_txs_watcher, - preconfirmation_sender, + inner: ParallelExecutorAdapterInner::Parallel { + executor: Arc::new(Mutex::new(executor)), + }, }) } + + pub fn from_native(executor: ExecutorAdapter) -> Self { + Self { + inner: ParallelExecutorAdapterInner::Native(executor), + } + } } #[derive(Clone)] diff --git a/crates/fuel-core/src/service/adapters/block_importer.rs b/crates/fuel-core/src/service/adapters/block_importer.rs index b4b7a8597b5..b751872efbd 100644 --- a/crates/fuel-core/src/service/adapters/block_importer.rs +++ b/crates/fuel-core/src/service/adapters/block_importer.rs @@ -1,5 +1,8 @@ #[cfg(feature = "parallel-executor")] -use crate::service::adapters::ParallelExecutorAdapter; +use crate::service::adapters::{ + ParallelExecutorAdapter, + ParallelExecutorAdapterInner, +}; use crate::{ database::{ Database, @@ -95,10 +98,20 @@ impl BlockImporterAdapter { #[cfg(not(feature = "no-parallel-executor"))] executor: ParallelExecutorAdapter, #[cfg(feature = "no-parallel-executor")] executor: ExecutorAdapter, verifier: VerifierAdapter, + block_reconciliation_write_adapter: BlockReconciliationWriteAdapter, ) -> Self { - let importer = Importer::new(chain_id, config, database, executor, verifier); + let database_for_height = database.clone(); + let importer = Importer::new( + chain_id, + config, + database, + executor, + verifier, + block_reconciliation_write_adapter, + ); Self { block_importer: Arc::new(importer), + database: database_for_height, } } @@ -175,9 +188,16 @@ impl Validator for ExecutorAdapter { impl Validator for ParallelExecutorAdapter { fn validate( &self, - _block: &Block, + block: &Block, ) -> ExecutorResult> { - todo!("Implement me please") + match &self.inner { + ParallelExecutorAdapterInner::Parallel { .. } => { + todo!("Implement me please") + } + ParallelExecutorAdapterInner::Native(native) => { + native.executor.validate(block) + } + } } } @@ -203,9 +223,22 @@ impl WasmChecker for ExecutorAdapter { impl WasmChecker for ParallelExecutorAdapter { fn validate_uploaded_wasm( &self, - _wasm_root: &Bytes32, + wasm_root: &Bytes32, ) -> Result<(), WasmValidityError> { - unimplemented!("no validation yet") + match &self.inner { + ParallelExecutorAdapterInner::Parallel { .. } => { + unimplemented!("no validation yet") + } + ParallelExecutorAdapterInner::Native(native) => native + .executor + .validate_uploaded_wasm(wasm_root) + .map_err(|err| match err { + fuel_core_upgradable_executor::error::UpgradableError::InvalidWasm(_) => { + WasmValidityError::NotValid + } + _ => WasmValidityError::NotFound, + }), + } } } diff --git a/crates/fuel-core/src/service/adapters/executor.rs b/crates/fuel-core/src/service/adapters/executor.rs index 03c467046e4..21d18be7184 100644 --- a/crates/fuel-core/src/service/adapters/executor.rs +++ b/crates/fuel-core/src/service/adapters/executor.rs @@ -57,6 +57,7 @@ impl fuel_core_executor::ports::TransactionsSource for TransactionsSource { maximum_txs: transactions_limit, maximum_block_size: block_transaction_size_limit, excluded_contracts: HashSet::default(), + execution_worker_count: 1, }) .unwrap_or_default() .into_iter() @@ -76,9 +77,10 @@ impl fuel_core_parallel_executor::ports::TransactionsSource for TransactionsSour gas_limit: u64, tx_count_limit: u32, block_transaction_size_limit: u64, + selection_worker_count: usize, filter: Filter, ) -> anyhow::Result { - let (transactions, excluded_contract_ids) = self + let (transactions, excluded_contract_ids, anchor_contract_ids) = self .tx_pool .extract_transactions_for_block_async(Constraints { minimal_gas_price: self.minimum_gas_price, @@ -86,6 +88,7 @@ impl fuel_core_parallel_executor::ports::TransactionsSource for TransactionsSour maximum_txs: tx_count_limit, maximum_block_size: block_transaction_size_limit, excluded_contracts: filter.excluded_contract_ids, + execution_worker_count: selection_worker_count, }) .await .map_err(|e| anyhow::anyhow!("{e}"))?; @@ -98,6 +101,7 @@ impl fuel_core_parallel_executor::ports::TransactionsSource for TransactionsSour .collect(); Ok(TransactionSourceExecutableTransactions { transactions, + anchor_contract_ids, filtered: TransactionFiltered::Filtered, filter: Filter { excluded_contract_ids, diff --git a/crates/fuel-core/src/service/adapters/producer.rs b/crates/fuel-core/src/service/adapters/producer.rs index 5b62a595b0f..a27d8977d8a 100644 --- a/crates/fuel-core/src/service/adapters/producer.rs +++ b/crates/fuel-core/src/service/adapters/producer.rs @@ -1,5 +1,8 @@ #[cfg(feature = "parallel-executor")] -use crate::service::adapters::ParallelExecutorAdapter; +use crate::service::adapters::{ + ParallelExecutorAdapter, + ParallelExecutorAdapterInner, +}; use crate::{ database::OnChainIterableKeyValueView, service::{ @@ -134,12 +137,30 @@ impl fuel_core_producer::ports::BlockProducer component: Components, deadline: Instant, ) -> ExecutorResult> { - self.executor - .lock() - .await - .produce_without_commit_with_source(component, deadline) - .await - .map_err(|e| ExecutorError::Other(format!("{:?}", e))) + match &self.inner { + ParallelExecutorAdapterInner::Parallel { executor, .. } => executor + .lock() + .await + .produce_without_commit_with_source(component, deadline) + .await + .map_err(|e| ExecutorError::Other(format!("{:?}", e))), + ParallelExecutorAdapterInner::Native(native) => { + let new_tx_waiter = + NewTxWaiter::new(native.new_txs_watcher.clone(), deadline); + native + .executor + .produce_without_commit_with_source( + component, + new_tx_waiter, + native.preconfirmation_sender.clone(), + ) + .await + .map(|u| { + let (result, changes) = u.into(); + Uncommitted::new(result, StorageChanges::Changes(changes)) + }) + } + } } } @@ -164,10 +185,22 @@ impl fuel_core_producer::ports::BlockProducer> type Deadline = (); async fn produce_without_commit( &self, - _component: Components>, + component: Components>, _: (), ) -> ExecutorResult> { - unimplemented!("ParallelExecutorAdapter does not support produce_without_commit"); + match &self.inner { + ParallelExecutorAdapterInner::Parallel { .. } => { + unimplemented!( + "ParallelExecutorAdapter does not support produce_without_commit" + ); + } + ParallelExecutorAdapterInner::Native(native) => native + .produce_without_commit_from_vector(component) + .map(|u| { + let (result, changes) = u.into(); + Uncommitted::new(result, StorageChanges::Changes(changes)) + }), + } } } @@ -192,12 +225,23 @@ impl fuel_core_producer::ports::DryRunner for ExecutorAdapter { impl fuel_core_producer::ports::DryRunner for ParallelExecutorAdapter { fn dry_run( &self, - _block: Components>, - _forbid_fake_coins: Option, - _at_height: Option, - _record_storage_read_replay: bool, + block: Components>, + forbid_fake_coins: Option, + at_height: Option, + record_storage_read_replay: bool, ) -> ExecutorResult { - unimplemented!("ParallelExecutorAdapter does not support dry run"); + match &self.inner { + ParallelExecutorAdapterInner::Parallel { .. } => { + unimplemented!("ParallelExecutorAdapter does not support dry run"); + } + ParallelExecutorAdapterInner::Native(native) => native.executor.dry_run( + block, + forbid_fake_coins, + forbid_fake_coins, + at_height, + record_storage_read_replay, + ), + } } } @@ -214,9 +258,18 @@ impl fuel_core_producer::ports::StorageReadReplayRecorder for ExecutorAdapter { impl fuel_core_producer::ports::StorageReadReplayRecorder for ParallelExecutorAdapter { fn storage_read_replay( &self, - _block: &Block, + block: &Block, ) -> ExecutorResult> { - unimplemented!("ParallelExecutorAdapter does not support storage read replay"); + match &self.inner { + ParallelExecutorAdapterInner::Parallel { .. } => { + unimplemented!( + "ParallelExecutorAdapter does not support storage read replay" + ); + } + ParallelExecutorAdapterInner::Native(native) => { + native.executor.storage_read_replay(block) + } + } } } diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 9fe1da000e9..7a60a63b46b 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -92,9 +92,7 @@ pub struct Config { pub expensive_subscriptions: bool, pub utxo_validation: bool, pub allow_syscall: bool, - pub native_executor_version: Option, - #[cfg(feature = "parallel-executor")] - pub executor_number_of_cores: NonZeroUsize, + pub executor: ExecutorConfig, pub block_production: Trigger, pub leader_lock: Option, pub predefined_blocks_path: Option, @@ -252,9 +250,16 @@ impl Config { allow_syscall: true, expensive_subscriptions: true, utxo_validation, - native_executor_version: Some(native_executor_version), - #[cfg(feature = "parallel-executor")] - executor_number_of_cores: NonZeroUsize::new(1).expect("1 is not zero"), + executor: ExecutorConfig { + native_executor_version: Some(native_executor_version), + mode: ExecutorMode::Normal, + #[cfg(feature = "parallel-executor")] + parallel: ParallelExecutorConfig { + worker_count: NonZeroUsize::new(1).expect("1 is not zero"), + worker_count_policy: ParallelExecutorWorkerCountPolicy::StaticMax, + metrics: false, + }, + }, snapshot_reader, block_production: Trigger::Instant, leader_lock: None, @@ -372,6 +377,51 @@ impl From<&Config> for fuel_core_poa::pre_confirmation_signature_service::config Clone, Copy, Debug, Display, Eq, PartialEq, EnumString, VariantNames, ValueEnum, )] #[strum(serialize_all = "kebab_case")] +pub enum ExecutorMode { + Normal, + #[cfg(feature = "parallel-executor")] + Parallel, +} + +impl Default for ExecutorMode { + fn default() -> Self { + Self::Normal + } +} + +#[derive(Clone, Debug)] +pub struct ExecutorConfig { + pub mode: ExecutorMode, + pub native_executor_version: Option, + #[cfg(feature = "parallel-executor")] + pub parallel: ParallelExecutorConfig, +} + +#[cfg(feature = "parallel-executor")] +#[derive(Clone, Debug)] +pub struct ParallelExecutorConfig { + pub worker_count: NonZeroUsize, + pub worker_count_policy: ParallelExecutorWorkerCountPolicy, + pub metrics: bool, +} + +#[cfg(feature = "parallel-executor")] +#[derive(Clone, Copy, Debug, Display, Eq, PartialEq, EnumString, ValueEnum)] +#[strum(serialize_all = "kebab_case")] +pub enum ParallelExecutorWorkerCountPolicy { + StaticMax, + DynamicIdle, +} + +#[cfg(feature = "parallel-executor")] +impl Default for ParallelExecutorWorkerCountPolicy { + fn default() -> Self { + Self::StaticMax + } +} + +#[derive(Clone, Copy, Debug, Display, Eq, PartialEq, EnumString, ValueEnum)] +#[strum(serialize_all = "kebab_case")] pub enum DbType { InMemory, RocksDb, diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 6d5662deec3..03fe9c1fcb8 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -1,5 +1,10 @@ #![allow(clippy::let_unit_value)] +#[cfg(all(feature = "parallel-executor", not(feature = "no-parallel-executor")))] +use super::config::{ + ExecutorMode, + ParallelExecutorWorkerCountPolicy, +}; use super::{ DbType, adapters::{ @@ -67,6 +72,7 @@ use crate::{ }, }, }; + use fuel_core_compression_service::service::new_service as new_compression_service; use fuel_core_gas_price_service::v1::{ algorithm::AlgorithmV1, @@ -246,7 +252,7 @@ pub fn init_sub_services( forbid_unauthorized_inputs_default: config.utxo_validation, forbid_fake_utxo_default: config.utxo_validation, allow_syscall: config.allow_syscall, - native_executor_version: config.native_executor_version, + native_executor_version: config.executor.native_executor_version, allow_historical_execution: config.historical_execution, }; crate::service::adapters::ExecutorAdapter::new( @@ -267,7 +273,7 @@ pub fn init_sub_services( forbid_unauthorized_inputs_default: config.utxo_validation, forbid_fake_utxo_default: config.utxo_validation, allow_syscall: config.allow_syscall, - native_executor_version: config.native_executor_version, + native_executor_version: config.executor.native_executor_version, allow_historical_execution: config.historical_execution, }; crate::service::adapters::ExecutorAdapter::new( @@ -280,16 +286,55 @@ pub fn init_sub_services( } #[cfg(not(feature = "no-parallel-executor"))] { - let parallel_executor_config = fuel_core_parallel_executor::config::Config { - number_of_cores: config.executor_number_of_cores, - }; - crate::service::adapters::ParallelExecutorAdapter::new( - database.on_chain().clone(), - database.relayer().clone(), - parallel_executor_config, - new_txs_watcher, - preconfirmation_sender.clone(), - )? + match config.executor.mode { + ExecutorMode::Normal => { + let upgradable_executor_config = + fuel_core_upgradable_executor::config::Config { + forbid_unauthorized_inputs_default: config.utxo_validation, + forbid_fake_utxo_default: config.utxo_validation, + allow_syscall: config.allow_syscall, + native_executor_version: config + .executor + .native_executor_version, + allow_historical_execution: config.historical_execution, + }; + let executor = crate::service::adapters::ExecutorAdapter::new( + database.on_chain().clone(), + database.relayer().clone(), + upgradable_executor_config, + new_txs_watcher, + preconfirmation_sender.clone(), + ); + crate::service::adapters::ParallelExecutorAdapter::from_native( + executor, + ) + } + ExecutorMode::Parallel => { + let parallel_executor_config = + fuel_core_parallel_executor::config::Config { + worker_count: config.executor.parallel.worker_count, + worker_count_policy: match config + .executor + .parallel + .worker_count_policy + { + ParallelExecutorWorkerCountPolicy::StaticMax => { + fuel_core_parallel_executor::config::WorkerCountPolicy::StaticMax + } + ParallelExecutorWorkerCountPolicy::DynamicIdle => { + fuel_core_parallel_executor::config::WorkerCountPolicy::DynamicIdle + } + }, + metrics: config.executor.parallel.metrics, + }; + crate::service::adapters::ParallelExecutorAdapter::new( + database.on_chain().clone(), + database.relayer().clone(), + parallel_executor_config, + preconfirmation_sender.clone(), + )? + } + } } }; diff --git a/crates/metrics/Cargo.toml b/crates/metrics/Cargo.toml index 90d0f5b4ccc..f660fa139b2 100644 --- a/crates/metrics/Cargo.toml +++ b/crates/metrics/Cargo.toml @@ -20,6 +20,7 @@ regex = "1" strum = { workspace = true } strum_macros = { workspace = true } tracing = { workspace = true } +fuel-core-types = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } diff --git a/crates/metrics/src/buckets.rs b/crates/metrics/src/buckets.rs index c4fcb110556..5dbab8eb587 100644 --- a/crates/metrics/src/buckets.rs +++ b/crates/metrics/src/buckets.rs @@ -9,6 +9,9 @@ use strum_macros::EnumIter; #[cfg_attr(test, derive(EnumIter))] pub(crate) enum Buckets { Timing, + ParallelExecutorBatchTimeMs, + ParallelExecutorBatchTimeMicrosecondsPerTx, + ParallelExecutorBatchTimeNanosecondsPerKGas, TransactionSize, TransactionInsertionTimeInThreadPool, SelectTransactionsTime, @@ -38,6 +41,59 @@ fn initialize_buckets() -> HashMap> { 10.000, ], ), + ( + Buckets::ParallelExecutorBatchTimeMs, + vec![ + 1.0, + 10.0, + 25.0, + 50.0, + 100.0, + 150.0, + 200.0, + 250.0, + 300.0, + 400.0, + 500.0, + 750.0, + 1000.0, + ], + ), + ( + Buckets::ParallelExecutorBatchTimeMicrosecondsPerTx, + vec![ + 0.5, + 1.0, + 1.5, + 2.0, + 2.5, + 3.0, + 4.0, + 5.0, + 7.5, + 10.0, + 15.0, + 20.0, + ], + ), + ( + Buckets::ParallelExecutorBatchTimeNanosecondsPerKGas, + vec![ + 10.0, + 20.0, + 30.0, + 40.0, + 50.0, + 60.0, + 80.0, + 100.0, + 150.0, + 200.0, + 300.0, + 500.0, + 1000.0, + ], + ), ( // We consider blocks up to 256kb in size and single transaction can take any of this space. Buckets::TransactionSize, diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index 785952224ab..58a9f43c78b 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -28,10 +28,28 @@ pub mod gas_price_metrics; pub mod graphql_metrics; pub mod importer; pub mod p2p_metrics; +pub mod parallel_executor_metrics; pub mod poa_metrics; pub mod tx_status_manager_metrics; pub mod txpool_metrics; +pub use self::parallel_executor_metrics::{ + next_debug_batch_metrics_block_height, + record_batch_execute, + record_batch_prepare, + record_batch_total, + record_execution_time, + record_scheduler_run_time, + set_batch_anchor_contracts, + set_block_height, + set_max_workers_used, + set_non_empty_batch_allocated_gas, + set_non_empty_batch_transactions, + set_non_empty_batch_used_gas, + set_number_of_transactions, + set_total_gas_used, +}; + static GLOBAL_REGISTER: OnceLock = OnceLock::new(); pub fn global_registry() -> &'static GlobalRegistry { diff --git a/crates/metrics/src/parallel_executor_metrics.rs b/crates/metrics/src/parallel_executor_metrics.rs new file mode 100644 index 00000000000..3c2aef91054 --- /dev/null +++ b/crates/metrics/src/parallel_executor_metrics.rs @@ -0,0 +1,418 @@ +use crate::{ + buckets::{ + Buckets, + buckets, + }, + global_registry, +}; +use fuel_core_types::fuel_tx::ContractId; +use prometheus_client::metrics::{ + family::Family, + gauge::Gauge, + histogram::Histogram, +}; +use prometheus_client::encoding::EncodeLabelSet; +use std::{ + sync::{ + OnceLock, + atomic::AtomicU64, + }, + time::Duration, +}; + +// TODO: We don't need all of these maybe. And some should be histograms, but I'm just using it for +// benchmarks +pub struct ParallelExecutorMetrics { + pub execution_time_seconds: Gauge, + pub number_of_transactions: Gauge, + pub total_gas_used: Gauge, + pub block_height: Gauge, + pub max_workers_used: Gauge, + pub non_empty_batches: Gauge, + pub non_empty_batch_transactions: Family, + pub non_empty_batch_allocated_gas: Family, + pub non_empty_batch_used_gas: Family, + pub batch_anchor_contracts: Family, + pub block_production_time_seconds: Gauge, + pub scheduler_run_time_seconds: Gauge, + pub batch_prepare_ms: Histogram, + pub batch_prepare_us_per_tx: Histogram, + pub batch_prepare_ns_per_kgas: Histogram, + pub batch_execute_ms: Histogram, + pub batch_execute_us_per_tx: Histogram, + pub batch_execute_ns_per_kgas: Histogram, + pub batch_total_ms: Histogram, + pub batch_total_us_per_tx: Histogram, + pub batch_total_ns_per_kgas: Histogram, + debug_batch_metrics_block_height: AtomicU64, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct BatchMetricLabel { + pub block_height: u64, + pub batch_index: u64, +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub struct BatchAnchorLabel { + pub block_height: u64, + pub batch_index: u64, + pub contract_id: String, +} + +impl Default for ParallelExecutorMetrics { + fn default() -> Self { + let execution_time_seconds = Gauge::default(); + let number_of_transactions = Gauge::default(); + let total_gas_used = Gauge::default(); + let block_height = Gauge::default(); + let max_workers_used = Gauge::default(); + let non_empty_batches = Gauge::default(); + let non_empty_batch_transactions = Family::default(); + let non_empty_batch_allocated_gas = Family::default(); + let non_empty_batch_used_gas = Family::default(); + let batch_anchor_contracts = Family::default(); + let block_production_time_seconds = Gauge::default(); + let scheduler_run_time_seconds = Gauge::default(); + let batch_prepare_ms = + Histogram::new(buckets(Buckets::ParallelExecutorBatchTimeMs)); + let batch_prepare_us_per_tx = + Histogram::new(buckets(Buckets::ParallelExecutorBatchTimeMicrosecondsPerTx)); + let batch_prepare_ns_per_kgas = Histogram::new(buckets( + Buckets::ParallelExecutorBatchTimeNanosecondsPerKGas, + )); + let batch_execute_ms = + Histogram::new(buckets(Buckets::ParallelExecutorBatchTimeMs)); + let batch_execute_us_per_tx = + Histogram::new(buckets(Buckets::ParallelExecutorBatchTimeMicrosecondsPerTx)); + let batch_execute_ns_per_kgas = Histogram::new(buckets( + Buckets::ParallelExecutorBatchTimeNanosecondsPerKGas, + )); + let batch_total_ms = + Histogram::new(buckets(Buckets::ParallelExecutorBatchTimeMs)); + let batch_total_us_per_tx = + Histogram::new(buckets(Buckets::ParallelExecutorBatchTimeMicrosecondsPerTx)); + let batch_total_ns_per_kgas = Histogram::new(buckets( + Buckets::ParallelExecutorBatchTimeNanosecondsPerKGas, + )); + + let metrics = ParallelExecutorMetrics { + execution_time_seconds, + number_of_transactions, + total_gas_used, + block_height, + max_workers_used, + non_empty_batches, + non_empty_batch_transactions, + non_empty_batch_allocated_gas, + non_empty_batch_used_gas, + batch_anchor_contracts, + block_production_time_seconds, + scheduler_run_time_seconds, + batch_prepare_ms, + batch_prepare_us_per_tx, + batch_prepare_ns_per_kgas, + batch_execute_ms, + batch_execute_us_per_tx, + batch_execute_ns_per_kgas, + batch_total_ms, + batch_total_us_per_tx, + batch_total_ns_per_kgas, + debug_batch_metrics_block_height: AtomicU64::new(0), + }; + + let mut registry = global_registry().registry.lock(); + registry.register( + "parallel_executor_execution_time_seconds", + "Time spent executing transactions in the parallel executor in seconds", + metrics.execution_time_seconds.clone(), + ); + registry.register( + "parallel_executor_number_of_transactions", + "Number of transactions executed by the parallel executor", + metrics.number_of_transactions.clone(), + ); + registry.register( + "parallel_executor_total_gas_used", + "Total gas used by transactions executed by the parallel executor", + metrics.total_gas_used.clone(), + ); + registry.register( + "parallel_executor_block_height", + "Block height for the parallel executor metrics sample", + metrics.block_height.clone(), + ); + registry.register( + "parallel_executor_max_workers_used", + "Maximum number of workers used concurrently by the parallel executor per block", + metrics.max_workers_used.clone(), + ); + registry.register( + "parallel_executor_non_empty_batches", + "Number of non-empty transaction batches created by the parallel executor per block", + metrics.non_empty_batches.clone(), + ); + registry.register( + "parallel_executor_non_empty_batch_transactions", + "Exact transaction counts for each non-empty batch keyed by synthetic block_height and batch_index", + metrics.non_empty_batch_transactions.clone(), + ); + registry.register( + "parallel_executor_non_empty_batch_allocated_gas", + "Allocated gas for each non-empty batch keyed by synthetic block_height and batch_index", + metrics.non_empty_batch_allocated_gas.clone(), + ); + registry.register( + "parallel_executor_non_empty_batch_used_gas", + "Used gas for each non-empty batch keyed by synthetic block_height and batch_index", + metrics.non_empty_batch_used_gas.clone(), + ); + registry.register( + "parallel_executor_batch_anchor_contract", + "Anchor contract ids chosen for each non-empty batch keyed by synthetic block_height and batch_index", + metrics.batch_anchor_contracts.clone(), + ); + registry.register( + "parallel_executor_block_production_time_seconds", + "Time spent producing blocks after transactions are added to the block", + metrics.block_production_time_seconds.clone(), + ); + registry.register( + "parallel_executor_scheduler_run_time_seconds", + "Total time spent running the parallel executor scheduler", + metrics.scheduler_run_time_seconds.clone(), + ); + registry.register( + "parallel_executor_batch_prepare_ms", + "Time spent preparing a batch in milliseconds", + metrics.batch_prepare_ms.clone(), + ); + registry.register( + "parallel_executor_batch_prepare_us_per_tx", + "Time spent preparing a batch in microseconds normalized by transactions", + metrics.batch_prepare_us_per_tx.clone(), + ); + registry.register( + "parallel_executor_batch_prepare_ns_per_kgas", + "Time spent preparing a batch in nanoseconds normalized by 1000 gas", + metrics.batch_prepare_ns_per_kgas.clone(), + ); + registry.register( + "parallel_executor_batch_execute_ms", + "Time spent executing a batch in milliseconds", + metrics.batch_execute_ms.clone(), + ); + registry.register( + "parallel_executor_batch_execute_us_per_tx", + "Time spent executing a batch in microseconds normalized by transactions", + metrics.batch_execute_us_per_tx.clone(), + ); + registry.register( + "parallel_executor_batch_execute_ns_per_kgas", + "Time spent executing a batch in nanoseconds normalized by 1000 gas", + metrics.batch_execute_ns_per_kgas.clone(), + ); + registry.register( + "parallel_executor_batch_total_ms", + "Total time spent preparing and executing a batch in milliseconds", + metrics.batch_total_ms.clone(), + ); + registry.register( + "parallel_executor_batch_total_us_per_tx", + "Total time spent preparing and executing a batch in microseconds normalized by transactions", + metrics.batch_total_us_per_tx.clone(), + ); + registry.register( + "parallel_executor_batch_total_ns_per_kgas", + "Total time spent preparing and executing a batch in nanoseconds normalized by 1000 gas", + metrics.batch_total_ns_per_kgas.clone(), + ); + + metrics + } +} + +static PARALLEL_EXECUTOR_METRICS: OnceLock = OnceLock::new(); + +pub fn parallel_executor_metrics() -> &'static ParallelExecutorMetrics { + PARALLEL_EXECUTOR_METRICS.get_or_init(ParallelExecutorMetrics::default) +} + +pub fn record_execution_time(duration: Duration) { + parallel_executor_metrics() + .execution_time_seconds + .set(duration.as_secs_f64()); +} + +pub fn set_number_of_transactions(count: u32) { + parallel_executor_metrics() + .number_of_transactions + .set(count as i64); +} + +pub fn set_total_gas_used(gas: u64) { + parallel_executor_metrics().total_gas_used.set(gas as i64); +} + +pub fn set_block_height(height: u32) { + parallel_executor_metrics().block_height.set(height as i64); +} + +pub fn set_max_workers_used(max_workers_used: u32) { + parallel_executor_metrics() + .max_workers_used + .set(max_workers_used as i64); +} + +pub fn next_debug_batch_metrics_block_height() -> u64 { + // TODO: Replace this synthetic block id with a real block/run identifier before merge. + parallel_executor_metrics() + .debug_batch_metrics_block_height + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + .saturating_add(1) +} + +pub fn set_non_empty_batch_transactions( + block_height: u64, + batch_tx_counts: &[u32], +) { + let metrics = parallel_executor_metrics(); + metrics + .non_empty_batches + .set(i64::try_from(batch_tx_counts.len()).unwrap_or(i64::MAX)); + + for (batch_index, tx_count) in batch_tx_counts.iter().enumerate() { + metrics + .non_empty_batch_transactions + .get_or_create(&BatchMetricLabel { + block_height, + batch_index: u64::try_from(batch_index).unwrap_or(u64::MAX), + }) + .set(i64::from(*tx_count)); + } +} + +pub fn set_non_empty_batch_allocated_gas(block_height: u64, batch_gas: &[u64]) { + let metrics = parallel_executor_metrics(); + for (batch_index, gas) in batch_gas.iter().enumerate() { + metrics + .non_empty_batch_allocated_gas + .get_or_create(&BatchMetricLabel { + block_height, + batch_index: u64::try_from(batch_index).unwrap_or(u64::MAX), + }) + .set(i64::try_from(*gas).unwrap_or(i64::MAX)); + } +} + +pub fn set_non_empty_batch_used_gas(block_height: u64, batch_gas: &[u64]) { + let metrics = parallel_executor_metrics(); + for (batch_index, gas) in batch_gas.iter().enumerate() { + metrics + .non_empty_batch_used_gas + .get_or_create(&BatchMetricLabel { + block_height, + batch_index: u64::try_from(batch_index).unwrap_or(u64::MAX), + }) + .set(i64::try_from(*gas).unwrap_or(i64::MAX)); + } +} + +pub fn set_batch_anchor_contracts( + block_height: u64, + batch_anchor_contracts: &[Vec], +) { + let metrics = parallel_executor_metrics(); + for (batch_index, anchors) in batch_anchor_contracts.iter().enumerate() { + for contract_id in anchors { + metrics + .batch_anchor_contracts + .get_or_create(&BatchAnchorLabel { + block_height, + batch_index: u64::try_from(batch_index).unwrap_or(u64::MAX), + contract_id: contract_id.to_string(), + }) + .set(1); + } + } +} + +pub fn record_block_production_time(duration: Duration) { + parallel_executor_metrics() + .block_production_time_seconds + .set(duration.as_secs_f64()); +} + +pub fn record_scheduler_run_time(duration: Duration) { + parallel_executor_metrics() + .scheduler_run_time_seconds + .set(duration.as_secs_f64()); +} + +fn duration_ms(duration: Duration) -> f64 { + duration.as_secs_f64() * 1000.0 +} + +fn duration_us(duration: Duration) -> f64 { + duration.as_secs_f64() * 1_000_000.0 +} + +fn duration_ns(duration: Duration) -> f64 { + duration.as_secs_f64() * 1_000_000_000.0 +} + +fn record_batch_time( + duration: Duration, + tx_count: u32, + gas: u64, + raw: &Histogram, + per_tx: &Histogram, + per_gas: &Histogram, +) { + let duration_ms = duration_ms(duration); + raw.observe(duration_ms); + let duration_us = duration_us(duration); + if tx_count > 0 { + per_tx.observe(duration_us / f64::from(tx_count)); + } + let gas_in_kgas = gas as f64 / 1000.0; + if gas_in_kgas > 0.0 { + per_gas.observe(duration_ns(duration) / gas_in_kgas); + } +} + +pub fn record_batch_prepare(duration: Duration, tx_count: u32, gas: u64) { + let metrics = parallel_executor_metrics(); + record_batch_time( + duration, + tx_count, + gas, + &metrics.batch_prepare_ms, + &metrics.batch_prepare_us_per_tx, + &metrics.batch_prepare_ns_per_kgas, + ); +} + +pub fn record_batch_execute(duration: Duration, tx_count: u32, gas: u64) { + let metrics = parallel_executor_metrics(); + record_batch_time( + duration, + tx_count, + gas, + &metrics.batch_execute_ms, + &metrics.batch_execute_us_per_tx, + &metrics.batch_execute_ns_per_kgas, + ); +} + +pub fn record_batch_total(duration: Duration, tx_count: u32, gas: u64) { + let metrics = parallel_executor_metrics(); + record_batch_time( + duration, + tx_count, + gas, + &metrics.batch_total_ms, + &metrics.batch_total_us_per_tx, + &metrics.batch_total_ns_per_kgas, + ); +} diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index d810adeee2b..3647b314688 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -345,6 +345,7 @@ where self.next_height(), block_time, TransactionsSource::TxPool, + // TODO: Don't hardcode deadline Instant::now() + Duration::from_secs(1), ) .await?; diff --git a/crates/services/importer/src/error.rs b/crates/services/importer/src/error.rs index 8697eede438..749b28dcd84 100644 --- a/crates/services/importer/src/error.rs +++ b/crates/services/importer/src/error.rs @@ -1,4 +1,7 @@ -use fuel_core_storage::Error as StorageError; +use fuel_core_storage::{ + Error as StorageError, + MerkleRoot, +}; use fuel_core_types::{ blockchain::primitives::BlockId, fuel_types::BlockHeight, diff --git a/crates/services/importer/src/importer.rs b/crates/services/importer/src/importer.rs index 4750c8a926d..b6c1d1d68ef 100644 --- a/crates/services/importer/src/importer.rs +++ b/crates/services/importer/src/importer.rs @@ -98,6 +98,18 @@ enum Commands { }, } +impl std::fmt::Debug for Commands { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Commands::Stop => write!(f, "Stop"), + Commands::CommitResult { .. } => write!(f, "CommitResult"), + #[cfg(test)] + Commands::VerifyAndExecuteBlock { .. } => write!(f, "VerifyAndExecuteBlock"), + Commands::PrepareImportResult { .. } => write!(f, "PrepareImportResult"), + } + } +} + struct ImporterInner { database: D, executor: E, @@ -394,7 +406,7 @@ where for item in iterator.iter_all::(None) { let (key, _) = item?; if let DenseMetadataKey::Latest = key { - return Err(Error::InvalidDatabaseStateAfterExecution) + return Err(Error::InvalidDatabaseStateAfterExecution(None, None)) } } @@ -404,8 +416,6 @@ where .map_err(Error::FailedBlockReconciliationWrite)?; } - let changes = db_after_execution.into_changes(); - #[cfg(feature = "test-helpers")] let changes_clone = changes.clone(); @@ -532,6 +542,8 @@ where async fn run(&mut self) { let local_runner = LocalRunner::new().expect("Failed to create the local runner"); while let Some(command) = self.commands.recv().await { + let instant = Instant::now(); + warn!("Importer command received: {:?}", &command); match command { Commands::Stop => break, Commands::CommitResult { @@ -559,6 +571,7 @@ where let _ = callback.send(result); } } + warn!("Importer command execution took {:?}", instant.elapsed()); } } diff --git a/crates/services/parallel-executor/Cargo.toml b/crates/services/parallel-executor/Cargo.toml index 323709e6ec1..9d4d7e560ce 100644 --- a/crates/services/parallel-executor/Cargo.toml +++ b/crates/services/parallel-executor/Cargo.toml @@ -17,6 +17,7 @@ fault-proving = ["fuel-core-types/fault-proving"] anyhow = { workspace = true } derive_more = { workspace = true, features = ["display"] } fuel-core-executor = { workspace = true, features = ["std", "u32-tx-count"] } +fuel-core-metrics = { workspace = true } fuel-core-storage = { workspace = true, features = ["std"] } fuel-core-types = { workspace = true, features = ["std", "test-helpers"] } futures = { workspace = true, features = ["std"] } diff --git a/crates/services/parallel-executor/src/config.rs b/crates/services/parallel-executor/src/config.rs index 3b8f6ba462d..6f46702ffb7 100644 --- a/crates/services/parallel-executor/src/config.rs +++ b/crates/services/parallel-executor/src/config.rs @@ -1,15 +1,28 @@ use std::num::NonZeroUsize; +#[derive(Clone, Copy, Debug, Default)] +pub enum WorkerCountPolicy { + #[default] + StaticMax, + DynamicIdle, +} + #[derive(Clone, Debug)] pub struct Config { /// The number of cores to use for the block execution. - pub number_of_cores: NonZeroUsize, + pub worker_count: NonZeroUsize, + /// How to choose worker count used for tx selection requests. + pub worker_count_policy: WorkerCountPolicy, + /// Enable metrics for the parallel executor. + pub metrics: bool, } impl Default for Config { fn default() -> Self { Self { - number_of_cores: NonZeroUsize::new(1).expect("The value is not zero; qed"), + worker_count: NonZeroUsize::new(1).expect("The value is not zero; qed"), + worker_count_policy: WorkerCountPolicy::StaticMax, + metrics: false, } } } diff --git a/crates/services/parallel-executor/src/executor.rs b/crates/services/parallel-executor/src/executor.rs index 6dc4df32dbd..40c9a5c09dc 100644 --- a/crates/services/parallel-executor/src/executor.rs +++ b/crates/services/parallel-executor/src/executor.rs @@ -93,7 +93,7 @@ impl Executor { config: Config, ) -> anyhow::Result { let runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(config.number_of_cores.get()) + .worker_threads(config.worker_count.get()) .enable_all() .build()?; @@ -124,6 +124,7 @@ where where TxSource: TransactionsSource + Send + Sync + 'static, { + let instant = Instant::now(); // Initialize execution state let mut partial_block = PartialFuelBlock::new(components.header_to_produce, vec![]); @@ -171,6 +172,11 @@ where ) .await?; + tracing::warn!( + "produce_without_commit_with_source elapsed (before scheduler): {:?}", + instant.elapsed() + ); + // Run parallel scheduler for L2 transactions let scheduler_result = self .run_scheduler( @@ -188,14 +194,23 @@ where scheduler_result.events.len(), scheduler_result.skipped_txs.len() ); + tracing::warn!( + "produce_without_commit_with_source elapsed (before finalize): {:?}", + instant.elapsed() + ); // Finalize block with mint transaction - self.finalize_block( + let res = self.finalize_block( &mut components, scheduler_result, event_inbox_root, &mut executor, - ) + ); + tracing::warn!( + "produce_without_commit_with_source elapsed (after finalize): {:?}", + instant.elapsed() + ); + res } /// Process DA changes if the DA height has changed @@ -283,6 +298,7 @@ where where TxSource: TransactionsSource + Send + Sync + 'static, { + let instant = Instant::now(); let runtime = self.runtime.as_ref().expect( "Scheduler runtime \ is only removed on `drop`", @@ -306,6 +322,8 @@ where ) .await?; + let elapsed = instant.elapsed(); + tracing::warn!("run scheduler elapsed: {:?}", elapsed); Ok(res) } diff --git a/crates/services/parallel-executor/src/in_memory_transaction_with_contracts.rs b/crates/services/parallel-executor/src/in_memory_transaction_with_contracts.rs index b5803633d07..bc72ffa174d 100644 --- a/crates/services/parallel-executor/src/in_memory_transaction_with_contracts.rs +++ b/crates/services/parallel-executor/src/in_memory_transaction_with_contracts.rs @@ -175,39 +175,6 @@ where } } - fn read( - &self, - key: &[u8], - column: Self::Column, - offset: usize, - buf: &mut [u8], - ) -> StorageResult { - if let Some(operation) = self.get_from_changes(key, column) { - match operation { - WriteOperation::Insert(value) => { - let bytes_len = value.as_ref().len(); - let start = offset; - let buf_len = buf.len(); - let end = offset.saturating_add(buf.len()); - - if end > bytes_len { - return Err(anyhow::anyhow!( - "Offset `{offset}` + buf_len `{buf_len}` read until {end} which is out of bounds `{bytes_len}` for key `{:?}`", - key - ) - .into()); - } - - let starting_from_offset = &value.as_ref()[start..end]; - buf[..].copy_from_slice(starting_from_offset); - Ok(true) - } - WriteOperation::Remove => Ok(false), - } - } else { - self.storage.read(key, column, offset, buf) - } - } } impl KeyValueMutate for InMemoryTransactionWithContracts diff --git a/crates/services/parallel-executor/src/ports.rs b/crates/services/parallel-executor/src/ports.rs index 0e3f54536e1..384def3c462 100644 --- a/crates/services/parallel-executor/src/ports.rs +++ b/crates/services/parallel-executor/src/ports.rs @@ -28,6 +28,8 @@ impl Filter { pub struct TransactionSourceExecutableTransactions { /// The transactions that can be executed pub transactions: Vec, + /// Anchor contracts selected by the tx pool while building this batch + pub anchor_contract_ids: Vec, /// Indicates whether some transactions were filtered out based on the filter pub filtered: TransactionFiltered, /// The filter used to fetch these transactions @@ -41,6 +43,7 @@ pub trait TransactionsSource { gas_limit: u64, tx_count_limit: u32, block_transaction_size_limit: u64, + selection_worker_count: usize, filter: Filter, ) -> impl Future>; diff --git a/crates/services/parallel-executor/src/scheduler.rs b/crates/services/parallel-executor/src/scheduler.rs index b3a1a16e2a7..812ed49ff06 100644 --- a/crates/services/parallel-executor/src/scheduler.rs +++ b/crates/services/parallel-executor/src/scheduler.rs @@ -1,11 +1,8 @@ -use std::{ - collections::HashSet, - sync::Arc, - time::Duration, -}; - use crate::{ - config::Config, + config::{ + Config, + WorkerCountPolicy, + }, in_memory_transaction_with_contracts::InMemoryTransactionWithContracts, l1_execution_data::L1ExecutionData, memory::MemoryPool, @@ -39,6 +36,7 @@ use fuel_core_executor::{ RelayerPort, }, }; +use ::fuel_core_metrics as parallel_executor_metrics; use fuel_core_storage::{ Error as StorageError, column::Column, @@ -82,6 +80,20 @@ use fuel_core_types::{ }; use futures::future::Either; use fxhash::FxHashMap; +use std::{ + collections::{ + HashMap, + HashSet, + }, + sync::{ + Arc, + atomic::{ + AtomicU32, + Ordering, + }, + }, + time::Duration, +}; use tokio::{ runtime::Runtime, time::Instant, @@ -125,6 +137,8 @@ pub struct Scheduler<'a, R, S, PreconfirmationSender> { blob_transactions: Vec, /// Current scheduler state state: SchedulerState, + /// Batch preparation stats keyed by batch id + batch_preparations: Option>, /// Total maximum of transactions left tx_left: u32, /// Total maximum of byte size left @@ -135,6 +149,8 @@ pub struct Scheduler<'a, R, S, PreconfirmationSender> { deadline: Instant, /// Gas used by blob transactions blob_gas: u64, + /// Counters for tracking worker concurrency when metrics are enabled + worker_counters: Option, } struct WorkSessionExecutionResult { @@ -176,6 +192,15 @@ struct WorkSessionExecutionResult { used_size: u32, /// coinbase coinbase: u64, + /// Execution time for this batch + execution_duration: Duration, +} + +#[derive(Clone, Copy)] +struct BatchPreparationStats { + duration: Duration, + tx_count: u32, + gas: u64, } #[derive(Default)] @@ -294,6 +319,41 @@ pub(crate) struct PreparedBatch { pub number_of_transactions: u32, } +#[derive(Clone)] +struct WorkerCounters { + current: Arc, + max: Arc, +} + +impl WorkerCounters { + fn new() -> Self { + Self { + current: Arc::new(AtomicU32::new(0)), + max: Arc::new(AtomicU32::new(0)), + } + } + + fn record_started(&self) { + let current = self.current.fetch_add(1, Ordering::Relaxed) + 1; + self.max.fetch_max(current, Ordering::Relaxed); + } + + fn reset(&self) { + self.current.store(0, Ordering::Relaxed); + self.max.store(0, Ordering::Relaxed); + } +} + +struct WorkerCountGuard { + current: Arc, +} + +impl Drop for WorkerCountGuard { + fn drop(&mut self) { + self.current.fetch_sub(1, Ordering::Relaxed); + } +} + pub struct BlockConstraints { pub block_gas_limit: u64, pub total_execution_time: Duration, @@ -313,6 +373,8 @@ impl<'a, R, S, PreconfirmationSender> Scheduler<'a, R, S, PreconfirmationSender> consensus_parameters: ConsensusParameters, deadline: Instant, ) -> Result { + let batch_preparations = config.metrics.then(HashMap::new); + let worker_counters = config.metrics.then(WorkerCounters::new); Ok(Self { header_to_produce: components.header_to_produce, coinbase_recipient: components.coinbase_recipient, @@ -324,7 +386,7 @@ impl<'a, R, S, PreconfirmationSender> Scheduler<'a, R, S, PreconfirmationSender> tx_left: u32::MAX, tx_size_left: consensus_parameters.block_transaction_size_limit(), gas_left: consensus_parameters.block_gas_limit(), - worker_pool: WorkerPool::new(config.number_of_cores.get()), + worker_pool: WorkerPool::new(config.worker_count.get()), memory_pool, config, current_execution_tasks: FuturesUnordered::new(), @@ -336,6 +398,8 @@ impl<'a, R, S, PreconfirmationSender> Scheduler<'a, R, S, PreconfirmationSender> consensus_parameters, blob_gas: 0, deadline, + batch_preparations, + worker_counters, }) } } @@ -356,6 +420,7 @@ where where TxSource: TransactionsSource, { + let instant = Instant::now(); let view = self.storage.latest_view()?; let storage_with_da = Arc::new(view.into_transaction().with_changes(da_changes)); self.update_constraints( @@ -367,12 +432,22 @@ where let mut new_tx_notifier = tx_source.get_new_transactions_notifier(); let now = Instant::now(); let deadline = self.deadline; + let mut execution_time_recorded = false; let mut nb_batch_created = 0; let mut nb_transactions: u32 = l1_execution_data.tx_count; + let mut non_empty_batch_tx_counts = self.config.metrics.then(Vec::new); + let mut non_empty_batch_allocated_gas = self.config.metrics.then(Vec::new); + let mut non_empty_batch_used_gas = self.config.metrics.then(Vec::new); + let mut non_empty_batch_anchors = self.config.metrics.then(Vec::new); + let batch_metrics_block_height = self + .config + .metrics + .then(parallel_executor_metrics::next_debug_batch_metrics_block_height) + .unwrap_or(0); let initial_gas_per_worker = self .consensus_parameters .block_gas_limit() - .checked_div(self.config.number_of_cores.get() as u64) + .checked_div(self.config.worker_count.get() as u64) .ok_or(SchedulerError::InternalError( "Invalid block gas limit".to_string(), ))? @@ -380,7 +455,9 @@ where .ok_or(SchedulerError::InternalError( "L1 transactions consumed all the gas".to_string(), ))?; + let mut total_gas: u64 = 0; + tracing::warn!("scheduler starting run loop at {:?}", instant.elapsed()); 'outer: loop { let tx_notifier = if new_tx_notifier.has_changed().is_ok() { Either::Left(new_tx_notifier.changed()) @@ -392,9 +469,22 @@ where if self.is_worker_idling() { // If we requested transactions, we shouldn't drop them, // so we await them here. - let mut batch = self - .ask_new_transactions_batch(tx_source, now, initial_gas_per_worker) + let batch_prepare_start = Instant::now(); + let selection_worker_count = self.selection_worker_count(); + let (mut batch, anchor_contract_ids) = self + .ask_new_transactions_batch( + tx_source, + now, + initial_gas_per_worker, + selection_worker_count, + ) .await?; + let batch_prepare_duration = batch_prepare_start.elapsed(); + tracing::warn!( + "new batch id {:?} prepared at: {:?}", + nb_batch_created, + instant.elapsed() + ); let blob_transactions = core::mem::take(&mut batch.blob_transactions); self.blob_transactions.extend(blob_transactions.into_iter()); @@ -409,6 +499,38 @@ where } let batch_len = batch.number_of_transactions; + if self.config.metrics { + if let Some(batch_tx_counts) = non_empty_batch_tx_counts.as_mut() { + batch_tx_counts.push(batch_len); + } + if let Some(batch_allocated_gas) = + non_empty_batch_allocated_gas.as_mut() + { + batch_allocated_gas.push(batch.gas); + } + if let Some(batch_used_gas) = non_empty_batch_used_gas.as_mut() { + batch_used_gas.push(0); + } + if let Some(batch_anchors) = non_empty_batch_anchors.as_mut() { + batch_anchors.push(anchor_contract_ids); + } + parallel_executor_metrics::record_batch_prepare( + batch_prepare_duration, + batch_len, + batch.gas, + ); + if let Some(batch_preparations) = self.batch_preparations.as_mut() { + batch_preparations.insert( + nb_batch_created, + BatchPreparationStats { + duration: batch_prepare_duration, + tx_count: batch_len, + gas: batch.gas, + }, + ); + } + total_gas = total_gas.saturating_add(batch.gas); + } self.execute_batch( batch, @@ -424,29 +546,86 @@ where ), )?; } else if self.current_execution_tasks.is_empty() { + let waiting = Instant::now(); tokio::select! { _ = tx_notifier => { self.state = SchedulerState::TransactionsReadyForPickup; } _ = tokio::time::sleep_until(deadline) => { + if !execution_time_recorded { + let execution_time = instant + .elapsed() + .saturating_sub(waiting.elapsed()); + parallel_executor_metrics::record_execution_time(execution_time); + execution_time_recorded = true; + } + tracing::warn!("******"); + tracing::warn!("waited until deadline for {:?}, total elapsed: {:?}", waiting.elapsed(), instant.elapsed()); break 'outer; } } } else { + tracing::warn!("Waiting for workers to finish"); tokio::select! { _ = tx_notifier => { + tracing::warn!("New transactions received"); self.state = SchedulerState::TransactionsReadyForPickup; } result = self.current_execution_tasks.select_next_some() => { + tracing::warn!("Worker finished at {:?}", instant.elapsed()); match result { Ok(res) => { let res = res?; if !res.skipped_tx.is_empty() { + if self.config.metrics { + if let Some(batch_preparations) = + self.batch_preparations.as_mut() + { + batch_preparations.remove(&res.batch_id); + } + } drop(res.worker_id); self.sequential_fallback(res.batch_id, res.txs, res.coins_used, res.coins_created, res.message_nonces_used).await?; continue; } + if self.config.metrics { + if let Some(batch_used_gas) = + non_empty_batch_used_gas.as_mut() + { + if let Some(slot) = batch_used_gas + .get_mut(res.batch_id) + { + *slot = res.used_gas; + } + } + if let Some(batch_preparations) = + self.batch_preparations.as_mut() + { + if let Some(prep) = + batch_preparations.remove(&res.batch_id) + { + let gas_for_norm = + if res.used_gas > 0 { + res.used_gas + } else { + prep.gas + }; + parallel_executor_metrics::record_batch_execute( + res.execution_duration, + prep.tx_count, + gas_for_norm, + ); + parallel_executor_metrics::record_batch_total( + prep.duration.saturating_add( + res.execution_duration, + ), + prep.tx_count, + gas_for_norm, + ); + } + } + } self.register_execution_result(res); } _ => { @@ -457,20 +636,71 @@ where } } _ = tokio::time::sleep_until(deadline) => { + tracing::warn!("timeout waiting on workers"); break 'outer; } } } } - self.wait_all_execution_tasks().await?; + tracing::warn!("******"); + tracing::warn!("waiting for execution tasks: {:?}", instant.elapsed()); + let exceeded_deadline = self.wait_all_execution_tasks().await?; + tracing::warn!("execution tasks done: {:?}", instant.elapsed()); + if self.config.metrics { + if exceeded_deadline && !execution_time_recorded { + parallel_executor_metrics::record_execution_time(instant.elapsed()); + } + parallel_executor_metrics::set_number_of_transactions(nb_transactions); + parallel_executor_metrics::set_total_gas_used(total_gas); + let block_height = u32::from(*self.header_to_produce.height()); + parallel_executor_metrics::set_block_height(block_height); + parallel_executor_metrics::set_max_workers_used( + self.worker_counters + .as_ref() + .map(|counters| counters.max.load(Ordering::Relaxed)) + .unwrap_or(0), + ); + parallel_executor_metrics::set_non_empty_batch_transactions( + batch_metrics_block_height, + non_empty_batch_tx_counts.as_deref().unwrap_or(&[]), + ); + parallel_executor_metrics::set_non_empty_batch_allocated_gas( + batch_metrics_block_height, + non_empty_batch_allocated_gas.as_deref().unwrap_or(&[]), + ); + parallel_executor_metrics::set_non_empty_batch_used_gas( + batch_metrics_block_height, + non_empty_batch_used_gas.as_deref().unwrap_or(&[]), + ); + parallel_executor_metrics::set_batch_anchor_contracts( + batch_metrics_block_height, + non_empty_batch_anchors.as_deref().unwrap_or(&[]), + ); + if let Some(counters) = self.worker_counters.as_ref() { + counters.reset(); + } + } + + // let mut res = self.verify_coherency_and_merge_results( + // nb_batch_created, + // l1_execution_data, + // storage_with_da.clone(), + // )?; - let mut res = self.verify_coherency_and_merge_results( + let result = self.verify_coherency_and_merge_results( nb_batch_created, l1_execution_data, storage_with_da.clone(), - )?; + ); + if result.is_err() { + tracing::warn!("coherency result: {:?}", result); + } + + let mut res = result?; + + tracing::warn!("scheduler done: {:?}", instant.elapsed()); if !self.blob_transactions.is_empty() { let mut tx = StorageTransaction::transaction( storage_with_da.clone(), @@ -484,11 +714,14 @@ where return Err(SchedulerError::StorageError(e)); } } + tracing::warn!("committed changes: {:?}", instant.elapsed()); res.changes = StorageChanges::Changes(Default::default()); let (blob_execution_data, blob_txs) = self.execute_blob_transactions(tx, nb_transactions).await?; + tracing::warn!("blob execution done: {:?}", instant.elapsed()); res.add_blob_execution_data(blob_execution_data, blob_txs); + tracing::warn!("blob execution data added: {:?}", instant.elapsed()); } // TODO: Avoid cloning the DA changes @@ -503,6 +736,11 @@ where } } + let execution_time = instant.elapsed(); + tracing::warn!("Scheduler `run` execution time: {:?}", execution_time); + if self.config.metrics { + parallel_executor_metrics::record_scheduler_run_time(execution_time); + } Ok(res) } @@ -540,10 +778,12 @@ where tx_source: &TxSource, start_execution_time: Instant, initial_gas_per_core: u64, - ) -> Result + selection_worker_count: usize, + ) -> Result<(PreparedBatch, Vec), SchedulerError> where TxSource: TransactionsSource, { + let instant = Instant::now(); let total_execution_time = self .deadline .checked_duration_since(start_execution_time) @@ -571,6 +811,7 @@ where current_gas, self.tx_left, self.tx_size_left, + selection_worker_count, Filter { excluded_contract_ids: std::mem::take( &mut self.current_executing_contracts, @@ -586,6 +827,7 @@ where })?; self.current_executing_contracts = executable_transactions.filter.excluded_contract_ids; + let anchor_contract_ids = executable_transactions.anchor_contract_ids; let prepared_batch = prepare_transactions_batch( &self.consensus_parameters, @@ -596,7 +838,19 @@ where prepared_batch.total_size, prepared_batch.gas, )?; - Ok(prepared_batch) + tracing::warn!( + "new batch prepared in: {:?}, for {:?} txs", + instant.elapsed(), + prepared_batch.number_of_transactions + ); + Ok((prepared_batch, anchor_contract_ids)) + } + + fn selection_worker_count(&self) -> usize { + match self.config.worker_count_policy { + WorkerCountPolicy::StaticMax => self.config.worker_count.get(), + WorkerCountPolicy::DynamicIdle => self.worker_pool.available_workers(), + } } fn execute_batch( @@ -612,6 +866,7 @@ where .ok_or(SchedulerError::InternalError( "No available workers".to_string(), ))?; + let worker_counters = self.worker_counters.clone(); let mut changes_per_contract = Vec::with_capacity(batch.contracts_used.len()); @@ -629,8 +884,15 @@ where let mut memory = self.memory_pool.take_raw(); let future = { + let instant = Instant::now(); let storage_with_da = storage_with_da.clone(); async move { + let _worker_guard = worker_counters.as_ref().map(|counters| { + counters.record_started(); + WorkerCountGuard { + current: counters.current.clone(), + } + }); let changes_per_contract: FxHashMap = changes_per_contract.into_iter().collect(); let memory_tx = InMemoryTransactionWithContracts::new( @@ -678,6 +940,13 @@ where let (changes, changes_per_contract) = storage_tx.into_storage().into_changes(); + let batch_duration = instant.elapsed(); + tracing::warn!( + "batch {:?} duration: {:?} with {:?} txs", + batch_id, + batch_duration, + transactions.len() + ); Ok(WorkSessionExecutionResult { worker_id, batch_id, @@ -696,6 +965,7 @@ where gas_diff: batch.gas.saturating_sub(execution_data.used_gas), used_size: execution_data.used_size, coinbase: execution_data.coinbase, + execution_duration: batch_duration, }) } }; @@ -738,7 +1008,8 @@ where ); } - async fn wait_all_execution_tasks(&mut self) -> Result<(), SchedulerError> { + // returns `true` if exceeded deadline + async fn wait_all_execution_tasks(&mut self) -> Result { // We have reached the deadline // We need to merge the states of all the workers while !self.current_execution_tasks.is_empty() { @@ -786,6 +1057,7 @@ where } let now = Instant::now(); + let mut exceeded_deadline = false; if now > self.deadline { tracing::warn!( "Execution time exceeded the limit by: {}ms", @@ -793,8 +1065,9 @@ where .expect("Checked above; qed") .as_millis() ); + exceeded_deadline = true; } - Ok(()) + Ok(exceeded_deadline) } fn verify_coherency_and_merge_results( diff --git a/crates/services/parallel-executor/src/scheduler/workers.rs b/crates/services/parallel-executor/src/scheduler/workers.rs index f380c2d19fb..c40e2bf7d07 100644 --- a/crates/services/parallel-executor/src/scheduler/workers.rs +++ b/crates/services/parallel-executor/src/scheduler/workers.rs @@ -42,6 +42,10 @@ impl WorkerPool { *self.workers.lock() == 0 } + pub fn available_workers(&self) -> usize { + *self.workers.lock() + } + pub fn return_worker(&self) { let mut workers = self.workers.lock(); *workers = workers.saturating_add(1); diff --git a/crates/services/parallel-executor/src/tests/mocks.rs b/crates/services/parallel-executor/src/tests/mocks.rs index 632d33c094f..5abd099ab05 100644 --- a/crates/services/parallel-executor/src/tests/mocks.rs +++ b/crates/services/parallel-executor/src/tests/mocks.rs @@ -57,6 +57,7 @@ pub struct MockTxPoolResponse { pub filtered: TransactionFiltered, pub filter: Option, pub gas_limit_lt: Option, + pub selection_worker_count: Option, } impl MockTxPoolResponse { pub fn new(transactions: &[&Transaction], filtered: TransactionFiltered) -> Self { @@ -65,6 +66,7 @@ impl MockTxPoolResponse { filtered, filter: None, gas_limit_lt: None, + selection_worker_count: None, } } pub fn assert_filter(self, filter: Filter) -> Self { @@ -73,6 +75,7 @@ impl MockTxPoolResponse { filtered: self.filtered, filter: Some(filter), gas_limit_lt: self.gas_limit_lt, + selection_worker_count: self.selection_worker_count, } } pub fn assert_gas_limit_lt(self, gas_limit: u64) -> Self { @@ -81,6 +84,16 @@ impl MockTxPoolResponse { filtered: self.filtered, filter: self.filter, gas_limit_lt: Some(gas_limit), + selection_worker_count: self.selection_worker_count, + } + } + pub fn assert_selection_worker_count(self, selection_worker_count: usize) -> Self { + Self { + transactions: self.transactions, + filtered: self.filtered, + filter: self.filter, + gas_limit_lt: self.gas_limit_lt, + selection_worker_count: Some(selection_worker_count), } } } @@ -111,6 +124,7 @@ impl TransactionsSource for MockTransactionsSource { gas_limit: u64, tx_count_limit: u32, _block_transaction_size_limit: u64, + selection_worker_count: usize, filter: Filter, ) -> anyhow::Result { let mut response_queue = self.response_queue.lock().expect("Mutex poisoned"); @@ -127,14 +141,20 @@ impl TransactionsSource for MockTransactionsSource { gas_limit, ); } + if let Some(expected_selection_worker_count) = response.selection_worker_count + { + assert_eq!(expected_selection_worker_count, selection_worker_count); + } Ok(TransactionSourceExecutableTransactions { transactions: response.transactions, + anchor_contract_ids: vec![], filtered: response.filtered, filter: response.filter.unwrap_or(filter), }) } else { Ok(TransactionSourceExecutableTransactions { transactions: vec![], + anchor_contract_ids: vec![], filtered: TransactionFiltered::NotFiltered, filter, }) diff --git a/crates/services/parallel-executor/src/tests/tests_executor.rs b/crates/services/parallel-executor/src/tests/tests_executor.rs index f5a1867a21a..2df0427d87b 100644 --- a/crates/services/parallel-executor/src/tests/tests_executor.rs +++ b/crates/services/parallel-executor/src/tests/tests_executor.rs @@ -4,7 +4,10 @@ use std::time::Duration; use crate::{ - config::Config, + config::{ + Config, + WorkerCountPolicy, + }, executor::Executor, ports::{ Filter, @@ -200,8 +203,10 @@ async fn contract_creation_changes(rng: &mut StdRng) -> (ContractId, StorageChan MockRelayer, MockPreconfirmationSender, Config { - number_of_cores: std::num::NonZeroUsize::new(2) + worker_count: std::num::NonZeroUsize::new(2) .expect("The value is not zero; qed"), + worker_count_policy: crate::config::WorkerCountPolicy::StaticMax, + metrics: false, }, ) .unwrap(); @@ -247,8 +252,10 @@ async fn execute__simple_independent_transactions_sorted() { MockRelayer, MockPreconfirmationSender, Config { - number_of_cores: std::num::NonZeroUsize::new(2) + worker_count: std::num::NonZeroUsize::new(2) .expect("The value is not zero; qed"), + worker_count_policy: crate::config::WorkerCountPolicy::StaticMax, + metrics: false, }, ) .unwrap(); @@ -295,6 +302,67 @@ async fn execute__simple_independent_transactions_sorted() { assert_eq!(expected_ids, actual_ids); } +#[tokio::test] +async fn execute__when_dynamic_idle_policy_then_selection_uses_idle_worker_count() { + let mut rng = rand::rngs::StdRng::seed_from_u64(2322); + let mut storage = Storage::default(); + storage = add_consensus_parameters(storage, &ConsensusParameters::default()); + + // given + let script = [ + op::movi(0x11, 32), + op::aloc(0x11), + op::movi(0x10, 0x00), + op::cfe(0x10), + op::k256(RegId::HP, RegId::ZERO, 0x10), + ]; + let script_bytes: Vec = script.iter().flat_map(|op| op.to_bytes()).collect(); + let long_tx = TransactionBuilder::script(script_bytes, vec![]) + .script_gas_limit(100_000) + .add_stored_coin_input(&mut rng, &mut storage, 1000) + .finalize_as_transaction(); + let mut executor: Executor = + Executor::new( + storage, + MockRelayer, + MockPreconfirmationSender, + Config { + worker_count: std::num::NonZeroUsize::new(2) + .expect("The value is not zero; qed"), + worker_count_policy: WorkerCountPolicy::DynamicIdle, + metrics: false, + }, + ) + .unwrap(); + let (transactions_source, mock_tx_pool) = MockTransactionsSource::new(); + + // when + let future = executor.produce_without_commit_with_source( + Components { + header_to_produce: Default::default(), + transactions_source, + coinbase_recipient: Default::default(), + gas_price: 0, + }, + Instant::now() + Duration::from_millis(300), + ); + mock_tx_pool.push_response( + MockTxPoolResponse::new(&[&long_tx], TransactionFiltered::NotFiltered) + .assert_selection_worker_count(2), + ); + mock_tx_pool.push_response( + MockTxPoolResponse::new(&[], TransactionFiltered::NotFiltered) + .assert_selection_worker_count(1), + ); + mock_tx_pool.push_response(MockTxPoolResponse::new( + &[], + TransactionFiltered::NotFiltered, + )); + + // then + let _ = future.await.unwrap(); +} + #[tokio::test] async fn execute__filter_contract_id_currently_executed_and_fetch_after() { let mut rng = rand::rngs::StdRng::seed_from_u64(2322); @@ -327,8 +395,10 @@ async fn execute__filter_contract_id_currently_executed_and_fetch_after() { MockRelayer, MockPreconfirmationSender, Config { - number_of_cores: std::num::NonZeroUsize::new(2) + worker_count: std::num::NonZeroUsize::new(2) .expect("The value is not zero; qed"), + worker_count_policy: crate::config::WorkerCountPolicy::StaticMax, + metrics: false, }, ) .unwrap(); @@ -444,8 +514,10 @@ async fn execute__gas_left_updated_when_state_merges() { MockRelayer, MockPreconfirmationSender, Config { - number_of_cores: std::num::NonZeroUsize::new(2) + worker_count: std::num::NonZeroUsize::new(2) .expect("The value is not zero; qed"), + worker_count_policy: crate::config::WorkerCountPolicy::StaticMax, + metrics: false, }, ) .unwrap(); @@ -536,8 +608,10 @@ async fn execute__utxo_ordering_kept() { MockRelayer, MockPreconfirmationSender, Config { - number_of_cores: std::num::NonZeroUsize::new(2) + worker_count: std::num::NonZeroUsize::new(2) .expect("The value is not zero; qed"), + worker_count_policy: crate::config::WorkerCountPolicy::StaticMax, + metrics: false, }, ) .unwrap(); @@ -610,8 +684,10 @@ async fn execute__utxo_resolved() { MockRelayer, MockPreconfirmationSender, Config { - number_of_cores: std::num::NonZeroUsize::new(2) + worker_count: std::num::NonZeroUsize::new(2) .expect("The value is not zero; qed"), + worker_count_policy: crate::config::WorkerCountPolicy::StaticMax, + metrics: false, }, ) .unwrap(); @@ -703,8 +779,10 @@ async fn execute__trigger_skipped_txs_fallback_mechanism() { MockRelayer, MockPreconfirmationSender, Config { - number_of_cores: std::num::NonZeroUsize::new(3) + worker_count: std::num::NonZeroUsize::new(3) .expect("The value is not zero; qed"), + worker_count_policy: crate::config::WorkerCountPolicy::StaticMax, + metrics: false, }, ) .unwrap(); diff --git a/crates/services/producer/src/block_producer/tests.rs b/crates/services/producer/src/block_producer/tests.rs index 90240c81f44..b773ba82fb7 100644 --- a/crates/services/producer/src/block_producer/tests.rs +++ b/crates/services/producer/src/block_producer/tests.rs @@ -715,7 +715,8 @@ mod dry_run { .unwrap(); // Then - let guard = producer.executor.captured.lock().unwrap(); + let executor = producer.executor.lock().await; + let guard = executor.captured.lock().unwrap(); let produced_version = guard .as_ref() .unwrap() @@ -755,7 +756,8 @@ mod dry_run { .unwrap(); // Then - let guard = producer.executor.captured.lock().unwrap(); + let executor = producer.executor.lock().await; + let guard = executor.captured.lock().unwrap(); let produced_version = guard .as_ref() .unwrap() @@ -793,7 +795,8 @@ mod dry_run { .unwrap(); // Then - let guard = producer.executor.captured.lock().unwrap(); + let executor = producer.executor.lock().await; + let guard = executor.captured.lock().unwrap(); let produced_version = guard .as_ref() .unwrap() @@ -832,7 +835,8 @@ mod dry_run { .unwrap(); // Then - let guard = producer.executor.captured.lock().unwrap(); + let executor = producer.executor.lock().await; + let guard = executor.captured.lock().unwrap(); let produced_version = guard .as_ref() .unwrap() diff --git a/crates/services/txpool_v2/src/config.rs b/crates/services/txpool_v2/src/config.rs index bdeb831e556..b10ddeb7baf 100644 --- a/crates/services/txpool_v2/src/config.rs +++ b/crates/services/txpool_v2/src/config.rs @@ -146,6 +146,8 @@ pub struct Config { pub max_pending_pool_size_percentage: u16, /// Enable metrics when set to true pub metrics: bool, + /// Allow mixing dependency graphs after the initial batch fill. + pub eagerly_include_tx_dependency_graphs: bool, } #[derive(Clone, Debug)] @@ -205,9 +207,10 @@ impl Default for Config { max_pending_write_pool_requests: 1000, max_pending_read_pool_requests: 1000, }, - pending_pool_tx_ttl: Duration::from_secs(3), + pending_pool_tx_ttl: Duration::from_secs(100), max_pending_pool_size_percentage: 50, metrics: false, + eagerly_include_tx_dependency_graphs: false, } } } diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index 390eca038a8..a61f0f83af4 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -375,15 +375,21 @@ where /// Extract transactions for a block. /// Returns a list of transactions that were selected for the block /// based on the constraints given in the configuration and the selection algorithm used. - pub fn extract_transactions_for_block( + pub fn extract_transactions_for_block_with_anchors( &mut self, constraints: &Constraints, - ) -> Vec { + ) -> (Vec, Vec) { let metrics = self.config.metrics; let maybe_start = metrics.then(std::time::Instant::now); + let select_start = Instant::now(); + let tx_count_before = self.tx_count(); + let executable_before = + self.selection_algorithm.number_of_executable_transactions(); let best_txs = self .selection_algorithm .gather_best_txs(constraints, &mut self.storage); + let selected_anchors = self.selection_algorithm.last_selection_anchors().to_vec(); + let select_elapsed = select_start.elapsed(); if let Some(start) = maybe_start { Self::record_select_transaction_time(start) @@ -411,7 +417,22 @@ where self.update_stats(); - txs + let tx_count_after = self.tx_count(); + let executable_after = + self.selection_algorithm.number_of_executable_transactions(); + + tracing::warn!( + tx_count_before, + executable_before, + tx_count_after, + executable_after, + selected_count = txs.len(), + selected_anchor_count = selected_anchors.len(), + select_time_us = select_elapsed.as_micros(), + "txpool_v2 selection summary" + ); + + (txs, selected_anchors) } pub fn get(&self, tx_id: &TxId) -> Option<&StorageData> { diff --git a/crates/services/txpool_v2/src/pool_worker.rs b/crates/services/txpool_v2/src/pool_worker.rs index a1789fb5efb..e2b95561118 100644 --- a/crates/services/txpool_v2/src/pool_worker.rs +++ b/crates/services/txpool_v2/src/pool_worker.rs @@ -10,6 +10,7 @@ use crate::{ TxPoolPersistentStorage, TxStatusManager as TxStatusManagerTrait, }, + selection_algorithms::SelectionAlgorithm, service::{ TxInfo, TxPool, @@ -209,7 +210,8 @@ pub(super) enum PoolInsertRequest { pub(super) enum PoolExtractBlockTransactions { ExtractBlockTransactions { constraints: Constraints, - transactions: oneshot::Sender<(Vec, HashSet)>, + transactions: + oneshot::Sender<(Vec, HashSet, Vec)>, }, } @@ -423,6 +425,12 @@ where } let resolved_txs = self.pending_pool.new_known_tx(tx.utxo_ids_with_outputs()); + // tracing::warn!( + // result = "ok", + // pending_pool_txs = self.pending_pool.current_txs, + // resolved_txs = resolved_txs.len(), + // "txpool_v2 insert" + // ); for (tx, source) in resolved_txs { if let Err(e) = self @@ -437,6 +445,12 @@ where } } Err(InsertionErrorType::MissingInputs(missing_inputs)) => { + tracing::warn!( + result = "missing_inputs", + pending_pool_txs = self.pending_pool.current_txs, + resolved_txs = 0, + "txpool_v2 insert" + ); if missing_inputs.is_empty() { debug_assert!(false, "Missing inputs should not be empty"); } else if !self.has_enough_space_in_pools(&tx) { @@ -463,6 +477,12 @@ where } } Err(InsertionErrorType::Error(error)) => { + tracing::warn!( + result = "error", + pending_pool_txs = self.pending_pool.current_txs, + resolved_txs = 0, + "txpool_v2 insert" + ); if let Err(e) = self.notification_sender .try_send(PoolNotification::ErrorInsertion { @@ -480,10 +500,31 @@ where fn extract_block_transactions( &mut self, constraints: Constraints, - blocks: oneshot::Sender<(Vec, HashSet)>, + blocks: oneshot::Sender<(Vec, HashSet, Vec)>, ) { - let txs = self.pool.extract_transactions_for_block(&constraints); - if blocks.send((txs, constraints.excluded_contracts)).is_err() { + tracing::warn!( + max_gas = constraints.max_gas, + max_txs = constraints.maximum_txs, + max_block_size = constraints.maximum_block_size, + excluded_contracts = constraints.excluded_contracts.len(), + tx_count = self.pool.tx_count(), + executable_count = self + .pool + .selection_algorithm + .number_of_executable_transactions(), + "txpool_v2 extract_block_transactions start" + ); + let (txs, selected_anchors) = + self.pool.extract_transactions_for_block_with_anchors(&constraints); + tracing::warn!( + result_size = txs.len(), + selected_anchor_count = selected_anchors.len(), + "txpool_v2 extract_block_transactions result" + ); + if blocks + .send((txs, constraints.excluded_contracts, selected_anchors)) + .is_err() + { tracing::error!("Failed to send block transactions"); } } diff --git a/crates/services/txpool_v2/src/selection_algorithms/mod.rs b/crates/services/txpool_v2/src/selection_algorithms/mod.rs index 58c6c84f824..321fea2b07d 100644 --- a/crates/services/txpool_v2/src/selection_algorithms/mod.rs +++ b/crates/services/txpool_v2/src/selection_algorithms/mod.rs @@ -25,6 +25,8 @@ pub struct Constraints { pub maximum_block_size: u64, /// List of excluded contracts. pub excluded_contracts: HashSet, + /// Number of execution workers available for the block. + pub execution_worker_count: usize, } /// The selection algorithm is responsible for selecting the best transactions to include in a block. @@ -53,6 +55,9 @@ pub trait SelectionAlgorithm { /// Get less worth transactions iterator fn get_less_worth_txs(&self) -> impl Iterator; + /// Returns the anchor contracts chosen during the most recent selection call. + fn last_selection_anchors(&self) -> &[ContractId]; + /// Inform the selection algorithm that a transaction was removed from the pool. fn on_removed_transaction(&mut self, storage_entry: &StorageData); } diff --git a/crates/services/txpool_v2/src/selection_algorithms/ratio_tip_gas.rs b/crates/services/txpool_v2/src/selection_algorithms/ratio_tip_gas.rs index 5bfe1a4c5aa..7349ab12b79 100644 --- a/crates/services/txpool_v2/src/selection_algorithms/ratio_tip_gas.rs +++ b/crates/services/txpool_v2/src/selection_algorithms/ratio_tip_gas.rs @@ -3,12 +3,21 @@ use std::{ Ordering, Reverse, }, - collections::BTreeMap, + collections::{ + BTreeMap, + HashMap, + HashSet, + VecDeque, + }, fmt::Debug, time::SystemTime, }; -use fuel_core_types::fuel_tx::TxId; +use fuel_core_types::fuel_tx::{ + ContractId, + Input, + TxId, +}; use num_rational::Ratio; use crate::storage::{ @@ -24,11 +33,8 @@ use super::{ #[cfg(test)] use fuel_core_types::services::txpool::ArcPoolTx; -#[cfg(test)] -use std::collections::HashMap; - pub trait RatioTipGasSelectionAlgorithmStorage { - type StorageIndex: Debug; + type StorageIndex: Copy + Debug; fn get(&self, index: &Self::StorageIndex) -> Option<&StorageData>; @@ -44,6 +50,11 @@ pub trait RatioTipGasSelectionAlgorithmStorage { pub type RatioTipGas = Ratio; +#[cfg(feature = "u32-tx-count")] +type TxCount = u32; +#[cfg(not(feature = "u32-tx-count"))] +type TxCount = u16; + /// Key used to sort transactions by tip/gas ratio. /// It first compares the tip/gas ratio, then the creation instant and finally the transaction id. #[derive(Eq, PartialEq, Clone, Copy, Debug)] @@ -75,23 +86,110 @@ impl PartialOrd for Key { } } +#[derive(Default)] +struct SkipCounters { + not_enough_gas: usize, + too_big_tx: usize, + less_price: usize, + excluded_contracts: usize, +} + +struct SelectionBudget { + gas_left: u64, + space_left: u64, + nb_left: TxCount, +} + +struct AnchorState { + threshold_pct: u8, + total_gas: u64, + current_anchor: Option, + current_anchor_gas: u64, + locked_anchor: Option, +} + +impl AnchorState { + fn new(threshold_pct: u8) -> Self { + Self { + threshold_pct, + total_gas: 0, + current_anchor: None, + current_anchor_gas: 0, + locked_anchor: None, + } + } + + fn on_selected(&mut self, tx_anchor: Option, tx_gas: u64) { + self.total_gas = self.total_gas.saturating_add(tx_gas); + if self.locked_anchor.is_some() { + return; + } + if self.current_anchor == tx_anchor { + self.current_anchor_gas = self.current_anchor_gas.saturating_add(tx_gas); + } else { + self.current_anchor = tx_anchor; + self.current_anchor_gas = tx_gas; + } + if self.current_anchor_gas.saturating_mul(100) + >= self + .total_gas + .saturating_mul(u64::from(self.threshold_pct)) + { + self.locked_anchor = self.current_anchor; + } + } +} + +impl SelectionBudget { + fn new(constraints: &Constraints) -> Self { + Self { + gas_left: constraints.max_gas, + space_left: constraints.maximum_block_size, + nb_left: constraints.maximum_txs, + } + } + + fn has_capacity(&self) -> bool { + self.gas_left > 0 && self.space_left > 0 && self.nb_left > 0 + } +} + /// The selection algorithm that selects transactions based on the tip/gas ratio. pub struct RatioTipGasSelection where S: RatioTipGasSelectionAlgorithmStorage, { executable_transactions_sorted_tip_gas_ratio: BTreeMap, S::StorageIndex>, + executable_transactions_by_contract: + HashMap, S::StorageIndex>>, new_executable_txs_notifier: tokio::sync::watch::Sender<()>, + eagerly_include_tx_dependency_graphs: bool, + last_selection_anchors: Vec, } impl RatioTipGasSelection where S: RatioTipGasSelectionAlgorithmStorage, { - pub fn new(new_executable_txs_notifier: tokio::sync::watch::Sender<()>) -> Self { + pub fn new( + new_executable_txs_notifier: tokio::sync::watch::Sender<()>, + eagerly_include_tx_dependency_graphs: bool, + ) -> Self { Self { executable_transactions_sorted_tip_gas_ratio: BTreeMap::new(), + executable_transactions_by_contract: HashMap::new(), new_executable_txs_notifier, + eagerly_include_tx_dependency_graphs, + last_selection_anchors: Vec::new(), + } + } + + fn push_selected_anchor( + selected_anchors: &mut Vec, + contract_id: ContractId, + ) { + if !selected_anchors.contains(&contract_id) { + selected_anchors.push(contract_id); } } @@ -112,9 +210,351 @@ where } } - fn on_removed_transaction_inner(&mut self, key: Key) { + fn remove_key_from_indexes( + &mut self, + key: Key, + contract_ids: &[ContractId], + ) { self.executable_transactions_sorted_tip_gas_ratio .remove(&Reverse(key)); + contract_ids + .iter() + .for_each(|contract_id| self.remove_key_from_contract_index(*contract_id, key)); + } + + fn remove_key_from_contract_index( + &mut self, + contract_id: ContractId, + key: Key, + ) { + if let Some(contract_index) = + self.executable_transactions_by_contract.get_mut(&contract_id) + { + contract_index.remove(&Reverse(key)); + if contract_index.is_empty() { + self.executable_transactions_by_contract.remove(&contract_id); + } + } + } + + fn insert_key_into_indexes( + &mut self, + key: Key, + storage_id: S::StorageIndex, + contract_ids: &[ContractId], + ) { + self.executable_transactions_sorted_tip_gas_ratio + .insert(Reverse(key), storage_id); + contract_ids.iter().for_each(|contract_id| { + self.insert_key_into_contract_index(*contract_id, key, storage_id) + }); + } + + fn insert_key_into_contract_index( + &mut self, + contract_id: ContractId, + key: Key, + storage_id: S::StorageIndex, + ) { + self.executable_transactions_by_contract + .entry(contract_id) + .or_default() + .insert(Reverse(key), storage_id); + } + + fn collect_contract_inputs(store_entry: &StorageData) -> Vec { + let mut contracts = HashSet::new(); + store_entry.transaction.inputs().iter().for_each(|input| { + if let Input::Contract(contract_input) = input { + contracts.insert(contract_input.contract_id); + } + }); + contracts.into_iter().collect() + } + + fn batch_graphs_count( + executable_count: usize, + execution_worker_count: usize, + ) -> usize { + if execution_worker_count == 0 { + return 0; + } + (executable_count + .saturating_add(execution_worker_count) + .saturating_sub(1)) + / execution_worker_count + } + + fn try_enqueue_stored_transaction( + &self, + constraints: &Constraints, + stored_transaction: &StorageData, + storage_id: S::StorageIndex, + prioritized_queue: &mut VecDeque, + budget: &mut SelectionBudget, + skipped: &mut SkipCounters, + ) -> Option { + if !budget.has_capacity() { + return None; + } + + let has_excluded_contract = + stored_transaction.transaction.inputs().iter().any(|input| { + if let fuel_core_types::fuel_tx::Input::Contract(contract) = input { + constraints + .excluded_contracts + .contains(&contract.contract_id) + } else { + false + } + }); + if has_excluded_contract { + skipped.excluded_contracts = skipped.excluded_contracts.saturating_add(1); + return None; + } + + if stored_transaction.transaction.max_gas_price() < constraints.minimal_gas_price + { + skipped.less_price = skipped.less_price.saturating_add(1); + return None; + } + + let tx_gas = stored_transaction.transaction.max_gas(); + if tx_gas > budget.gas_left { + skipped.not_enough_gas = skipped.not_enough_gas.saturating_add(1); + return None; + } + + if stored_transaction.transaction.metered_bytes_size() as u64 > budget.space_left + { + skipped.too_big_tx = skipped.too_big_tx.saturating_add(1); + return None; + } + + budget.gas_left = budget.gas_left.saturating_sub(tx_gas); + budget.space_left = budget + .space_left + .saturating_sub(stored_transaction.transaction.metered_bytes_size() as u64); + budget.nb_left = budget.nb_left.saturating_sub(1); + prioritized_queue.push_back(storage_id); + Some(tx_gas) + } + + fn fill_from_executable_set( + &mut self, + constraints: &Constraints, + storage: &S, + queue_limit: usize, + prioritized_queue: &mut VecDeque, + budget: &mut SelectionBudget, + skipped: &mut SkipCounters, + anchor_state: Option<&mut AnchorState>, + ) -> bool { + let mut anchor_state = anchor_state; + let mut selected_transactions = Vec::new(); + for (key, storage_id) in &self.executable_transactions_sorted_tip_gas_ratio { + if prioritized_queue.len() >= queue_limit || !budget.has_capacity() { + break; + } + + let Some(stored_transaction) = storage.get(storage_id) else { + debug_assert!( + false, + "Transaction not found in the storage during `gather_best_txs`." + ); + tracing::warn!( + "Transaction not found in the storage during `gather_best_txs`." + ); + selected_transactions.push((key.0, Vec::new())); + continue; + }; + + if let Some(gas) = self.try_enqueue_stored_transaction( + constraints, + stored_transaction, + *storage_id, + prioritized_queue, + budget, + skipped, + ) { + if let Some(state) = anchor_state.as_mut() { + state.on_selected( + self.select_anchor_contract(constraints, stored_transaction), + gas, + ); + } + selected_transactions + .push((key.0, Self::collect_contract_inputs(stored_transaction))); + } + } + + if selected_transactions.is_empty() { + return false; + } + + selected_transactions + .into_iter() + .for_each(|(key, contracts)| self.remove_key_from_indexes(key, &contracts)); + true + } + + fn fill_from_anchor_contract_set( + &mut self, + constraints: &Constraints, + storage: &S, + queue_limit: usize, + anchor_contract_id: ContractId, + prioritized_queue: &mut VecDeque, + budget: &mut SelectionBudget, + skipped: &mut SkipCounters, + anchor_state: Option<&mut AnchorState>, + ) -> bool { + let mut anchor_state = anchor_state; + let candidates = self + .executable_transactions_by_contract + .get(&anchor_contract_id) + .map(|entries| { + entries + .iter() + .map(|(key, storage_id)| (key.0, *storage_id)) + .collect::>() + }) + .unwrap_or_default(); + + if candidates.is_empty() { + return false; + } + + let mut selected_transactions = Vec::new(); + let mut stale_keys = Vec::new(); + for (key, storage_id) in candidates { + if prioritized_queue.len() >= queue_limit || !budget.has_capacity() { + break; + } + + let Some(stored_transaction) = storage.get(&storage_id) else { + stale_keys.push(key); + continue; + }; + + if let Some(gas) = self.try_enqueue_stored_transaction( + constraints, + stored_transaction, + storage_id, + prioritized_queue, + budget, + skipped, + ) { + if let Some(state) = anchor_state.as_mut() { + state.on_selected(Some(anchor_contract_id), gas); + } + selected_transactions + .push((key, Self::collect_contract_inputs(stored_transaction))); + } + } + + if selected_transactions.is_empty() && stale_keys.is_empty() { + return false; + } + + stale_keys.into_iter().for_each(|key| { + self.executable_transactions_sorted_tip_gas_ratio + .remove(&Reverse(key)); + self.remove_key_from_contract_index(anchor_contract_id, key); + }); + selected_transactions + .into_iter() + .for_each(|(key, contracts)| self.remove_key_from_indexes(key, &contracts)); + true + } + + fn fill_from_promoted_queue( + &mut self, + constraints: &Constraints, + storage: &S, + queue_limit: usize, + prioritized_queue: &mut VecDeque, + promoted_queue: &mut VecDeque, + budget: &mut SelectionBudget, + skipped: &mut SkipCounters, + anchor_state: Option<&mut AnchorState>, + ) -> bool { + let mut anchor_state = anchor_state; + let mut filled = false; + while budget.has_capacity() && prioritized_queue.len() < queue_limit { + let Some(storage_id) = promoted_queue.pop_front() else { + break; + }; + + let Some(stored_transaction) = storage.get(&storage_id) else { + debug_assert!( + false, + "Transaction not found in the storage during `gather_best_txs`." + ); + tracing::warn!( + "Transaction not found in the storage during `gather_best_txs`." + ); + continue; + }; + + if let Some(gas) = self.try_enqueue_stored_transaction( + constraints, + stored_transaction, + storage_id, + prioritized_queue, + budget, + skipped, + ) { + if let Some(state) = anchor_state.as_mut() { + state.on_selected( + self.select_anchor_contract(constraints, stored_transaction), + gas, + ); + } + let key = Self::key(stored_transaction); + self.remove_key_from_indexes( + key, + &Self::collect_contract_inputs(stored_transaction), + ); + filled = true; + } + } + filled + } + + fn contract_anchor_bias_enabled() -> bool { + std::env::var("TXPOOL_V2_CONTRACT_ANCHOR_BIAS") + .map(|value| { + matches!( + value.to_ascii_lowercase().as_str(), + "1" | "true" | "yes" | "on" + ) + }) + .unwrap_or(false) + } + + fn contract_anchor_dominance_threshold_pct() -> u8 { + std::env::var("TXPOOL_V2_CONTRACT_ANCHOR_DOMINANCE_THRESHOLD_PCT") + .ok() + .and_then(|value| value.parse::().ok()) + .filter(|value| *value > 0 && *value <= 100) + .unwrap_or(50) + } + + fn select_anchor_contract( + &self, + constraints: &Constraints, + store_entry: &StorageData, + ) -> Option { + Self::collect_contract_inputs(store_entry) + .into_iter() + .filter(|contract_id| !constraints.excluded_contracts.contains(contract_id)) + .max_by_key(|contract_id| { + self.executable_transactions_by_contract + .get(contract_id) + .map(BTreeMap::len) + .unwrap_or(0) + }) } #[cfg(test)] @@ -151,127 +591,199 @@ where constraints: &Constraints, storage: &mut S, ) -> RemovedTransactions { - let mut gas_left = constraints.max_gas; - let mut space_left = constraints.maximum_block_size; - let mut nb_left = constraints.maximum_txs; let mut result = Vec::new(); + let mut selected_anchors = Vec::new(); + let execution_worker_count = constraints.execution_worker_count.max(1); + let batch_graphs_count = Self::batch_graphs_count( + self.number_of_executable_transactions(), + execution_worker_count, + ); - // Take iterate over all transactions with the highest tip/gas ratio. If transaction - // fits in the gas limit select it and mark all its dependents to be promoted. - // Do that until end of the list or gas limit is reached. If gas limit is not - // reached, but we have promoted transactions we can start again from the beginning. - // Otherwise, we can break the loop. - // It is done in this way to minimize number of iteration of the list of executable - // transactions. + let mut budget = SelectionBudget::new(constraints); + let mut skipped = SkipCounters::default(); let mut add_new_executable = false; - while gas_left > 0 - && nb_left > 0 - && space_left > 0 - && !self.executable_transactions_sorted_tip_gas_ratio.is_empty() - { - let mut clean_up_list = Vec::new(); - let mut transactions_to_remove = Vec::new(); - let mut transactions_to_promote = Vec::new(); - - 'outer: for (key, storage_id) in - &self.executable_transactions_sorted_tip_gas_ratio - { - if nb_left == 0 || gas_left == 0 || space_left == 0 { - break; - } + let mut prioritized_queue = VecDeque::with_capacity(batch_graphs_count); + let mut promoted_queue = VecDeque::new(); + let contract_anchor_bias_enabled = Self::contract_anchor_bias_enabled(); + let contract_anchor_dominance_threshold_pct = + Self::contract_anchor_dominance_threshold_pct(); + let mut anchor_state = contract_anchor_bias_enabled + .then(|| AnchorState::new(contract_anchor_dominance_threshold_pct)); + let initial_queue_limit = if contract_anchor_bias_enabled { + 1 + } else { + batch_graphs_count + }; + + self.fill_from_executable_set( + constraints, + storage, + initial_queue_limit, + &mut prioritized_queue, + &mut budget, + &mut skipped, + anchor_state.as_mut(), + ); + let mut anchor_contract_id = if contract_anchor_bias_enabled { + prioritized_queue + .front() + .and_then(|storage_id| storage.get(storage_id)) + .and_then(|store_entry| { + self.select_anchor_contract(constraints, store_entry) + }) + } else { + None + }; + if let Some(anchor_contract) = anchor_contract_id { + Self::push_selected_anchor(&mut selected_anchors, anchor_contract); + } - let Some(stored_transaction) = storage.get(storage_id) else { - debug_assert!( - false, - "Transaction not found in the storage during `gather_best_txs`." - ); - tracing::warn!( - "Transaction not found in the storage during `gather_best_txs`." + loop { + if budget.has_capacity() { + let locked_anchor = anchor_state + .as_ref() + .and_then(|state| state.locked_anchor); + if let Some(locked_anchor) = locked_anchor { + Self::push_selected_anchor(&mut selected_anchors, locked_anchor); + let filled_from_locked_anchor = self.fill_from_anchor_contract_set( + constraints, + storage, + batch_graphs_count, + locked_anchor, + &mut prioritized_queue, + &mut budget, + &mut skipped, + anchor_state.as_mut(), ); - transactions_to_remove.push(*key); - continue - }; - - for input in stored_transaction.transaction.inputs() { - if let fuel_core_types::fuel_tx::Input::Contract(contract) = input - && constraints - .excluded_contracts - .contains(&contract.contract_id) - { - continue 'outer; + if !filled_from_locked_anchor && prioritized_queue.is_empty() { + break; } } - - let less_price = stored_transaction.transaction.max_gas_price() - < constraints.minimal_gas_price; - - if less_price { - continue - } - - let not_enough_gas = stored_transaction.transaction.max_gas() > gas_left; - let too_big_tx = stored_transaction.transaction.metered_bytes_size() - as u64 - > space_left; - - if not_enough_gas || too_big_tx { - continue - } - - gas_left = - gas_left.saturating_sub(stored_transaction.transaction.max_gas()); - space_left = space_left.saturating_sub( - stored_transaction.transaction.metered_bytes_size() as u64, + let filled_from_promoted = self.fill_from_promoted_queue( + constraints, + storage, + batch_graphs_count, + &mut prioritized_queue, + &mut promoted_queue, + &mut budget, + &mut skipped, + anchor_state.as_mut(), ); - nb_left = nb_left.saturating_sub(1); - - let dependents = storage.get_dependents(storage_id).collect::>(); - debug_assert!(!storage.has_dependencies(storage_id)); - let removed = storage.remove(storage_id).expect( - "We just get the transaction from the storage above, it should exist.", - ); - clean_up_list.push(*key); - result.push(removed); + let filled_from_anchor = anchor_contract_id + .filter(|_| prioritized_queue.len() < batch_graphs_count) + .map(|anchor_contract| { + Self::push_selected_anchor( + &mut selected_anchors, + anchor_contract, + ); + self.fill_from_anchor_contract_set( + constraints, + storage, + batch_graphs_count, + anchor_contract, + &mut prioritized_queue, + &mut budget, + &mut skipped, + anchor_state.as_mut(), + ) + }) + .unwrap_or(false); + let should_fill_from_executable = + prioritized_queue.len() < batch_graphs_count + && self.eagerly_include_tx_dependency_graphs; + let filled_from_executable = should_fill_from_executable + && self.fill_from_executable_set( + constraints, + storage, + batch_graphs_count, + &mut prioritized_queue, + &mut budget, + &mut skipped, + anchor_state.as_mut(), + ); - for dependent in dependents { - if !storage.has_dependencies(&dependent) { - transactions_to_promote.push(dependent); - } + if prioritized_queue.is_empty() + && !filled_from_promoted + && !filled_from_anchor + && !filled_from_executable + { + break; } } - for remove in transactions_to_remove { - let key = remove.0; - self.on_removed_transaction_inner(key); - } - - // If no transaction fits in the gas limit and no one to promote, we can break the loop - if clean_up_list.is_empty() && transactions_to_promote.is_empty() { + if prioritized_queue.is_empty() { break; } - for key in clean_up_list { - let key = key.0; - // Remove selected transactions from the sorted list - self.on_removed_transaction_inner(key); - } + let storage_id = prioritized_queue + .pop_front() + .expect("Checked for emptiness above."); - if transactions_to_promote.is_empty() { - continue; - } - for promote in transactions_to_promote { - let storage = storage.get(&promote).expect( - "We just get the dependent from the storage, it should exist.", + let Some(_stored_transaction) = storage.get(&storage_id) else { + debug_assert!( + false, + "Transaction not found in the storage during `gather_best_txs`." + ); + tracing::warn!( + "Transaction not found in the storage during `gather_best_txs`." ); - self.new_executable_transaction(promote, storage); + continue; + }; + + let dependents = storage.get_dependents(&storage_id).collect::>(); + debug_assert!(!storage.has_dependencies(&storage_id)); + let removed = storage.remove(&storage_id).expect( + "We just get the transaction from the storage above, it should exist.", + ); + result.push(removed); + + for dependent in dependents { + if storage.has_dependencies(&dependent) { + continue; + } + let Some(store_entry) = storage.get(&dependent) else { + debug_assert!( + false, + "Dependent transaction not found in the storage during `gather_best_txs`." + ); + tracing::warn!( + "Dependent transaction not found in the storage during `gather_best_txs`." + ); + continue; + }; + self.new_executable_transaction(dependent, store_entry); + promoted_queue.push_back(dependent); + if anchor_contract_id.is_none() { + anchor_contract_id = + self.select_anchor_contract(constraints, store_entry); + } + add_new_executable = true; } - add_new_executable = true; } if add_new_executable { self.new_executable_txs_notifier.send_replace(()); } + self.last_selection_anchors = selected_anchors; + + tracing::warn!( + batch_graphs_count, + execution_worker_count, + prioritized_queue_len = prioritized_queue.len(), + promoted_queue_len = promoted_queue.len(), + selected_count = result.len(), + skipped_not_enough_gas = skipped.not_enough_gas, + skipped_too_big_tx = skipped.too_big_tx, + skipped_less_price = skipped.less_price, + skipped_excluded_contracts = skipped.excluded_contracts, + contract_anchor_bias_enabled, + contract_anchor_dominance_threshold_pct, + anchor_contract = ?anchor_contract_id, + locked_anchor = ?anchor_state.as_ref().and_then(|state| state.locked_anchor), + "txpool_v2 gather_best_txs summary" + ); + result } @@ -281,8 +793,17 @@ where store_entry: &StorageData, ) { let key = Self::key(store_entry); - self.executable_transactions_sorted_tip_gas_ratio - .insert(Reverse(key), storage_id); + self.insert_key_into_indexes( + key, + storage_id, + &Self::collect_contract_inputs(store_entry), + ); + // tracing::warn!( + // executable_set_size = self + // .executable_transactions_sorted_tip_gas_ratio + // .len(), + // "txpool_v2 executable added" + // ); } fn get_less_worth_txs(&self) -> impl Iterator { @@ -291,9 +812,22 @@ where .rev() } + fn last_selection_anchors(&self) -> &[ContractId] { + &self.last_selection_anchors + } + fn on_removed_transaction(&mut self, storage_entry: &StorageData) { let key = Self::key(storage_entry); - self.on_removed_transaction_inner(key) + self.remove_key_from_indexes( + key, + &Self::collect_contract_inputs(storage_entry), + ); + // tracing::warn!( + // executable_set_size = self + // .executable_transactions_sorted_tip_gas_ratio + // .len(), + // "txpool_v2 executable removed" + // ); } fn number_of_executable_transactions(&self) -> usize { diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index f70ebecd745..9223640db3a 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -797,7 +797,10 @@ where max_txs_chain_count: config.max_txs_chain_count, }), BasicCollisionManager::new(), - RatioTipGasSelection::new(new_txs_notifier.clone()), + RatioTipGasSelection::new( + new_txs_notifier.clone(), + config.eagerly_include_tx_dependency_graphs, + ), config, pool_stats_sender, new_txs_notifier.clone(), diff --git a/crates/services/txpool_v2/src/shared_state.rs b/crates/services/txpool_v2/src/shared_state.rs index d603e68d560..c8e05ebc668 100644 --- a/crates/services/txpool_v2/src/shared_state.rs +++ b/crates/services/txpool_v2/src/shared_state.rs @@ -93,7 +93,7 @@ impl SharedState { loop { let result = select_transactions_receiver.try_recv(); match result { - Ok((txs, _)) => { + Ok((txs, _, _)) => { return Ok(txs); } Err(TryRecvError::Empty) => continue, @@ -107,7 +107,7 @@ impl SharedState { pub async fn extract_transactions_for_block_async( &self, constraints: Constraints, - ) -> Result<(Vec, HashSet), Error> { + ) -> Result<(Vec, HashSet, Vec), Error> { let (select_transactions_sender, select_transactions_receiver) = oneshot::channel(); self.select_transactions_requests_sender diff --git a/crates/services/txpool_v2/src/tests/context.rs b/crates/services/txpool_v2/src/tests/context.rs index 0e2bc1711f2..c7182e33442 100644 --- a/crates/services/txpool_v2/src/tests/context.rs +++ b/crates/services/txpool_v2/src/tests/context.rs @@ -112,13 +112,17 @@ impl TestPoolUniverse { } pub fn build_pool(&mut self) -> TxPool { + let (tx_new_executable_txs, _) = tokio::sync::watch::channel(()); let pool = Arc::new(RwLock::new(Pool::new( MockDBProvider(self.mock_db.clone()), GraphStorage::new(GraphConfig { max_txs_chain_count: self.config.max_txs_chain_count, }), BasicCollisionManager::new(), - RatioTipGasSelection::new(), + RatioTipGasSelection::new( + tx_new_executable_txs, + self.config.eagerly_include_tx_dependency_graphs, + ), self.config.clone(), ))); self.pool = Some(pool.clone()); diff --git a/crates/services/txpool_v2/src/tests/stability_test.rs b/crates/services/txpool_v2/src/tests/stability_test.rs index 2a2bb752121..583beb24ecc 100644 --- a/crates/services/txpool_v2/src/tests/stability_test.rs +++ b/crates/services/txpool_v2/src/tests/stability_test.rs @@ -187,6 +187,7 @@ fn stability_test_with_seed(seed: u64, limits: Limits, config: Config) { maximum_block_size: u64::MAX, minimal_gas_price: 0, excluded_contracts: Default::default(), + execution_worker_count: 1, }); if result.is_empty() { diff --git a/crates/services/txpool_v2/src/tests/tests_pool.rs b/crates/services/txpool_v2/src/tests/tests_pool.rs index e6799d5771a..740b61c1383 100644 --- a/crates/services/txpool_v2/src/tests/tests_pool.rs +++ b/crates/services/txpool_v2/src/tests/tests_pool.rs @@ -646,6 +646,7 @@ fn get_sorted_out_tx1_2_3() { maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts: Default::default(), + execution_worker_count: 1, }); // Then @@ -704,6 +705,7 @@ fn get_sorted_out_tx_same_tips() { maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts: Default::default(), + execution_worker_count: 1, }); // Then @@ -762,6 +764,7 @@ fn get_sorted_out_zero_tip() { maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts: Default::default(), + execution_worker_count: 1, }); // Then @@ -820,6 +823,7 @@ fn get_sorted_out_tx_profitable_ratios() { maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts: Default::default(), + execution_worker_count: 1, }); // Then @@ -860,6 +864,7 @@ fn get_sorted_out_tx_by_creation_instant() { maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts: Default::default(), + execution_worker_count: 1, }); // Then @@ -1300,6 +1305,7 @@ fn verify_and_insert__when_dependent_tx_is_extracted_new_tx_still_accepted() { maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts: Default::default(), + execution_worker_count: 1, }); assert_eq!(txs.len(), 1); assert_eq!(pool_dependency_tx.id(), txs[0].id()); @@ -1510,6 +1516,7 @@ fn extract__tx_with_excluded_contract() { maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts, + execution_worker_count: 1, }); // Then diff --git a/crates/services/txpool_v2/src/tests/tests_service.rs b/crates/services/txpool_v2/src/tests/tests_service.rs index 05672400755..79e1597138e 100644 --- a/crates/services/txpool_v2/src/tests/tests_service.rs +++ b/crates/services/txpool_v2/src/tests/tests_service.rs @@ -498,6 +498,7 @@ async fn insert__tx_depends_one_extracted_and_one_pool_tx() { maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts: Default::default(), + execution_worker_count: 1, }) .unwrap(); @@ -513,6 +514,7 @@ async fn insert__tx_depends_one_extracted_and_one_pool_tx() { maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts: Default::default(), + execution_worker_count: 1, }) .unwrap(); @@ -553,6 +555,7 @@ async fn pending_pool__returns_error_for_transaction_that_spends_already_spent_u maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts: Default::default(), + execution_worker_count: 1, }) .unwrap(); @@ -601,6 +604,7 @@ async fn pending_pool__returns_error_after_timeout_for_transaction_that_spends_u maximum_txs: u32::MAX, maximum_block_size: u64::MAX, excluded_contracts: Default::default(), + execution_worker_count: 1, }) .unwrap(); diff --git a/crates/services/txpool_v2/src/tests/universe.rs b/crates/services/txpool_v2/src/tests/universe.rs index 17212616089..38b2f262244 100644 --- a/crates/services/txpool_v2/src/tests/universe.rs +++ b/crates/services/txpool_v2/src/tests/universe.rs @@ -192,7 +192,10 @@ impl TestPoolUniverse { max_txs_chain_count: self.config.max_txs_chain_count, }), BasicCollisionManager::new(), - RatioTipGasSelection::new(tx_new_executable_txs.clone()), + RatioTipGasSelection::new( + tx_new_executable_txs.clone(), + self.config.eagerly_include_tx_dependency_graphs, + ), self.config.clone(), tx, tx_new_executable_txs, diff --git a/crates/services/upgradable-executor/wasm-executor/src/main.rs b/crates/services/upgradable-executor/wasm-executor/src/main.rs index acdd541a50a..2389035a4ef 100644 --- a/crates/services/upgradable-executor/wasm-executor/src/main.rs +++ b/crates/services/upgradable-executor/wasm-executor/src/main.rs @@ -13,12 +13,6 @@ #![deny(unused_crate_dependencies)] #![deny(warnings)] -use crate as fuel_core_wasm_executor; -use crate::utils::{ - InputDeserializationType, - WasmDeserializationBlockTypes, - convert_to_v1_execution_result, -}; use fuel_core_executor::executor::{ ExecutionInstance, ExecutionOptions, diff --git a/tests/test-helpers/src/builder.rs b/tests/test-helpers/src/builder.rs index 4301f320290..0a9a3c40b71 100644 --- a/tests/test-helpers/src/builder.rs +++ b/tests/test-helpers/src/builder.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "parallel-executor")] +use fuel_core::service::config::ParallelExecutorWorkerCountPolicy; use fuel_core::{ chain_config::{ ChainConfig, @@ -12,7 +14,10 @@ use fuel_core::{ Config, DbType, FuelService, - config::GasPriceConfig, + config::{ + ExecutorMode, + GasPriceConfig, + }, }, state::rocks_db::DatabaseConfig, }; @@ -43,6 +48,7 @@ use std::num::NonZeroUsize; use std::{ collections::HashMap, io, + path::PathBuf, }; /// Helper for wrapping a currently running node environment @@ -104,10 +110,21 @@ pub struct TestSetupBuilder { pub base_asset_id: AssetId, pub trigger: Trigger, pub max_txs: usize, + pub txpool_max_gas: Option, pub database_type: DbType, pub database_config: DatabaseConfig, + pub database_path: Option, pub chain_config: Option, pub number_threads_pool_verif: usize, + pub txpool_verification_threads: usize, + pub txpool_verification_queue_size: usize, + pub txpool_p2p_sync_threads: usize, + pub txpool_p2p_sync_queue_size: usize, + pub executor_parallel_worker_count: usize, + #[cfg(feature = "parallel-executor")] + pub executor_parallel_worker_count_policy: ParallelExecutorWorkerCountPolicy, + pub executor_mode: ExecutorMode, + pub executor_metrics: bool, } impl TestSetupBuilder { @@ -115,6 +132,11 @@ impl TestSetupBuilder { Self { rng: StdRng::seed_from_u64(seed), number_threads_pool_verif: 0, + txpool_verification_threads: 0, + txpool_verification_queue_size: 0, + txpool_p2p_sync_threads: 0, + txpool_p2p_sync_queue_size: 100, + executor_parallel_worker_count: 0, ..Default::default() } } @@ -124,6 +146,11 @@ impl TestSetupBuilder { self } + pub fn set_database_path(&mut self, database_path: impl Into) -> &mut Self { + self.database_path = Some(database_path.into()); + self + } + /// setup a contract and add to genesis configuration pub fn setup_contract( &mut self, @@ -247,13 +274,34 @@ impl TestSetupBuilder { let mut txpool = fuel_core_txpool::config::Config::default(); txpool.pool_limits.max_txs = self.max_txs; + if let Some(max_gas) = self.txpool_max_gas { + txpool.pool_limits.max_gas = max_gas; + } txpool.service_channel_limits = fuel_core_txpool::config::ServiceChannelLimits { max_pending_write_pool_requests: self.max_txs, max_pending_read_pool_requests: self.max_txs, }; - txpool.heavy_work.size_of_verification_queue = self.max_txs; + let txpool_verification_queue_size = if self.txpool_verification_queue_size == 0 { + self.max_txs + } else { + self.txpool_verification_queue_size + }; + txpool.heavy_work.size_of_verification_queue = txpool_verification_queue_size; + let txpool_verification_threads = if self.txpool_verification_threads == 0 { + self.number_threads_pool_verif + } else { + self.txpool_verification_threads + }; + let executor_parallel_worker_count = if self.executor_parallel_worker_count == 0 { + self.number_threads_pool_verif + } else { + self.executor_parallel_worker_count + }; + txpool.heavy_work.number_threads_to_verify_transactions = - self.number_threads_pool_verif; + txpool_verification_threads; + txpool.heavy_work.number_threads_p2p_sync = self.txpool_p2p_sync_threads; + txpool.heavy_work.size_of_p2p_sync_queue = self.txpool_p2p_sync_queue_size; txpool.utxo_validation = self.utxo_validation; let gas_price_config = GasPriceConfig { @@ -261,18 +309,24 @@ impl TestSetupBuilder { ..GasPriceConfig::local_node() }; - let mut config = Config { - utxo_validation: self.utxo_validation, - txpool, - block_production: self.trigger, - gas_price_config, - #[cfg(feature = "parallel-executor")] - executor_number_of_cores: NonZeroUsize::try_from( - self.number_threads_pool_verif, - ) - .unwrap_or(NonZeroUsize::try_from(1).expect("1 is not 0")), - ..Config::local_node_with_configs(chain_conf, state) - }; + let mut config = Config::local_node_with_configs(chain_conf, state); + if let Some(database_path) = self.database_path.clone() { + config.combined_db_config.database_path = database_path; + } + config.utxo_validation = self.utxo_validation; + config.txpool = txpool; + config.block_production = self.trigger; + config.gas_price_config = gas_price_config; + config.executor.mode = self.executor_mode; + #[cfg(feature = "parallel-executor")] + { + config.executor.parallel.worker_count = + NonZeroUsize::try_from(executor_parallel_worker_count) + .unwrap_or(NonZeroUsize::try_from(1).expect("1 is not 0")); + config.executor.parallel.worker_count_policy = + self.executor_parallel_worker_count_policy; + config.executor.parallel.metrics = self.executor_metrics; + } config.combined_db_config.database_config = self.database_config; let srv = FuelService::new_node(config).await.unwrap(); @@ -301,10 +355,22 @@ impl Default for TestSetupBuilder { base_asset_id: AssetId::BASE, trigger: Trigger::Instant, max_txs: 100000, + txpool_max_gas: None, database_type: DbType::RocksDb, database_config: DatabaseConfig::config_for_tests(), + database_path: None, chain_config: None, number_threads_pool_verif: 0, + txpool_verification_threads: 0, + txpool_verification_queue_size: 0, + txpool_p2p_sync_threads: 0, + txpool_p2p_sync_queue_size: 100, + executor_parallel_worker_count: 0, + #[cfg(feature = "parallel-executor")] + executor_parallel_worker_count_policy: + ParallelExecutorWorkerCountPolicy::StaticMax, + executor_mode: ExecutorMode::Normal, + executor_metrics: false, } } }