diff --git a/.changes/fixed/3264.md b/.changes/fixed/3264.md new file mode 100644 index 00000000000..30edc35c336 --- /dev/null +++ b/.changes/fixed/3264.md @@ -0,0 +1 @@ +Rollback stale preconfirmations in the mempool when the canonical block at that height omits the preconfirmed transactions, restoring spent inputs and removing dependent transactions. diff --git a/Cargo.lock b/Cargo.lock index b6a4635c148..01bda8a3c71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5075,6 +5075,7 @@ dependencies = [ "fuel-core-storage", "fuel-core-syscall", "fuel-core-trace", + "fuel-core-txpool", "fuel-core-types 0.48.0", "futures", "lru 0.16.3", diff --git a/crates/services/txpool_v2/Cargo.toml b/crates/services/txpool_v2/Cargo.toml index 33634171add..b342b27df96 100644 --- a/crates/services/txpool_v2/Cargo.toml +++ b/crates/services/txpool_v2/Cargo.toml @@ -34,6 +34,7 @@ tracing = { workspace = true } [dev-dependencies] fuel-core-storage = { workspace = true, features = ["std", "test-helpers"] } fuel-core-trace = { path = "../../trace" } +fuel-core-txpool = { path = ".", features = ["test-helpers"] } mockall = { workspace = true } proptest = { workspace = true } rand = { workspace = true } diff --git a/crates/services/txpool_v2/src/collision_manager/basic.rs b/crates/services/txpool_v2/src/collision_manager/basic.rs index 686271980db..3c31d881e32 100644 --- a/crates/services/txpool_v2/src/collision_manager/basic.rs +++ b/crates/services/txpool_v2/src/collision_manager/basic.rs @@ -21,6 +21,7 @@ use fuel_core_types::{ CoinPredicate, CoinSigned, }, + contract::Contract as ContractInput, message::{ MessageCoinPredicate, MessageCoinSigned, @@ -57,6 +58,9 @@ pub struct BasicCollisionManager { coins_spenders: BTreeMap, /// Contract -> Transaction that currently create the contract contracts_creators: HashMap, + /// Contract -> Transactions (by TxId) that currently use the contract as an input. + /// Symmetric to `contracts_creators`; used to evict dependents during rollback. + contract_users: HashMap>, /// Blob -> Transaction that currently create the blob blobs_users: HashMap, } @@ -67,6 +71,7 @@ impl BasicCollisionManager { messages_spenders: HashMap::new(), coins_spenders: BTreeMap::new(), contracts_creators: HashMap::new(), + contract_users: HashMap::new(), blobs_users: HashMap::new(), } } @@ -76,6 +81,7 @@ impl BasicCollisionManager { self.messages_spenders.is_empty() && self.coins_spenders.is_empty() && self.contracts_creators.is_empty() + && self.contract_users.is_empty() && self.blobs_users.is_empty() } @@ -88,6 +94,7 @@ impl BasicCollisionManager { let mut message_spenders = HashMap::new(); let mut coins_spenders = BTreeMap::new(); let mut contracts_creators = HashMap::new(); + let mut contract_users: HashMap> = HashMap::new(); let mut blobs_users = HashMap::new(); for tx in expected_txs { if let PoolTransaction::Blob(checked_tx, _) = tx.deref() { @@ -110,7 +117,12 @@ impl BasicCollisionManager { }) => { message_spenders.insert(*nonce, tx.id()); } - Input::Contract { .. } => {} + Input::Contract(ContractInput { contract_id, .. }) => { + contract_users + .entry(*contract_id) + .or_default() + .push(tx.id()); + } } } for output in tx.outputs() { @@ -152,6 +164,26 @@ impl BasicCollisionManager { "Some contract creators are missing from the collision manager: {:?}", contracts_creators ); + for (contract_id, users) in &self.contract_users { + let expected = contract_users.remove(contract_id).unwrap_or_else(|| panic!( + "A contract ({}) user list is present on the collision manager that shouldn't be there.", + contract_id + )); + let mut actual_sorted = users.clone(); + actual_sorted.sort(); + let mut expected_sorted = expected; + expected_sorted.sort(); + assert_eq!( + actual_sorted, expected_sorted, + "contract_users mismatch for contract {}", + contract_id + ); + } + assert!( + contract_users.is_empty(), + "Some contract users are missing from the collision manager: {:?}", + contract_users + ); } } @@ -174,6 +206,17 @@ where .collect() } + fn get_contract_users(&self, contract_id: &ContractId) -> Vec { + self.contract_users + .get(contract_id) + .cloned() + .unwrap_or_default() + } + + fn contract_created_in_pool(&self, contract_id: &ContractId) -> bool { + self.contracts_creators.contains_key(contract_id) + } + fn find_collisions( &self, transaction: &PoolTransaction, @@ -248,6 +291,7 @@ where let blob_id = checked_tx.transaction().blob_id(); self.blobs_users.insert(*blob_id, storage_id); } + let tx_id = store_entry.transaction.id(); for input in store_entry.transaction.inputs() { match input { Input::CoinSigned(CoinSigned { utxo_id, .. }) @@ -262,7 +306,12 @@ where // insert message self.messages_spenders.insert(*nonce, storage_id); } - _ => {} + Input::Contract(ContractInput { contract_id, .. }) => { + self.contract_users + .entry(*contract_id) + .or_default() + .push(tx_id); + } } } for output in store_entry.transaction.outputs().iter() { @@ -284,6 +333,7 @@ where let blob_id = checked_tx.transaction().blob_id(); self.blobs_users.remove(blob_id); } + let tx_id = transaction.id(); for input in transaction.inputs() { match input { Input::CoinSigned(CoinSigned { utxo_id, .. }) @@ -298,7 +348,14 @@ where // remove message self.messages_spenders.remove(nonce); } - _ => {} + Input::Contract(ContractInput { contract_id, .. }) => { + if let Some(users) = self.contract_users.get_mut(contract_id) { + users.retain(|id| id != &tx_id); + if users.is_empty() { + self.contract_users.remove(contract_id); + } + } + } } } for output in transaction.outputs().iter() { diff --git a/crates/services/txpool_v2/src/collision_manager/mod.rs b/crates/services/txpool_v2/src/collision_manager/mod.rs index c6e75a45167..d053b70cad4 100644 --- a/crates/services/txpool_v2/src/collision_manager/mod.rs +++ b/crates/services/txpool_v2/src/collision_manager/mod.rs @@ -8,6 +8,8 @@ use fuel_core_types::{ }; use std::collections::HashMap; +use fuel_core_types::fuel_tx::ContractId; + use crate::storage::StorageData; pub mod basic; @@ -27,6 +29,16 @@ pub trait CollisionManager { /// Get spenders of coins UTXO created by a transaction ID. fn get_coins_spenders(&self, tx_creator_id: &TxId) -> Vec; + /// Get the IDs of pool transactions that have `Input::Contract(contract_id)`. + /// Used during preconfirmation rollback to evict pool txs that were admitted + /// only because a preconfirmed tx temporarily created the contract. + fn get_contract_users(&self, contract_id: &ContractId) -> Vec; + + /// Returns true if a currently in-pool transaction creates `contract_id`. + /// Used during rollback to skip eviction when the contract is still available + /// via a pool tx (independent of the rolled-back preconfirmation). + fn contract_created_in_pool(&self, contract_id: &ContractId) -> bool; + /// Inform the collision manager that a transaction was stored. fn on_stored_transaction( &mut self, diff --git a/crates/services/txpool_v2/src/extracted_outputs.rs b/crates/services/txpool_v2/src/extracted_outputs.rs index ce0c7f3f28b..e1cd66974ec 100644 --- a/crates/services/txpool_v2/src/extracted_outputs.rs +++ b/crates/services/txpool_v2/src/extracted_outputs.rs @@ -49,7 +49,13 @@ impl ExtractedOutputs { for (utxo_id, output) in outputs { match output { Output::ContractCreated { contract_id, .. } => { - self.contract_created.insert(*contract_id, *utxo_id.tx_id()); + let tx_id = *utxo_id.tx_id(); + self.contract_created.insert(*contract_id, tx_id); + // Track the reverse mapping so cleanup via new_executed_transaction works. + self.contract_created_by_tx + .entry(tx_id) + .or_default() + .push(*contract_id); } Output::Coin { to, @@ -131,6 +137,16 @@ impl ExtractedOutputs { self.new_executed_transaction(tx_id); } + /// Returns the contract IDs created by `tx_id`, if any. + /// Call this **before** [`new_skipped_transaction`] / [`new_executed_transaction`] + /// if the caller needs the list for cleanup. + pub fn contracts_created_by(&self, tx_id: &TxId) -> &[ContractId] { + self.contract_created_by_tx + .get(tx_id) + .map(Vec::as_slice) + .unwrap_or(&[]) + } + pub fn new_executed_transaction(&mut self, tx_id: &TxId) { let contract_ids = self.contract_created_by_tx.remove(tx_id); if let Some(contract_ids) = contract_ids { diff --git a/crates/services/txpool_v2/src/lib.rs b/crates/services/txpool_v2/src/lib.rs index c27845af7b7..234dbbdf946 100644 --- a/crates/services/txpool_v2/src/lib.rs +++ b/crates/services/txpool_v2/src/lib.rs @@ -61,6 +61,9 @@ mod spent_inputs; mod tests; #[cfg(test)] fuel_core_trace::enable_tracing!(); +// Needed to activate the `test-helpers` feature flag for integration tests. +#[cfg(test)] +use fuel_core_txpool as _; use fuel_core_types::fuel_asm::Word; pub use pool::TxPoolStats; diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index cbed6067ba9..5b45be4008f 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -426,6 +426,55 @@ where self.tx_id_to_storage_id.keys() } + /// Process a preconfirmed transaction as committed while recording its spent + /// inputs so they can be rolled back later if the transaction is not + /// included in the canonical block. + pub fn process_preconfirmed_committed_transaction(&mut self, tx_id: TxId) { + // If the tx was already extracted for local block production it is no + // longer in `tx_id_to_storage_id`, so the branch below that calls + // `record_tentative_spend` will be skipped. Preserve the input keys + // now, before `spend_inputs_by_tx_id` drains `spender_of_inputs[T]`. + if !self.tx_id_to_storage_id.contains_key(&tx_id) { + self.spent_inputs.move_spender_to_tentative(tx_id); + } + self.spent_inputs.spend_inputs_by_tx_id(tx_id); + if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) { + let dependents: Vec = + self.storage.get_direct_dependents(storage_id).collect(); + let Some(transaction) = self.storage.remove_transaction(storage_id) else { + // Invariant violation. Panic in tests, log in production. + debug_assert!(false, "Storage data not found for the transaction"); + tracing::warn!("Storage data not found for the transaction."); + return; + }; + self.extracted_outputs + .new_extracted_transaction(&transaction.transaction); + // Save the inputs before spending them permanently, so we can roll + // them back if the canonical block omits this transaction. + self.spent_inputs + .record_tentative_spend(tx_id, transaction.transaction.inputs()); + self.spent_inputs + .spend_inputs(tx_id, transaction.transaction.inputs()); + self.update_components_and_caches_on_removal(iter::once(&transaction)); + + let mut new_executable_transaction = false; + for dependent in dependents { + if !self.storage.has_dependencies(&dependent) + && let Some(storage_data) = self.storage.get(&dependent) + { + self.selection_algorithm + .new_executable_transaction(dependent, storage_data); + new_executable_transaction = true; + } + } + if new_executable_transaction { + self.new_executable_txs_notifier.send_replace(()); + } + } + + self.update_stats(); + } + /// Process committed transactions: /// - Remove transaction but keep its dependents and the dependents become executables. /// - Notify about possible new executable transactions. @@ -438,10 +487,9 @@ where self.storage.get_direct_dependents(storage_id).collect(); let Some(transaction) = self.storage.remove_transaction(storage_id) else { + // Invariant violation. Panic in tests, log in production. debug_assert!(false, "Storage data not found for the transaction"); - tracing::warn!( - "Storage data not found for the transaction during `remove_transaction`." - ); + tracing::warn!("Storage data not found for the transaction during."); continue; }; self.extracted_outputs @@ -461,14 +509,12 @@ where let mut new_executable_transaction = false; for promote in transactions_to_promote { let Some(storage_data) = self.storage.get(&promote) else { + // Invariant violation. Panic in tests, log in production. debug_assert!( false, "Dependent storage data not found for the transaction" ); - tracing::warn!( - "Dependent storage data not found for \ - the transaction during `remove_transaction`." - ); + tracing::warn!("Dependent storage data not found for the transaction."); continue; }; @@ -568,13 +614,13 @@ where debug_assert!(!self.storage.has_dependencies(storage_id)); let Some(storage_data) = self.storage.get(storage_id) else { + // Invariant violation. Panic in tests, log in production. debug_assert!( false, "Storage data not found for one of the less worth transactions" ); tracing::warn!( - "Storage data not found for one of the less \ - worth transactions during `find_free_space`." + "Storage data not found for one of the less worth transactions" ); continue; }; @@ -685,6 +731,87 @@ where self.update_stats(); } + /// Rollback a preconfirmed transaction that was not included in the canonical block. + /// + /// This clears the preconfirmation-derived outputs and removes any pool transactions + /// that depend on those outputs, since those inputs no longer exist on-chain. + pub fn rollback_preconfirmed_transaction(&mut self, tx_id: TxId) { + // Capture contracts created by this preconfirmation BEFORE clearing + // extracted_outputs, so we can evict any pool txs that were admitted + // only because of the now-stale temporary contract existence. + let created_contracts: Vec<_> = + self.extracted_outputs.contracts_created_by(&tx_id).to_vec(); + + // Remove preconfirmed outputs so dependents can't use them. + self.extracted_outputs.new_skipped_transaction(&tx_id); + // Allow the transaction itself to be re-submitted. + self.spent_inputs.unspend_preconfirmed(tx_id); + + let reason = format!( + "Preconfirmed parent transaction {tx_id} was not included in the canonical block" + ); + + // Remove any pool transactions that used the preconfirmed coin outputs as inputs. + let coin_dependents = self.collision_manager.get_coins_spenders(&tx_id); + if !coin_dependents.is_empty() { + for dependent in coin_dependents { + let removed = self + .storage + .remove_transaction_and_dependents_subtree(dependent); + self.update_components_and_caches_on_removal(removed.iter()); + let removed_txs: Vec<_> = removed + .into_iter() + .map(|data| { + let dependent_tx_id = data.transaction.id(); + let tx_status = + statuses::SqueezedOut::new(reason.clone(), dependent_tx_id); + (dependent_tx_id, tx_status) + }) + .collect(); + if !removed_txs.is_empty() { + self.tx_status_manager.squeezed_out_txs(removed_txs); + } + } + } + + // Remove any pool transactions that used a preconfirmed contract creation + // as an input and were admitted only via `extracted_outputs` (no in-pool + // graph dependency). Skip eviction when the contract is still available + // through an independent in-pool creator. + for contract_id in created_contracts { + if self + .collision_manager + .contract_created_in_pool(&contract_id) + { + continue; + } + let user_tx_ids = self.collision_manager.get_contract_users(&contract_id); + for user_tx_id in user_tx_ids { + let Some(&storage_id) = self.tx_id_to_storage_id.get(&user_tx_id) else { + continue; + }; + let removed = self + .storage + .remove_transaction_and_dependents_subtree(storage_id); + self.update_components_and_caches_on_removal(removed.iter()); + let removed_txs: Vec<_> = removed + .into_iter() + .map(|data| { + let dependent_tx_id = data.transaction.id(); + let tx_status = + statuses::SqueezedOut::new(reason.clone(), dependent_tx_id); + (dependent_tx_id, tx_status) + }) + .collect(); + if !removed_txs.is_empty() { + self.tx_status_manager.squeezed_out_txs(removed_txs); + } + } + } + + self.update_stats(); + } + fn check_blob_does_not_exist( tx: &PoolTransaction, persistent_storage: &impl TxPoolPersistentStorage, diff --git a/crates/services/txpool_v2/src/pool_worker.rs b/crates/services/txpool_v2/src/pool_worker.rs index a50de4c507b..da296f91664 100644 --- a/crates/services/txpool_v2/src/pool_worker.rs +++ b/crates/services/txpool_v2/src/pool_worker.rs @@ -14,7 +14,10 @@ use fuel_core_types::{ }, }; use std::{ - iter, + collections::{ + BTreeMap, + HashSet, + }, ops::Deref, sync::Arc, time::SystemTime, @@ -91,6 +94,7 @@ impl PoolWorkerInterface { tx_pool: TxPool, view_provider: Arc>, limits: &ServiceChannelLimits, + initial_block_height: BlockHeight, ) -> Self where View: TxPoolPersistentStorage, @@ -134,6 +138,8 @@ impl PoolWorkerInterface { pending_pool: PendingPool::new(tx_pool.config.pending_pool_tx_ttl), pool: tx_pool, view_provider, + tentative_preconfs: BTreeMap::new(), + current_canonical_height: initial_block_height, }; tokio_runtime.block_on(async { @@ -268,6 +274,14 @@ pub(super) struct PoolWorker { pending_pool: PendingPool, view_provider: Arc>, notification_sender: Sender, + /// Tracks preconfirmed transaction IDs by their tentative block height. + /// Used to roll back stale preconfirmations when the canonical block at + /// that height does not include those transactions. + tentative_preconfs: BTreeMap>, + /// The height of the last canonical block imported by this node. + /// Used to discard late preconfirmations whose tentative block height + /// is already at or below the canonical tip. + current_canonical_height: BlockHeight, } impl PoolWorker @@ -485,9 +499,17 @@ where } fn process_block(&mut self, block_result: SharedImportResult) { - self.pool.process_committed_transactions( - block_result.tx_status.iter().map(|tx_status| tx_status.id), - ); + let block_height = *block_result.sealed_block.entity.header().height(); + self.current_canonical_height = self.current_canonical_height.max(block_height); + + let confirmed_tx_ids: HashSet = block_result + .tx_status + .iter() + .map(|tx_status| tx_status.id) + .collect(); + + self.pool + .process_committed_transactions(confirmed_tx_ids.iter().copied()); block_result.tx_status.iter().for_each(|tx_status| { self.pool @@ -495,6 +517,37 @@ where .new_executed_transaction(&tx_status.id); }); + // Reconcile tentative preconfirmations for all heights up to and including + // the imported block height. Any preconfirmed tx that is absent from the + // canonical block must have its state rolled back so those inputs/outputs + // do not remain stale in the mempool. + let stale_heights: Vec = self + .tentative_preconfs + .range(..=block_height) + .map(|(h, _)| *h) + .collect(); + + for height in stale_heights { + if let Some(tentative_txs) = self.tentative_preconfs.remove(&height) { + for tx_id in tentative_txs { + if confirmed_tx_ids.contains(&tx_id) { + // Tx was included in the canonical block — confirm the + // tentative spend record so it won't be rolled back. + self.pool.spent_inputs.confirm_tentative_spend(&tx_id); + } else { + tracing::debug!( + "Rolling back stale preconfirmation for tx {} \ + (tentative block {}, canonical block {})", + tx_id, + height, + block_height, + ); + self.pool.rollback_preconfirmed_transaction(tx_id); + } + } + } + } + let resolved_txs = self.pending_pool.new_known_txs( block_result .sealed_block @@ -524,20 +577,54 @@ where tx_id: TxId, status: PreConfirmationStatus, ) { - let outputs = match &status { + // Reject late preconfirmations whose tentative block height is already + // at or below the node's current canonical tip. Applying them would + // temporarily mark inputs as spent and admit dependents against outputs + // that may not be in any canonical block, creating a stale-acceptance + // window that is only unwound on the next block import. + let preconf_height = match &status { + PreConfirmationStatus::Success(s) => Some(s.tx_pointer.block_height()), + PreConfirmationStatus::Failure(s) => Some(s.tx_pointer.block_height()), + PreConfirmationStatus::SqueezedOut(_) => None, + }; + if let Some(h) = preconf_height + && h <= self.current_canonical_height + { + tracing::debug!( + "Ignoring late preconfirmation for tx {} at height {} \ + (current canonical height {})", + tx_id, + h, + self.current_canonical_height, + ); + return; + } + + let (outputs, block_height) = match &status { PreConfirmationStatus::Success(status) => { - self.pool.process_committed_transactions(iter::once(tx_id)); + self.pool.process_preconfirmed_committed_transaction(tx_id); + let height = status.tx_pointer.block_height(); if let Some(outputs) = &status.resolved_outputs { - outputs + (outputs, Some(height)) } else { + // Still track the height so block import can clean up spent_inputs. + self.tentative_preconfs + .entry(height) + .or_default() + .insert(tx_id); return; } } PreConfirmationStatus::Failure(status) => { - self.pool.process_committed_transactions(iter::once(tx_id)); + self.pool.process_preconfirmed_committed_transaction(tx_id); + let height = status.tx_pointer.block_height(); if let Some(outputs) = &status.resolved_outputs { - outputs + (outputs, Some(height)) } else { + self.tentative_preconfs + .entry(height) + .or_default() + .insert(tx_id); return; } } @@ -546,6 +633,16 @@ where return; } }; + + // Track the tentative block height so the preconfirmed outputs can be + // rolled back if the canonical block at that height omits this tx. + if let Some(height) = block_height { + self.tentative_preconfs + .entry(height) + .or_default() + .insert(tx_id); + } + // All of this can be useful in case that we didn't know about the transaction let resolved = self .pending_pool diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index 4a1cc2de4ef..68709739a1e 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -806,8 +806,12 @@ where let (current_height_writer, current_height_reader) = unsafe { SeqLock::new(current_height) }; - let pool_worker = - PoolWorkerInterface::new(txpool, storage_provider, &service_channel_limits); + let pool_worker = PoolWorkerInterface::new( + txpool, + storage_provider, + &service_channel_limits, + current_height, + ); let shared_state = SharedState { request_read_sender: pool_worker.request_read_sender.clone(), diff --git a/crates/services/txpool_v2/src/spent_inputs.rs b/crates/services/txpool_v2/src/spent_inputs.rs index 4cc0c933c3d..66c556a7e83 100644 --- a/crates/services/txpool_v2/src/spent_inputs.rs +++ b/crates/services/txpool_v2/src/spent_inputs.rs @@ -27,6 +27,9 @@ pub struct SpentInputs { /// transaction spent it. Later, this information can be used to unspent /// or fully spend the input. spender_of_inputs: HashMap>, + /// Inputs permanently spent during preconfirmation processing, saved so + /// they can be rolled back if the preconfirmation turns out to be stale. + tentative_spent: HashMap>, } impl SpentInputs { @@ -34,6 +37,7 @@ impl SpentInputs { Self { spent_inputs: LruCache::new(capacity), spender_of_inputs: HashMap::new(), + tentative_spent: HashMap::new(), } } @@ -90,6 +94,21 @@ impl SpentInputs { } } + /// Transitions the inputs recorded by [`maybe_spend_inputs`] from the + /// `spender_of_inputs` map directly into `tentative_spent`, without + /// touching the live `spent_inputs` LRU cache. + /// + /// Call this **before** [`spend_inputs_by_tx_id`] when a preconfirmation + /// arrives for a tx that was already extracted for local block production + /// (i.e. absent from pool storage). This preserves the input keys for a + /// potential rollback via [`unspend_preconfirmed`] before + /// [`spend_inputs_by_tx_id`] drains `spender_of_inputs`. + pub fn move_spender_to_tentative(&mut self, tx_id: TxId) { + if let Some(keys) = self.spender_of_inputs.get(&tx_id) { + self.tentative_spent.insert(tx_id, keys.clone()); + } + } + /// If transaction is skipped during the block production, this functions /// can be used to unspend inputs, allowing other transactions to spend them. pub fn unspend_inputs(&mut self, tx_id: TxId) { @@ -114,6 +133,43 @@ impl SpentInputs { pub fn is_spent_tx(&self, tx: &TxId) -> bool { self.spent_inputs.contains(&InputKey::Tx(*tx)) } + + /// Record inputs that were permanently spent during preconfirmation processing. + /// The saved keys can later be rolled back via [`unspend_preconfirmed`]. + pub fn record_tentative_spend(&mut self, tx_id: TxId, inputs: &[Input]) { + let keys: Vec = inputs + .iter() + .filter_map(|input| { + if input.is_coin() { + input.utxo_id().cloned().map(InputKey::Utxo) + } else if input.is_message() { + input.nonce().cloned().map(InputKey::Message) + } else { + None + } + }) + .collect(); + self.tentative_spent.insert(tx_id, keys); + } + + /// Remove the tentative-spend record for a confirmed transaction, preventing + /// a spurious rollback. Called when the preconfirmed tx is included in the + /// canonical block. + pub fn confirm_tentative_spend(&mut self, tx_id: &TxId) { + self.tentative_spent.remove(tx_id); + } + + /// Removes the tx entry and any individually-tracked UTXO/message inputs + /// from spent inputs, allowing the same inputs to be re-used. + /// Used when rolling back a stale preconfirmation. + pub fn unspend_preconfirmed(&mut self, tx_id: TxId) { + self.spent_inputs.pop(&InputKey::Tx(tx_id)); + if let Some(saved_keys) = self.tentative_spent.remove(&tx_id) { + for key in saved_keys { + self.spent_inputs.pop(&key); + } + } + } } #[cfg(test)] diff --git a/crates/services/txpool_v2/src/tests/mocks.rs b/crates/services/txpool_v2/src/tests/mocks.rs index 6348b94a6b8..e9177e8b771 100644 --- a/crates/services/txpool_v2/src/tests/mocks.rs +++ b/crates/services/txpool_v2/src/tests/mocks.rs @@ -114,6 +114,11 @@ impl MockTxStatusManager { tx, } } + + /// Send a preconfirmation update to the pool worker. + pub fn send_preconfirmation(&self, tx_id: TxId, status: PreConfirmationStatus) { + let _ = self.tx_preconfirmations_update_sender.send((tx_id, status)); + } } impl ports::TxStatusManager for MockTxStatusManager { diff --git a/crates/services/txpool_v2/src/tests/mod.rs b/crates/services/txpool_v2/src/tests/mod.rs index 4578a2a50bc..cdcb9f78ecd 100644 --- a/crates/services/txpool_v2/src/tests/mod.rs +++ b/crates/services/txpool_v2/src/tests/mod.rs @@ -5,6 +5,7 @@ mod stability_test; mod tests_p2p; mod tests_pending_pool; mod tests_pool; +mod tests_preconf_rollback; mod tests_service; mod tx_status_manager_integration; mod universe; diff --git a/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs b/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs new file mode 100644 index 00000000000..fa46ce799d9 --- /dev/null +++ b/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs @@ -0,0 +1,565 @@ +//! When a block producer emits preconfirmations and then crashes or produces a +//! different block at the same height, the sentry/RPC mempool must purge the +//! stale preconfirmation state on the next canonical block import. + +use std::sync::Arc; + +use fuel_core_types::{ + blockchain::{ + block::Block, + consensus::Sealed, + }, + fuel_tx::{ + Contract, + Output, + TxPointer, + UniqueIdentifier, + UtxoId, + }, + fuel_types::{ + BlockHeight, + ContractId, + }, + services::{ + block_importer::ImportResult, + executor::{ + TransactionExecutionResult, + TransactionExecutionStatus, + }, + transaction_status::{ + PreConfirmationStatus, + statuses, + }, + }, +}; + +use fuel_core_services::Service as ServiceTrait; + +use crate::{ + Constraints, + tests::{ + mocks::MockImporter, + universe::{ + TestPoolUniverse, + create_contract_input, + }, + }, +}; + +/// Build a canonical block sealed at `height` that contains `tx_ids`. +fn make_block_import( + height: u32, + tx_ids: &[fuel_core_types::fuel_tx::TxId], +) -> Arc< + dyn std::ops::Deref + + Send + + Sync, +> { + let sealed_block = Sealed { + entity: { + let mut block = Block::default(); + block + .header_mut() + .set_block_height(BlockHeight::new(height)); + block + }, + consensus: Default::default(), + }; + let tx_statuses = tx_ids + .iter() + .map(|id| TransactionExecutionStatus { + id: *id, + result: TransactionExecutionResult::Success { + result: None, + receipts: Arc::new(vec![]), + total_gas: 0, + total_fee: 0, + }, + }) + .collect(); + Arc::new(ImportResult::new_from_local(sealed_block, tx_statuses, vec![]).wrap()) +} + +/// Build a `PreConfirmationStatus::Success` that carries one coin output. +fn make_preconf_success( + tx_id: fuel_core_types::fuel_tx::TxId, + block_height: u32, + output: Output, +) -> PreConfirmationStatus { + let utxo_id = UtxoId::new(tx_id, 0); + PreConfirmationStatus::Success( + statuses::PreConfirmationSuccess { + tx_pointer: TxPointer::new(BlockHeight::new(block_height), 0), + total_gas: 0, + total_fee: 0, + receipts: None, + resolved_outputs: Some(vec![(utxo_id, output)]), + } + .into(), + ) +} + +/// Build a `PreConfirmationStatus::Success` whose resolved outputs include a +/// `ContractCreated` entry, so that `extracted_outputs.contract_exists(contract_id)` +/// becomes true after the preconfirmation is processed. +fn make_preconf_with_contract_created( + tx_id: fuel_core_types::fuel_tx::TxId, + block_height: u32, + contract_id: ContractId, +) -> PreConfirmationStatus { + let utxo_id = UtxoId::new(tx_id, 0); + let output = Output::ContractCreated { + contract_id, + state_root: Contract::default_state_root(), + }; + PreConfirmationStatus::Success( + statuses::PreConfirmationSuccess { + tx_pointer: TxPointer::new(BlockHeight::new(block_height), 0), + total_gas: 0, + total_fee: 0, + receipts: None, + resolved_outputs: Some(vec![(utxo_id, output)]), + } + .into(), + ) +} + +/// Build a `PreConfirmationStatus::Success` with no resolved outputs. +fn make_preconf_success_no_outputs( + _tx_id: fuel_core_types::fuel_tx::TxId, + block_height: u32, +) -> PreConfirmationStatus { + PreConfirmationStatus::Success( + statuses::PreConfirmationSuccess { + tx_pointer: TxPointer::new(BlockHeight::new(block_height), 0), + total_gas: 0, + total_fee: 0, + receipts: None, + resolved_outputs: None, + } + .into(), + ) +} + +/// After a preconfirmation arrives for tx T at height H, and then a canonical +/// block at height H is imported *without* T, the tx should no longer be +/// marked as "spent" i.e. it can be re-inserted into the pool. +#[tokio::test] +async fn preconfirmed_tx_can_be_reinserted_after_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Insert and wait for submitted status. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // Simulate the block producer preconfirming tx at block height 1. + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + + // Give the pool worker time to process the preconfirmation. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // The tx should not be in the pool any more (committed). + let found = service.shared.find(vec![tx_id]).await.unwrap(); + assert!( + found[0].is_none(), + "tx should have been committed out of pool" + ); + + // When — import an empty block at height 1 (no tx T). + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + // Give the worker time to process the block. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — re-inserting the same tx should now succeed because + // spent_inputs was rolled back. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + service.stop_and_await().await.unwrap(); +} + +/// When a preconfirmed tx's outputs are used by a dependent tx D, and the +/// canonical block does not include the preconfirmed tx, D must be removed +/// from the pool. +#[tokio::test] +async fn dependents_of_preconfirmed_tx_removed_on_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + + // tx_parent is the tx that will be preconfirmed but not included. + // It produces output_a (a coin). + let (output_a, unset_input_a) = universe.create_output_and_input(); + let tx_parent = universe.build_script_transaction(None, Some(vec![output_a]), 1); + let tx_parent_id = tx_parent.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Simulate receiving a preconfirmation for tx_parent (which the sentry may + // never have seen). The preconf carries output_a. + universe.send_preconfirmation( + tx_parent_id, + make_preconf_success(tx_parent_id, 1, output_a), + ); + + // Give the worker time to process the preconfirmation and register outputs. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Now insert tx_child that spends output_a (utxo from tx_parent). + let input_a = unset_input_a.into_input(UtxoId::new(tx_parent_id, 0)); + let tx_child = universe.build_script_transaction(Some(vec![input_a]), None, 2); + let tx_child_id = tx_child.id(&Default::default()); + + service.shared.insert(tx_child.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_child_id]) + .await; + + // Sanity: child is in the pool. + let found = service.shared.find(vec![tx_child_id]).await.unwrap(); + assert!(found[0].is_some(), "tx_child should be in pool"); + + // When — import a block at height 1 that does NOT contain tx_parent. + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + // Then — tx_child depends on a now-stale preconfirmed output; it must be + // squeezed out. + universe + .await_expected_tx_statuses(vec![tx_child_id], |_, status| { + matches!( + status, + fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) + ) + }) + .await + .unwrap(); + + let found = service.shared.find(vec![tx_child_id]).await.unwrap(); + assert!(found[0].is_none(), "tx_child should have been removed"); + + service.stop_and_await().await.unwrap(); +} + +/// When a preconfirmed tx IS included in the canonical block, its state must +/// be committed normally — no spurious rollback. +#[tokio::test] +async fn preconfirmed_tx_committed_normally_when_in_canonical_block() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // Preconfirmation at height 1. + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // When — import a block at height 1 that CONTAINS the tx. + block_sender + .send(make_block_import(1, &[tx_id])) + .await + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — tx must not reappear in the pool; re-inserting it should fail + // because its inputs are now permanently spent (committed in the block). + let found = service.shared.find(vec![tx_id]).await.unwrap(); + assert!( + found[0].is_none(), + "tx should not be in pool after block commit" + ); + + service.stop_and_await().await.unwrap(); +} + +/// Regression test for the "extracted-first" rollback bug. +/// +/// Sequence: +/// 1. Tx T is inserted and then extracted for local block production. +/// `maybe_spend_inputs` records T's coin inputs in `spender_of_inputs`. +/// 2. A preconfirmation for T arrives. Because T was already removed from +/// `tx_id_to_storage_id`, `process_preconfirmed_committed_transaction` +/// takes the `else` branch and must still save T's inputs into +/// `tentative_spent` so they can be rolled back later. +/// 3. The canonical block omits T → `rollback_preconfirmed_transaction` must +/// clear those coin-input keys from `spent_inputs`. +/// 4. Re-inserting T must succeed (inputs no longer marked spent). +#[tokio::test] +async fn extracted_tx_inputs_freed_after_preconf_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Insert and wait for the tx to be accepted. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // Extract the tx (simulating local block production). + // This calls `maybe_spend_inputs` and removes the tx from storage. + let extracted = service + .shared + .extract_transactions_for_block(Constraints { + minimal_gas_price: 0, + max_gas: u64::MAX, + maximum_txs: u16::MAX, + maximum_block_size: u32::MAX, + excluded_contracts: Default::default(), + }) + .unwrap(); + assert_eq!(extracted.len(), 1, "expected exactly one extracted tx"); + + // Preconfirmation arrives for the already-extracted tx. + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // When — import an empty canonical block at height 1 (tx is absent). + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — coin inputs must have been freed; re-inserting T must succeed. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + service.stop_and_await().await.unwrap(); +} + +/// Stale preconfirmations at an older height are cleaned up when a later +/// block is imported, even if the heights don't match exactly. +#[tokio::test] +async fn stale_preconfs_at_older_height_cleaned_up_by_later_block() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + + let (output_a, unset_input_a) = universe.create_output_and_input(); + let tx_parent = universe.build_script_transaction(None, Some(vec![output_a]), 1); + let tx_parent_id = tx_parent.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Preconfirmation at height 1 (but the block producer crashes). + universe.send_preconfirmation( + tx_parent_id, + make_preconf_success(tx_parent_id, 1, output_a), + ); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Insert a dependent tx while the preconf outputs are "live". + let input_a = unset_input_a.into_input(UtxoId::new(tx_parent_id, 0)); + let tx_child = universe.build_script_transaction(Some(vec![input_a]), None, 2); + let tx_child_id = tx_child.id(&Default::default()); + + service.shared.insert(tx_child).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_child_id]) + .await; + + // When — a block at height 2 arrives (skipping height 1). The preconf for + // height 1 was never resolved, so it must be rolled back. + block_sender.send(make_block_import(2, &[])).await.unwrap(); + + // Then — tx_child (which depended on the stale preconf output) is removed. + universe + .await_expected_tx_statuses(vec![tx_child_id], |_, status| { + matches!( + status, + fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) + ) + }) + .await + .unwrap(); + + service.stop_and_await().await.unwrap(); +} + +/// A late preconfirmation arriving after the referenced canonical block has +/// already been imported must be silently discarded. +/// +/// Sequence: +/// 1. Node imports canonical block at height H (tx T is absent from it). +/// 2. T is inserted into the pool. +/// 3. A delayed preconfirmation for T arrives claiming height H. +/// 4. Because H <= current_canonical_height the preconfirmation is ignored: +/// T must remain in the pool (not committed out) and spent_inputs must +/// not be mutated. +#[tokio::test] +async fn late_preconf_below_canonical_height_is_ignored() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Import a canonical block at height 1 that does NOT include the tx. + // After this the node's canonical height is 1. + block_sender.send(make_block_import(1, &[])).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Insert T after the block is imported. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // When — a delayed preconfirmation for T arrives at height 1 + // (already at or below the canonical tip). + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — the preconfirmation must be ignored. + // If it were applied, `process_preconfirmed_committed_transaction` would + // remove T from the pool. T must still be present. + let found = service.shared.find(vec![tx_id]).await.unwrap(); + assert!( + found[0].is_some(), + "tx should still be in pool — late preconfirmation must have been ignored" + ); + + service.stop_and_await().await.unwrap(); +} + +/// Regression test: a pool tx admitted via a preconfirmed contract creation must +/// be evicted when the canonical block omits the contract-creating preconfirmation. +/// +/// Sequence: +/// 1. Preconfirmation for tx P arrives carrying `Output::ContractCreated { C }`. +/// `extracted_outputs` now reports `contract_exists(C) == true`. +/// 2. Tx D with `Input::Contract(C)` is inserted into the pool. +/// It passes validation because `extracted_outputs.contract_exists(C)`. +/// D has no graph dependency on any in-pool creator of C. +/// 3. An empty canonical block is imported at height 1 (P is absent). +/// Rollback clears C from `extracted_outputs`. +/// 4. D must be squeezed out — it was only valid because of the now-stale +/// preconfirmed contract creation. +#[tokio::test] +async fn contract_dependent_tx_removed_on_preconf_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + + // Compute the contract id that P will advertise as created. + // P is never inserted into the pool — it arrives only via preconfirmation. + let contract_code = vec![1u8, 2, 3]; + let contract: fuel_core_types::fuel_tx::Contract = contract_code.into(); + let contract_id = fuel_core_types::fuel_tx::Contract::id( + &Default::default(), + &contract.root(), + &Default::default(), + ); + let p_tx_id = fuel_core_types::fuel_tx::TxId::from([0xABu8; 32]); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Preconfirmation for P arrives (with ContractCreated output). + universe.send_preconfirmation( + p_tx_id, + make_preconf_with_contract_created(p_tx_id, 1, contract_id), + ); + + // Give the worker time to register the preconfirmed contract in extracted_outputs. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Insert D which uses the preconfirmed contract as an input. + // The mock DB has no record of this contract, so admission relies solely on + // extracted_outputs.contract_exists(contract_id). + // A script tx with Input::Contract at index 0 also needs Output::Contract(0). + let contract_input = create_contract_input(p_tx_id, 0, contract_id); + let contract_output = Output::contract(0, Default::default(), Default::default()); + let tx_d = universe.build_script_transaction( + Some(vec![contract_input]), + Some(vec![contract_output]), + 5, + ); + let tx_d_id = tx_d.id(&Default::default()); + + service.shared.insert(tx_d).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_d_id]) + .await; + + let found = service.shared.find(vec![tx_d_id]).await.unwrap(); + assert!(found[0].is_some(), "tx_d should be in the pool"); + + // When — import an empty canonical block at height 1 (P was never included). + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + // Then — D must be squeezed out because the contract it relied on never landed. + universe + .await_expected_tx_statuses(vec![tx_d_id], |_, status| { + matches!( + status, + fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) + ) + }) + .await + .unwrap(); + + let found = service.shared.find(vec![tx_d_id]).await.unwrap(); + assert!( + found[0].is_none(), + "tx_d should have been removed from the pool" + ); + + service.stop_and_await().await.unwrap(); +} diff --git a/crates/services/txpool_v2/src/tests/universe.rs b/crates/services/txpool_v2/src/tests/universe.rs index 17212616089..7028c8e4aeb 100644 --- a/crates/services/txpool_v2/src/tests/universe.rs +++ b/crates/services/txpool_v2/src/tests/universe.rs @@ -6,6 +6,8 @@ use std::{ sync::Arc, }; +use fuel_core_types::services::transaction_status::PreConfirmationStatus; + use crate::{ GasPrice, Service, @@ -488,6 +490,12 @@ impl TestPoolUniverse { self.pool.clone().unwrap() } + /// Send a preconfirmation update directly to the pool worker. + pub fn send_preconfirmation(&self, tx_id: TxId, status: PreConfirmationStatus) { + self.mock_tx_status_manager + .send_preconfirmation(tx_id, status); + } + pub fn setup_coin(&mut self) -> (Coin, Input) { let input = self.random_predicate(AssetId::BASE, TEST_COIN_AMOUNT, None);