diff --git a/.changes/fixed/3268.md b/.changes/fixed/3268.md new file mode 100644 index 00000000000..6464db40c23 --- /dev/null +++ b/.changes/fixed/3268.md @@ -0,0 +1 @@ +Mempool preconfs: split collision manager removal into committed vs evicted paths to defer contract_users cleanup until after rollback diff --git a/Cargo.lock b/Cargo.lock index 01bda8a3c71..bcfc6bcc367 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10896,7 +10896,6 @@ dependencies = [ "fuel-core", "fuel-core-bin", "fuel-core-client", - "fuel-core-executor", "fuel-core-p2p", "fuel-core-poa", "fuel-core-relayer", diff --git a/crates/services/txpool_v2/src/collision_manager/basic.rs b/crates/services/txpool_v2/src/collision_manager/basic.rs index 3c31d881e32..7ec9ad7b2fb 100644 --- a/crates/services/txpool_v2/src/collision_manager/basic.rs +++ b/crates/services/txpool_v2/src/collision_manager/basic.rs @@ -329,33 +329,31 @@ where } fn on_removed_transaction(&mut self, transaction: &PoolTransaction) { + self.on_committed_transaction(transaction); + self.remove_from_contract_users(transaction); + } + + fn on_committed_transaction(&mut self, transaction: &PoolTransaction) { if let PoolTransaction::Blob(checked_tx, _) = transaction { let blob_id = checked_tx.transaction().blob_id(); self.blobs_users.remove(blob_id); } - let tx_id = transaction.id(); for input in transaction.inputs() { match input { Input::CoinSigned(CoinSigned { utxo_id, .. }) | Input::CoinPredicate(CoinPredicate { utxo_id, .. }) => { - // remove coin self.coins_spenders.remove(utxo_id); } Input::MessageCoinSigned(MessageCoinSigned { nonce, .. }) | Input::MessageCoinPredicate(MessageCoinPredicate { nonce, .. }) | Input::MessageDataSigned(MessageDataSigned { nonce, .. }) | Input::MessageDataPredicate(MessageDataPredicate { nonce, .. }) => { - // remove message self.messages_spenders.remove(nonce); } - Input::Contract(ContractInput { contract_id, .. }) => { - if let Some(users) = self.contract_users.get_mut(contract_id) { - users.retain(|id| id != &tx_id); - if users.is_empty() { - self.contract_users.remove(contract_id); - } - } - } + // Deliberately skipped: contract_users cleanup is deferred to + // remove_from_contract_users, which the caller invokes after + // all preconfirmation rollback for the block is complete. + Input::Contract(_) => {} } } for output in transaction.outputs().iter() { @@ -365,10 +363,23 @@ where | Output::Variable { .. } | Output::Contract(_) => {} Output::ContractCreated { contract_id, .. } => { - // remove contract self.contracts_creators.remove(contract_id); } }; } } + + fn remove_from_contract_users(&mut self, transaction: &PoolTransaction) { + let tx_id = transaction.id(); + for input in transaction.inputs() { + if let Input::Contract(ContractInput { contract_id, .. }) = input + && let Some(users) = self.contract_users.get_mut(contract_id) + { + users.retain(|id| id != &tx_id); + if users.is_empty() { + self.contract_users.remove(contract_id); + } + } + } + } } diff --git a/crates/services/txpool_v2/src/collision_manager/mod.rs b/crates/services/txpool_v2/src/collision_manager/mod.rs index d053b70cad4..341d5ef0ee7 100644 --- a/crates/services/txpool_v2/src/collision_manager/mod.rs +++ b/crates/services/txpool_v2/src/collision_manager/mod.rs @@ -46,6 +46,23 @@ pub trait CollisionManager { store_entry: &StorageData, ); - /// Inform the collision manager that a transaction was removed. + /// Inform the collision manager that a transaction was removed from the pool + /// for any reason other than canonical-block commitment (e.g. eviction, TTL + /// expiry, rollback-driven eviction). Cleans up all tracking state including + /// the `contract_users` entry for this transaction. fn on_removed_transaction(&mut self, transaction: &PoolTransaction); + + /// Inform the collision manager that a transaction was committed to a + /// canonical block and has been removed from the pool. Like + /// `on_removed_transaction` but intentionally **skips** the removal of + /// this transaction's ID from `contract_users`. The caller is responsible + /// for invoking `remove_from_contract_users` later — after any + /// preconfirmation-rollback work is complete — so that rollback logic can + /// still observe the full set of contract dependents. + fn on_committed_transaction(&mut self, transaction: &PoolTransaction); + + /// Remove the transaction's ID from every `contract_users` entry it + /// contributed to. Call this once per committed transaction, after all + /// rollback logic for the relevant block height has finished. + fn remove_from_contract_users(&mut self, transaction: &PoolTransaction); } diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index 5b45be4008f..b6bc06a79f2 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -429,7 +429,13 @@ where /// Process a preconfirmed transaction as committed while recording its spent /// inputs so they can be rolled back later if the transaction is not /// included in the canonical block. - pub fn process_preconfirmed_committed_transaction(&mut self, tx_id: TxId) { + /// + /// Returns the pool transaction if it was present in the pool (so that the + /// caller can defer the `contract_users` cleanup until after rollback). + pub fn process_preconfirmed_committed_transaction( + &mut self, + tx_id: TxId, + ) -> Option { // If the tx was already extracted for local block production it is no // longer in `tx_id_to_storage_id`, so the branch below that calls // `record_tentative_spend` will be skipped. Preserve the input keys @@ -445,7 +451,7 @@ where // Invariant violation. Panic in tests, log in production. debug_assert!(false, "Storage data not found for the transaction"); tracing::warn!("Storage data not found for the transaction."); - return; + return None; }; self.extracted_outputs .new_extracted_transaction(&transaction.transaction); @@ -455,7 +461,8 @@ where .record_tentative_spend(tx_id, transaction.transaction.inputs()); self.spent_inputs .spend_inputs(tx_id, transaction.transaction.inputs()); - self.update_components_and_caches_on_removal(iter::once(&transaction)); + let pool_tx = transaction.transaction.clone(); + self.update_components_and_caches_on_committed(iter::once(&transaction)); let mut new_executable_transaction = false; for dependent in dependents { @@ -470,16 +477,28 @@ where if new_executable_transaction { self.new_executable_txs_notifier.send_replace(()); } + + self.update_stats(); + return Some(pool_tx); } self.update_stats(); + None } /// Process committed transactions: /// - Remove transaction but keep its dependents and the dependents become executables. /// - Notify about possible new executable transactions. - pub fn process_committed_transactions(&mut self, tx_ids: impl Iterator) { + /// + /// Returns the pool transactions that were actually present in the pool (and + /// thus removed). The caller must invoke `remove_from_contract_users` for + /// each returned transaction once all preconfirmation rollback is complete. + pub fn process_committed_transactions( + &mut self, + tx_ids: impl Iterator, + ) -> Vec { let mut transactions_to_promote = vec![]; + let mut committed_pool_txs = vec![]; for tx_id in tx_ids { self.spent_inputs.spend_inputs_by_tx_id(tx_id); if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) { @@ -496,7 +515,8 @@ where .new_extracted_transaction(&transaction.transaction); self.spent_inputs .spend_inputs(tx_id, transaction.transaction.inputs()); - self.update_components_and_caches_on_removal(iter::once(&transaction)); + committed_pool_txs.push(transaction.transaction.clone()); + self.update_components_and_caches_on_committed(iter::once(&transaction)); for dependent in dependents { if !self.storage.has_dependencies(&dependent) { @@ -528,6 +548,7 @@ where } self.update_stats(); + committed_pool_txs } /// Check if the pool has enough space to store a transaction. @@ -848,6 +869,35 @@ where self.register_transaction_counts(); } + /// Like `update_components_and_caches_on_removal` but uses + /// `on_committed_transaction`, which intentionally skips the `contract_users` + /// cleanup. Callers must invoke `remove_from_contract_users` for each + /// committed transaction once all preconfirmation rollback is complete. + fn update_components_and_caches_on_committed<'a>( + &mut self, + removed_transactions: impl Iterator, + ) { + for storage_entry in removed_transactions { + let tx = &storage_entry.transaction; + self.current_gas = self.current_gas.saturating_sub(tx.max_gas()); + self.current_bytes_size = self + .current_bytes_size + .saturating_sub(tx.metered_bytes_size()); + self.tx_id_to_storage_id.remove(&tx.id()); + self.collision_manager.on_committed_transaction(tx); + self.selection_algorithm + .on_removed_transaction(storage_entry); + } + self.register_transaction_counts(); + } + + /// Remove `transaction` from the `contract_users` index. Call this for + /// every committed transaction after preconfirmation rollback is complete. + pub fn remove_from_contract_users(&mut self, transaction: &PoolTransaction) { + self.collision_manager + .remove_from_contract_users(transaction); + } + #[cfg(test)] pub fn assert_integrity(&self, mut expected_txs: HashSet) { for tx in &self.tx_id_to_storage_id { diff --git a/crates/services/txpool_v2/src/pool_worker.rs b/crates/services/txpool_v2/src/pool_worker.rs index da296f91664..3ddc8d926bf 100644 --- a/crates/services/txpool_v2/src/pool_worker.rs +++ b/crates/services/txpool_v2/src/pool_worker.rs @@ -139,6 +139,7 @@ impl PoolWorkerInterface { pool: tx_pool, view_provider, tentative_preconfs: BTreeMap::new(), + tentative_preconf_pool_txs: BTreeMap::new(), current_canonical_height: initial_block_height, }; @@ -278,6 +279,11 @@ pub(super) struct PoolWorker { /// Used to roll back stale preconfirmations when the canonical block at /// that height does not include those transactions. tentative_preconfs: BTreeMap>, + /// Pool transactions that were removed from the pool when their + /// preconfirmation arrived and need deferred `contract_users` cleanup. + /// Keyed by the tentative block height so cleanup can be applied after + /// rollback reconciliation for each height in `process_block`. + tentative_preconf_pool_txs: BTreeMap>, /// The height of the last canonical block imported by this node. /// Used to discard late preconfirmations whose tentative block height /// is already at or below the canonical tip. @@ -508,7 +514,10 @@ where .map(|tx_status| tx_status.id) .collect(); - self.pool + // Collect pool transactions removed by block commitment so we can + // clean up their `contract_users` entries *after* rollback (see below). + let committed_pool_txs = self + .pool .process_committed_transactions(confirmed_tx_ids.iter().copied()); block_result.tx_status.iter().for_each(|tx_status| { @@ -546,6 +555,21 @@ where } } } + // Now that rollback for this height is complete, clean up the + // `contract_users` entries for preconfirmed txs that were committed + // (removed from pool) when their preconfirmation arrived. Deferring + // until here ensures rollback could still observe those entries. + if let Some(deferred_txs) = self.tentative_preconf_pool_txs.remove(&height) { + for tx in &deferred_txs { + self.pool.remove_from_contract_users(tx); + } + } + } + + // Clean up `contract_users` for pool txs committed directly by the + // canonical block (deferred from process_committed_transactions above). + for tx in &committed_pool_txs { + self.pool.remove_from_contract_users(tx); } let resolved_txs = self.pending_pool.new_known_txs( @@ -602,8 +626,14 @@ where let (outputs, block_height) = match &status { PreConfirmationStatus::Success(status) => { - self.pool.process_preconfirmed_committed_transaction(tx_id); + let pool_tx = self.pool.process_preconfirmed_committed_transaction(tx_id); let height = status.tx_pointer.block_height(); + if let Some(tx) = pool_tx { + self.tentative_preconf_pool_txs + .entry(height) + .or_default() + .push(tx); + } if let Some(outputs) = &status.resolved_outputs { (outputs, Some(height)) } else { @@ -616,8 +646,14 @@ where } } PreConfirmationStatus::Failure(status) => { - self.pool.process_preconfirmed_committed_transaction(tx_id); + let pool_tx = self.pool.process_preconfirmed_committed_transaction(tx_id); let height = status.tx_pointer.block_height(); + if let Some(tx) = pool_tx { + self.tentative_preconf_pool_txs + .entry(height) + .or_default() + .push(tx); + } if let Some(outputs) = &status.resolved_outputs { (outputs, Some(height)) } else {