From 331245fedec5933e9c53629a7347cf876922292d Mon Sep 17 00:00:00 2001 From: Hannes Karppila <2204863+Dentosal@users.noreply.github.com> Date: Thu, 16 Apr 2026 23:18:37 +0300 Subject: [PATCH 1/7] Rollback unsuccessful preconfs in the mempool (#3264) Closes #3098. When a block producer sends preconfirmation updates, sentry nodes optimistically treat the included transactions as committed, removing them from the mempool and marking their inputs as spent. If the producer crashes and re-produces a block at the same height without those transactions, the mempool is left in a stale state: inputs stay marked as spent and outputs linger in `extracted_outputs`, preventing re-submission of rolled-back transactions and causing dependents to reference non-existent UTXOs. This PR makes preconfirmed transactions tentative until the canonical block at their height is imported. On import, preconfirmed txs present in the block are confirmed and their tracking is cleared; those absent are rolled back by restoring inputs, purging dependents, and emitting `SqueezedOut` notifications. It also adds integration tests: re-insertion after rollback, dependent eviction, normal confirmation, and stale-height cleanup. 
- [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) - [ ] I have reviewed the code myself - [x] I have created follow-up issues caused by this PR and linked them here --- .changes/fixed/3264.md | 1 + Cargo.lock | 1 + crates/services/txpool_v2/Cargo.toml | 1 + .../txpool_v2/src/collision_manager/basic.rs | 63 +- .../txpool_v2/src/collision_manager/mod.rs | 12 + .../txpool_v2/src/extracted_outputs.rs | 18 +- crates/services/txpool_v2/src/lib.rs | 3 + crates/services/txpool_v2/src/pool.rs | 136 ++++- crates/services/txpool_v2/src/pool_worker.rs | 115 +++- crates/services/txpool_v2/src/service.rs | 8 +- crates/services/txpool_v2/src/spent_inputs.rs | 56 ++ crates/services/txpool_v2/src/tests/mocks.rs | 5 + crates/services/txpool_v2/src/tests/mod.rs | 1 + .../src/tests/tests_preconf_rollback.rs | 565 ++++++++++++++++++ .../services/txpool_v2/src/tests/universe.rs | 8 + 15 files changed, 976 insertions(+), 17 deletions(-) create mode 100644 .changes/fixed/3264.md create mode 100644 crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs diff --git a/.changes/fixed/3264.md b/.changes/fixed/3264.md new file mode 100644 index 00000000000..30edc35c336 --- /dev/null +++ b/.changes/fixed/3264.md @@ -0,0 +1 @@ +Rollback stale preconfirmations in the mempool when the canonical block at that height omits the preconfirmed transactions, restoring spent inputs and removing dependent transactions. 
diff --git a/Cargo.lock b/Cargo.lock index b09f9804d96..d13789b3ef2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4297,6 +4297,7 @@ dependencies = [ "fuel-core-storage", "fuel-core-syscall", "fuel-core-trace", + "fuel-core-txpool", "fuel-core-types 0.47.3", "futures", "lru 0.13.0", diff --git a/crates/services/txpool_v2/Cargo.toml b/crates/services/txpool_v2/Cargo.toml index 24a5f567bb1..2d59c12081c 100644 --- a/crates/services/txpool_v2/Cargo.toml +++ b/crates/services/txpool_v2/Cargo.toml @@ -34,6 +34,7 @@ tracing = { workspace = true } [dev-dependencies] fuel-core-storage = { workspace = true, features = ["std", "test-helpers"] } fuel-core-trace = { path = "../../trace" } +fuel-core-txpool = { path = ".", features = ["test-helpers"] } mockall = { workspace = true } rand = { workspace = true } tokio = { workspace = true, features = ["sync", "test-util"] } diff --git a/crates/services/txpool_v2/src/collision_manager/basic.rs b/crates/services/txpool_v2/src/collision_manager/basic.rs index 686271980db..3c31d881e32 100644 --- a/crates/services/txpool_v2/src/collision_manager/basic.rs +++ b/crates/services/txpool_v2/src/collision_manager/basic.rs @@ -21,6 +21,7 @@ use fuel_core_types::{ CoinPredicate, CoinSigned, }, + contract::Contract as ContractInput, message::{ MessageCoinPredicate, MessageCoinSigned, @@ -57,6 +58,9 @@ pub struct BasicCollisionManager { coins_spenders: BTreeMap, /// Contract -> Transaction that currently create the contract contracts_creators: HashMap, + /// Contract -> Transactions (by TxId) that currently use the contract as an input. + /// Symmetric to `contracts_creators`; used to evict dependents during rollback. 
+ contract_users: HashMap>, /// Blob -> Transaction that currently create the blob blobs_users: HashMap, } @@ -67,6 +71,7 @@ impl BasicCollisionManager { messages_spenders: HashMap::new(), coins_spenders: BTreeMap::new(), contracts_creators: HashMap::new(), + contract_users: HashMap::new(), blobs_users: HashMap::new(), } } @@ -76,6 +81,7 @@ impl BasicCollisionManager { self.messages_spenders.is_empty() && self.coins_spenders.is_empty() && self.contracts_creators.is_empty() + && self.contract_users.is_empty() && self.blobs_users.is_empty() } @@ -88,6 +94,7 @@ impl BasicCollisionManager { let mut message_spenders = HashMap::new(); let mut coins_spenders = BTreeMap::new(); let mut contracts_creators = HashMap::new(); + let mut contract_users: HashMap> = HashMap::new(); let mut blobs_users = HashMap::new(); for tx in expected_txs { if let PoolTransaction::Blob(checked_tx, _) = tx.deref() { @@ -110,7 +117,12 @@ impl BasicCollisionManager { }) => { message_spenders.insert(*nonce, tx.id()); } - Input::Contract { .. } => {} + Input::Contract(ContractInput { contract_id, .. 
}) => { + contract_users + .entry(*contract_id) + .or_default() + .push(tx.id()); + } } } for output in tx.outputs() { @@ -152,6 +164,26 @@ impl BasicCollisionManager { "Some contract creators are missing from the collision manager: {:?}", contracts_creators ); + for (contract_id, users) in &self.contract_users { + let expected = contract_users.remove(contract_id).unwrap_or_else(|| panic!( + "A contract ({}) user list is present on the collision manager that shouldn't be there.", + contract_id + )); + let mut actual_sorted = users.clone(); + actual_sorted.sort(); + let mut expected_sorted = expected; + expected_sorted.sort(); + assert_eq!( + actual_sorted, expected_sorted, + "contract_users mismatch for contract {}", + contract_id + ); + } + assert!( + contract_users.is_empty(), + "Some contract users are missing from the collision manager: {:?}", + contract_users + ); } } @@ -174,6 +206,17 @@ where .collect() } + fn get_contract_users(&self, contract_id: &ContractId) -> Vec { + self.contract_users + .get(contract_id) + .cloned() + .unwrap_or_default() + } + + fn contract_created_in_pool(&self, contract_id: &ContractId) -> bool { + self.contracts_creators.contains_key(contract_id) + } + fn find_collisions( &self, transaction: &PoolTransaction, @@ -248,6 +291,7 @@ where let blob_id = checked_tx.transaction().blob_id(); self.blobs_users.insert(*blob_id, storage_id); } + let tx_id = store_entry.transaction.id(); for input in store_entry.transaction.inputs() { match input { Input::CoinSigned(CoinSigned { utxo_id, .. }) @@ -262,7 +306,12 @@ where // insert message self.messages_spenders.insert(*nonce, storage_id); } - _ => {} + Input::Contract(ContractInput { contract_id, .. 
}) => { + self.contract_users + .entry(*contract_id) + .or_default() + .push(tx_id); + } } } for output in store_entry.transaction.outputs().iter() { @@ -284,6 +333,7 @@ where let blob_id = checked_tx.transaction().blob_id(); self.blobs_users.remove(blob_id); } + let tx_id = transaction.id(); for input in transaction.inputs() { match input { Input::CoinSigned(CoinSigned { utxo_id, .. }) @@ -298,7 +348,14 @@ where // remove message self.messages_spenders.remove(nonce); } - _ => {} + Input::Contract(ContractInput { contract_id, .. }) => { + if let Some(users) = self.contract_users.get_mut(contract_id) { + users.retain(|id| id != &tx_id); + if users.is_empty() { + self.contract_users.remove(contract_id); + } + } + } } } for output in transaction.outputs().iter() { diff --git a/crates/services/txpool_v2/src/collision_manager/mod.rs b/crates/services/txpool_v2/src/collision_manager/mod.rs index c6e75a45167..d053b70cad4 100644 --- a/crates/services/txpool_v2/src/collision_manager/mod.rs +++ b/crates/services/txpool_v2/src/collision_manager/mod.rs @@ -8,6 +8,8 @@ use fuel_core_types::{ }; use std::collections::HashMap; +use fuel_core_types::fuel_tx::ContractId; + use crate::storage::StorageData; pub mod basic; @@ -27,6 +29,16 @@ pub trait CollisionManager { /// Get spenders of coins UTXO created by a transaction ID. fn get_coins_spenders(&self, tx_creator_id: &TxId) -> Vec; + /// Get the IDs of pool transactions that have `Input::Contract(contract_id)`. + /// Used during preconfirmation rollback to evict pool txs that were admitted + /// only because a preconfirmed tx temporarily created the contract. + fn get_contract_users(&self, contract_id: &ContractId) -> Vec; + + /// Returns true if a currently in-pool transaction creates `contract_id`. + /// Used during rollback to skip eviction when the contract is still available + /// via a pool tx (independent of the rolled-back preconfirmation). 
+ fn contract_created_in_pool(&self, contract_id: &ContractId) -> bool; + /// Inform the collision manager that a transaction was stored. fn on_stored_transaction( &mut self, diff --git a/crates/services/txpool_v2/src/extracted_outputs.rs b/crates/services/txpool_v2/src/extracted_outputs.rs index ce0c7f3f28b..e1cd66974ec 100644 --- a/crates/services/txpool_v2/src/extracted_outputs.rs +++ b/crates/services/txpool_v2/src/extracted_outputs.rs @@ -49,7 +49,13 @@ impl ExtractedOutputs { for (utxo_id, output) in outputs { match output { Output::ContractCreated { contract_id, .. } => { - self.contract_created.insert(*contract_id, *utxo_id.tx_id()); + let tx_id = *utxo_id.tx_id(); + self.contract_created.insert(*contract_id, tx_id); + // Track the reverse mapping so cleanup via new_executed_transaction works. + self.contract_created_by_tx + .entry(tx_id) + .or_default() + .push(*contract_id); } Output::Coin { to, @@ -131,6 +137,16 @@ impl ExtractedOutputs { self.new_executed_transaction(tx_id); } + /// Returns the contract IDs created by `tx_id`, if any. + /// Call this **before** [`new_skipped_transaction`] / [`new_executed_transaction`] + /// if the caller needs the list for cleanup. + pub fn contracts_created_by(&self, tx_id: &TxId) -> &[ContractId] { + self.contract_created_by_tx + .get(tx_id) + .map(Vec::as_slice) + .unwrap_or(&[]) + } + pub fn new_executed_transaction(&mut self, tx_id: &TxId) { let contract_ids = self.contract_created_by_tx.remove(tx_id); if let Some(contract_ids) = contract_ids { diff --git a/crates/services/txpool_v2/src/lib.rs b/crates/services/txpool_v2/src/lib.rs index c27845af7b7..234dbbdf946 100644 --- a/crates/services/txpool_v2/src/lib.rs +++ b/crates/services/txpool_v2/src/lib.rs @@ -61,6 +61,9 @@ mod spent_inputs; mod tests; #[cfg(test)] fuel_core_trace::enable_tracing!(); +// Needed to activate the `test-helpers` feature flag for integration tests. 
+#[cfg(test)] +use fuel_core_txpool as _; use fuel_core_types::fuel_asm::Word; pub use pool::TxPoolStats; diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index 60568b3e2ce..4eafee20e7a 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -426,6 +426,55 @@ where self.tx_id_to_storage_id.keys() } + /// Process a preconfirmed transaction as committed while recording its spent + /// inputs so they can be rolled back later if the transaction is not + /// included in the canonical block. + pub fn process_preconfirmed_committed_transaction(&mut self, tx_id: TxId) { + // If the tx was already extracted for local block production it is no + // longer in `tx_id_to_storage_id`, so the branch below that calls + // `record_tentative_spend` will be skipped. Preserve the input keys + // now, before `spend_inputs_by_tx_id` drains `spender_of_inputs[T]`. + if !self.tx_id_to_storage_id.contains_key(&tx_id) { + self.spent_inputs.move_spender_to_tentative(tx_id); + } + self.spent_inputs.spend_inputs_by_tx_id(tx_id); + if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) { + let dependents: Vec = + self.storage.get_direct_dependents(storage_id).collect(); + let Some(transaction) = self.storage.remove_transaction(storage_id) else { + // Invariant violation. Panic in tests, log in production. + debug_assert!(false, "Storage data not found for the transaction"); + tracing::warn!("Storage data not found for the transaction."); + return; + }; + self.extracted_outputs + .new_extracted_transaction(&transaction.transaction); + // Save the inputs before spending them permanently, so we can roll + // them back if the canonical block omits this transaction. 
+ self.spent_inputs + .record_tentative_spend(tx_id, transaction.transaction.inputs()); + self.spent_inputs + .spend_inputs(tx_id, transaction.transaction.inputs()); + self.update_components_and_caches_on_removal(iter::once(&transaction)); + + let mut new_executable_transaction = false; + for dependent in dependents { + if !self.storage.has_dependencies(&dependent) + && let Some(storage_data) = self.storage.get(&dependent) + { + self.selection_algorithm + .new_executable_transaction(dependent, storage_data); + new_executable_transaction = true; + } + } + if new_executable_transaction { + self.new_executable_txs_notifier.send_replace(()); + } + } + + self.update_stats(); + } + /// Process committed transactions: /// - Remove transaction but keep its dependents and the dependents become executables. /// - Notify about possible new executable transactions. @@ -438,6 +487,7 @@ where self.storage.get_direct_dependents(storage_id).collect(); let Some(transaction) = self.storage.remove_transaction(storage_id) else { + // Invariant violation. Panic in tests, log in production. debug_assert!(false, "Storage data not found for the transaction"); tracing::warn!( "Storage data not found for the transaction during `remove_transaction`." @@ -461,6 +511,7 @@ where let mut new_executable_transaction = false; for promote in transactions_to_promote { let Some(storage_data) = self.storage.get(&promote) else { + // Invariant violation. Panic in tests, log in production. debug_assert!( false, "Dependent storage data not found for the transaction" @@ -568,13 +619,13 @@ where debug_assert!(!self.storage.has_dependencies(storage_id)); let Some(storage_data) = self.storage.get(storage_id) else { + // Invariant violation. Panic in tests, log in production. debug_assert!( false, "Storage data not found for one of the less worth transactions" ); tracing::warn!( - "Storage data not found for one of the less \ - worth transactions during `find_free_space`." 
+ "Storage data not found for one of the less worth transactions" ); continue }; @@ -696,6 +747,87 @@ where self.update_stats(); } + /// Rollback a preconfirmed transaction that was not included in the canonical block. + /// + /// This clears the preconfirmation-derived outputs and removes any pool transactions + /// that depend on those outputs, since those inputs no longer exist on-chain. + pub fn rollback_preconfirmed_transaction(&mut self, tx_id: TxId) { + // Capture contracts created by this preconfirmation BEFORE clearing + // extracted_outputs, so we can evict any pool txs that were admitted + // only because of the now-stale temporary contract existence. + let created_contracts: Vec<_> = + self.extracted_outputs.contracts_created_by(&tx_id).to_vec(); + + // Remove preconfirmed outputs so dependents can't use them. + self.extracted_outputs.new_skipped_transaction(&tx_id); + // Allow the transaction itself to be re-submitted. + self.spent_inputs.unspend_preconfirmed(tx_id); + + let reason = format!( + "Preconfirmed parent transaction {tx_id} was not included in the canonical block" + ); + + // Remove any pool transactions that used the preconfirmed coin outputs as inputs. 
+ let coin_dependents = self.collision_manager.get_coins_spenders(&tx_id); + if !coin_dependents.is_empty() { + for dependent in coin_dependents { + let removed = self + .storage + .remove_transaction_and_dependents_subtree(dependent); + self.update_components_and_caches_on_removal(removed.iter()); + let removed_txs: Vec<_> = removed + .into_iter() + .map(|data| { + let dependent_tx_id = data.transaction.id(); + let tx_status = + statuses::SqueezedOut::new(reason.clone(), dependent_tx_id); + (dependent_tx_id, tx_status) + }) + .collect(); + if !removed_txs.is_empty() { + self.tx_status_manager.squeezed_out_txs(removed_txs); + } + } + } + + // Remove any pool transactions that used a preconfirmed contract creation + // as an input and were admitted only via `extracted_outputs` (no in-pool + // graph dependency). Skip eviction when the contract is still available + // through an independent in-pool creator. + for contract_id in created_contracts { + if self + .collision_manager + .contract_created_in_pool(&contract_id) + { + continue; + } + let user_tx_ids = self.collision_manager.get_contract_users(&contract_id); + for user_tx_id in user_tx_ids { + let Some(&storage_id) = self.tx_id_to_storage_id.get(&user_tx_id) else { + continue; + }; + let removed = self + .storage + .remove_transaction_and_dependents_subtree(storage_id); + self.update_components_and_caches_on_removal(removed.iter()); + let removed_txs: Vec<_> = removed + .into_iter() + .map(|data| { + let dependent_tx_id = data.transaction.id(); + let tx_status = + statuses::SqueezedOut::new(reason.clone(), dependent_tx_id); + (dependent_tx_id, tx_status) + }) + .collect(); + if !removed_txs.is_empty() { + self.tx_status_manager.squeezed_out_txs(removed_txs); + } + } + } + + self.update_stats(); + } + fn check_blob_does_not_exist( tx: &PoolTransaction, persistent_storage: &impl TxPoolPersistentStorage, diff --git a/crates/services/txpool_v2/src/pool_worker.rs b/crates/services/txpool_v2/src/pool_worker.rs index 
ed64194bb9f..a1f6447656d 100644 --- a/crates/services/txpool_v2/src/pool_worker.rs +++ b/crates/services/txpool_v2/src/pool_worker.rs @@ -17,7 +17,10 @@ use fuel_core_types::{ }, }; use std::{ - iter, + collections::{ + BTreeMap, + HashSet, + }, ops::Deref, sync::Arc, time::SystemTime, @@ -94,6 +97,7 @@ impl PoolWorkerInterface { tx_pool: TxPool, view_provider: Arc>, limits: &ServiceChannelLimits, + initial_block_height: BlockHeight, ) -> Self where View: TxPoolPersistentStorage, @@ -137,6 +141,8 @@ impl PoolWorkerInterface { pending_pool: PendingPool::new(tx_pool.config.pending_pool_tx_ttl), pool: tx_pool, view_provider, + tentative_preconfs: BTreeMap::new(), + current_canonical_height: initial_block_height, }; tokio_runtime.block_on(async { @@ -271,6 +277,14 @@ pub(super) struct PoolWorker { pending_pool: PendingPool, view_provider: Arc>, notification_sender: Sender, + /// Tracks preconfirmed transaction IDs by their tentative block height. + /// Used to roll back stale preconfirmations when the canonical block at + /// that height does not include those transactions. + tentative_preconfs: BTreeMap>, + /// The height of the last canonical block imported by this node. + /// Used to discard late preconfirmations whose tentative block height + /// is already at or below the canonical tip. 
+ current_canonical_height: BlockHeight, } impl PoolWorker @@ -488,9 +502,17 @@ where } fn process_block(&mut self, block_result: SharedImportResult) { - self.pool.process_committed_transactions( - block_result.tx_status.iter().map(|tx_status| tx_status.id), - ); + let block_height = *block_result.sealed_block.entity.header().height(); + self.current_canonical_height = self.current_canonical_height.max(block_height); + + let confirmed_tx_ids: HashSet = block_result + .tx_status + .iter() + .map(|tx_status| tx_status.id) + .collect(); + + self.pool + .process_committed_transactions(confirmed_tx_ids.iter().copied()); block_result.tx_status.iter().for_each(|tx_status| { self.pool @@ -498,6 +520,37 @@ where .new_executed_transaction(&tx_status.id); }); + // Reconcile tentative preconfirmations for all heights up to and including + // the imported block height. Any preconfirmed tx that is absent from the + // canonical block must have its state rolled back so those inputs/outputs + // do not remain stale in the mempool. + let stale_heights: Vec = self + .tentative_preconfs + .range(..=block_height) + .map(|(h, _)| *h) + .collect(); + + for height in stale_heights { + if let Some(tentative_txs) = self.tentative_preconfs.remove(&height) { + for tx_id in tentative_txs { + if confirmed_tx_ids.contains(&tx_id) { + // Tx was included in the canonical block — confirm the + // tentative spend record so it won't be rolled back. 
+ self.pool.spent_inputs.confirm_tentative_spend(&tx_id); + } else { + tracing::debug!( + "Rolling back stale preconfirmation for tx {} \ + (tentative block {}, canonical block {})", + tx_id, + height, + block_height, + ); + self.pool.rollback_preconfirmed_transaction(tx_id); + } + } + } + } + let resolved_txs = self.pending_pool.new_known_txs( block_result .sealed_block @@ -529,20 +582,54 @@ where tx_id: TxId, status: PreConfirmationStatus, ) { - let outputs = match &status { + // Reject late preconfirmations whose tentative block height is already + // at or below the node's current canonical tip. Applying them would + // temporarily mark inputs as spent and admit dependents against outputs + // that may not be in any canonical block, creating a stale-acceptance + // window that is only unwound on the next block import. + let preconf_height = match &status { + PreConfirmationStatus::Success(s) => Some(s.tx_pointer.block_height()), + PreConfirmationStatus::Failure(s) => Some(s.tx_pointer.block_height()), + PreConfirmationStatus::SqueezedOut(_) => None, + }; + if let Some(h) = preconf_height + && h <= self.current_canonical_height + { + tracing::debug!( + "Ignoring late preconfirmation for tx {} at height {} \ + (current canonical height {})", + tx_id, + h, + self.current_canonical_height, + ); + return; + } + + let (outputs, block_height) = match &status { PreConfirmationStatus::Success(status) => { - self.pool.process_committed_transactions(iter::once(tx_id)); + self.pool.process_preconfirmed_committed_transaction(tx_id); + let height = status.tx_pointer.block_height(); if let Some(outputs) = &status.resolved_outputs { - outputs + (outputs, Some(height)) } else { + // Still track the height so block import can clean up spent_inputs. 
+ self.tentative_preconfs + .entry(height) + .or_default() + .insert(tx_id); return; } } PreConfirmationStatus::Failure(status) => { - self.pool.process_committed_transactions(iter::once(tx_id)); + self.pool.process_preconfirmed_committed_transaction(tx_id); + let height = status.tx_pointer.block_height(); if let Some(outputs) = &status.resolved_outputs { - outputs + (outputs, Some(height)) } else { + self.tentative_preconfs + .entry(height) + .or_default() + .insert(tx_id); return; } } @@ -551,6 +638,16 @@ where return; } }; + + // Track the tentative block height so the preconfirmed outputs can be + // rolled back if the canonical block at that height omits this tx. + if let Some(height) = block_height { + self.tentative_preconfs + .entry(height) + .or_default() + .insert(tx_id); + } + // All of this can be useful in case that we didn't know about the transaction let resolved = self .pending_pool diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index eb466af937e..8dcc4c84912 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -803,8 +803,12 @@ where let (current_height_writer, current_height_reader) = unsafe { SeqLock::new(current_height) }; - let pool_worker = - PoolWorkerInterface::new(txpool, storage_provider, &service_channel_limits); + let pool_worker = PoolWorkerInterface::new( + txpool, + storage_provider, + &service_channel_limits, + current_height, + ); let shared_state = SharedState { request_read_sender: pool_worker.request_read_sender.clone(), diff --git a/crates/services/txpool_v2/src/spent_inputs.rs b/crates/services/txpool_v2/src/spent_inputs.rs index 4cc0c933c3d..66c556a7e83 100644 --- a/crates/services/txpool_v2/src/spent_inputs.rs +++ b/crates/services/txpool_v2/src/spent_inputs.rs @@ -27,6 +27,9 @@ pub struct SpentInputs { /// transaction spent it. Later, this information can be used to unspent /// or fully spend the input. 
spender_of_inputs: HashMap>, + /// Inputs permanently spent during preconfirmation processing, saved so + /// they can be rolled back if the preconfirmation turns out to be stale. + tentative_spent: HashMap>, } impl SpentInputs { @@ -34,6 +37,7 @@ impl SpentInputs { Self { spent_inputs: LruCache::new(capacity), spender_of_inputs: HashMap::new(), + tentative_spent: HashMap::new(), } } @@ -90,6 +94,21 @@ impl SpentInputs { } } + /// Transitions the inputs recorded by [`maybe_spend_inputs`] from the + /// `spender_of_inputs` map directly into `tentative_spent`, without + /// touching the live `spent_inputs` LRU cache. + /// + /// Call this **before** [`spend_inputs_by_tx_id`] when a preconfirmation + /// arrives for a tx that was already extracted for local block production + /// (i.e. absent from pool storage). This preserves the input keys for a + /// potential rollback via [`unspend_preconfirmed`] before + /// [`spend_inputs_by_tx_id`] drains `spender_of_inputs`. + pub fn move_spender_to_tentative(&mut self, tx_id: TxId) { + if let Some(keys) = self.spender_of_inputs.get(&tx_id) { + self.tentative_spent.insert(tx_id, keys.clone()); + } + } + /// If transaction is skipped during the block production, this functions /// can be used to unspend inputs, allowing other transactions to spend them. pub fn unspend_inputs(&mut self, tx_id: TxId) { @@ -114,6 +133,43 @@ impl SpentInputs { pub fn is_spent_tx(&self, tx: &TxId) -> bool { self.spent_inputs.contains(&InputKey::Tx(*tx)) } + + /// Record inputs that were permanently spent during preconfirmation processing. + /// The saved keys can later be rolled back via [`unspend_preconfirmed`]. 
+ pub fn record_tentative_spend(&mut self, tx_id: TxId, inputs: &[Input]) { + let keys: Vec = inputs + .iter() + .filter_map(|input| { + if input.is_coin() { + input.utxo_id().cloned().map(InputKey::Utxo) + } else if input.is_message() { + input.nonce().cloned().map(InputKey::Message) + } else { + None + } + }) + .collect(); + self.tentative_spent.insert(tx_id, keys); + } + + /// Remove the tentative-spend record for a confirmed transaction, preventing + /// a spurious rollback. Called when the preconfirmed tx is included in the + /// canonical block. + pub fn confirm_tentative_spend(&mut self, tx_id: &TxId) { + self.tentative_spent.remove(tx_id); + } + + /// Removes the tx entry and any individually-tracked UTXO/message inputs + /// from spent inputs, allowing the same inputs to be re-used. + /// Used when rolling back a stale preconfirmation. + pub fn unspend_preconfirmed(&mut self, tx_id: TxId) { + self.spent_inputs.pop(&InputKey::Tx(tx_id)); + if let Some(saved_keys) = self.tentative_spent.remove(&tx_id) { + for key in saved_keys { + self.spent_inputs.pop(&key); + } + } + } } #[cfg(test)] diff --git a/crates/services/txpool_v2/src/tests/mocks.rs b/crates/services/txpool_v2/src/tests/mocks.rs index 94d1447bce2..ea119abd4d8 100644 --- a/crates/services/txpool_v2/src/tests/mocks.rs +++ b/crates/services/txpool_v2/src/tests/mocks.rs @@ -113,6 +113,11 @@ impl MockTxStatusManager { tx, } } + + /// Send a preconfirmation update to the pool worker. 
+ pub fn send_preconfirmation(&self, tx_id: TxId, status: PreConfirmationStatus) { + let _ = self.tx_preconfirmations_update_sender.send((tx_id, status)); + } } impl ports::TxStatusManager for MockTxStatusManager { diff --git a/crates/services/txpool_v2/src/tests/mod.rs b/crates/services/txpool_v2/src/tests/mod.rs index 4578a2a50bc..cdcb9f78ecd 100644 --- a/crates/services/txpool_v2/src/tests/mod.rs +++ b/crates/services/txpool_v2/src/tests/mod.rs @@ -5,6 +5,7 @@ mod stability_test; mod tests_p2p; mod tests_pending_pool; mod tests_pool; +mod tests_preconf_rollback; mod tests_service; mod tx_status_manager_integration; mod universe; diff --git a/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs b/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs new file mode 100644 index 00000000000..fa46ce799d9 --- /dev/null +++ b/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs @@ -0,0 +1,565 @@ +//! When a block producer emits preconfirmations and then crashes or produces a +//! different block at the same height, the sentry/RPC mempool must purge the +//! stale preconfirmation state on the next canonical block import. + +use std::sync::Arc; + +use fuel_core_types::{ + blockchain::{ + block::Block, + consensus::Sealed, + }, + fuel_tx::{ + Contract, + Output, + TxPointer, + UniqueIdentifier, + UtxoId, + }, + fuel_types::{ + BlockHeight, + ContractId, + }, + services::{ + block_importer::ImportResult, + executor::{ + TransactionExecutionResult, + TransactionExecutionStatus, + }, + transaction_status::{ + PreConfirmationStatus, + statuses, + }, + }, +}; + +use fuel_core_services::Service as ServiceTrait; + +use crate::{ + Constraints, + tests::{ + mocks::MockImporter, + universe::{ + TestPoolUniverse, + create_contract_input, + }, + }, +}; + +/// Build a canonical block sealed at `height` that contains `tx_ids`. 
+fn make_block_import( + height: u32, + tx_ids: &[fuel_core_types::fuel_tx::TxId], +) -> Arc< + dyn std::ops::Deref + + Send + + Sync, +> { + let sealed_block = Sealed { + entity: { + let mut block = Block::default(); + block + .header_mut() + .set_block_height(BlockHeight::new(height)); + block + }, + consensus: Default::default(), + }; + let tx_statuses = tx_ids + .iter() + .map(|id| TransactionExecutionStatus { + id: *id, + result: TransactionExecutionResult::Success { + result: None, + receipts: Arc::new(vec![]), + total_gas: 0, + total_fee: 0, + }, + }) + .collect(); + Arc::new(ImportResult::new_from_local(sealed_block, tx_statuses, vec![]).wrap()) +} + +/// Build a `PreConfirmationStatus::Success` that carries one coin output. +fn make_preconf_success( + tx_id: fuel_core_types::fuel_tx::TxId, + block_height: u32, + output: Output, +) -> PreConfirmationStatus { + let utxo_id = UtxoId::new(tx_id, 0); + PreConfirmationStatus::Success( + statuses::PreConfirmationSuccess { + tx_pointer: TxPointer::new(BlockHeight::new(block_height), 0), + total_gas: 0, + total_fee: 0, + receipts: None, + resolved_outputs: Some(vec![(utxo_id, output)]), + } + .into(), + ) +} + +/// Build a `PreConfirmationStatus::Success` whose resolved outputs include a +/// `ContractCreated` entry, so that `extracted_outputs.contract_exists(contract_id)` +/// becomes true after the preconfirmation is processed. 
+fn make_preconf_with_contract_created( + tx_id: fuel_core_types::fuel_tx::TxId, + block_height: u32, + contract_id: ContractId, +) -> PreConfirmationStatus { + let utxo_id = UtxoId::new(tx_id, 0); + let output = Output::ContractCreated { + contract_id, + state_root: Contract::default_state_root(), + }; + PreConfirmationStatus::Success( + statuses::PreConfirmationSuccess { + tx_pointer: TxPointer::new(BlockHeight::new(block_height), 0), + total_gas: 0, + total_fee: 0, + receipts: None, + resolved_outputs: Some(vec![(utxo_id, output)]), + } + .into(), + ) +} + +/// Build a `PreConfirmationStatus::Success` with no resolved outputs. +fn make_preconf_success_no_outputs( + _tx_id: fuel_core_types::fuel_tx::TxId, + block_height: u32, +) -> PreConfirmationStatus { + PreConfirmationStatus::Success( + statuses::PreConfirmationSuccess { + tx_pointer: TxPointer::new(BlockHeight::new(block_height), 0), + total_gas: 0, + total_fee: 0, + receipts: None, + resolved_outputs: None, + } + .into(), + ) +} + +/// After a preconfirmation arrives for tx T at height H, and then a canonical +/// block at height H is imported *without* T, the tx should no longer be +/// marked as "spent" i.e. it can be re-inserted into the pool. +#[tokio::test] +async fn preconfirmed_tx_can_be_reinserted_after_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Insert and wait for submitted status. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // Simulate the block producer preconfirming tx at block height 1. 
+ universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + + // Give the pool worker time to process the preconfirmation. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // The tx should not be in the pool any more (committed). + let found = service.shared.find(vec![tx_id]).await.unwrap(); + assert!( + found[0].is_none(), + "tx should have been committed out of pool" + ); + + // When — import an empty block at height 1 (no tx T). + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + // Give the worker time to process the block. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — re-inserting the same tx should now succeed because + // spent_inputs was rolled back. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + service.stop_and_await().await.unwrap(); +} + +/// When a preconfirmed tx's outputs are used by a dependent tx D, and the +/// canonical block does not include the preconfirmed tx, D must be removed +/// from the pool. +#[tokio::test] +async fn dependents_of_preconfirmed_tx_removed_on_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + + // tx_parent is the tx that will be preconfirmed but not included. + // It produces output_a (a coin). + let (output_a, unset_input_a) = universe.create_output_and_input(); + let tx_parent = universe.build_script_transaction(None, Some(vec![output_a]), 1); + let tx_parent_id = tx_parent.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Simulate receiving a preconfirmation for tx_parent (which the sentry may + // never have seen). The preconf carries output_a. 
+ universe.send_preconfirmation( + tx_parent_id, + make_preconf_success(tx_parent_id, 1, output_a), + ); + + // Give the worker time to process the preconfirmation and register outputs. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Now insert tx_child that spends output_a (utxo from tx_parent). + let input_a = unset_input_a.into_input(UtxoId::new(tx_parent_id, 0)); + let tx_child = universe.build_script_transaction(Some(vec![input_a]), None, 2); + let tx_child_id = tx_child.id(&Default::default()); + + service.shared.insert(tx_child.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_child_id]) + .await; + + // Sanity: child is in the pool. + let found = service.shared.find(vec![tx_child_id]).await.unwrap(); + assert!(found[0].is_some(), "tx_child should be in pool"); + + // When — import a block at height 1 that does NOT contain tx_parent. + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + // Then — tx_child depends on a now-stale preconfirmed output; it must be + // squeezed out. + universe + .await_expected_tx_statuses(vec![tx_child_id], |_, status| { + matches!( + status, + fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) + ) + }) + .await + .unwrap(); + + let found = service.shared.find(vec![tx_child_id]).await.unwrap(); + assert!(found[0].is_none(), "tx_child should have been removed"); + + service.stop_and_await().await.unwrap(); +} + +/// When a preconfirmed tx IS included in the canonical block, its state must +/// be committed normally — no spurious rollback. 
+#[tokio::test] +async fn preconfirmed_tx_committed_normally_when_in_canonical_block() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // Preconfirmation at height 1. + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // When — import a block at height 1 that CONTAINS the tx. + block_sender + .send(make_block_import(1, &[tx_id])) + .await + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — tx must not reappear in the pool; re-inserting it should fail + // because its inputs are now permanently spent (committed in the block). + let found = service.shared.find(vec![tx_id]).await.unwrap(); + assert!( + found[0].is_none(), + "tx should not be in pool after block commit" + ); + + service.stop_and_await().await.unwrap(); +} + +/// Regression test for the "extracted-first" rollback bug. +/// +/// Sequence: +/// 1. Tx T is inserted and then extracted for local block production. +/// `maybe_spend_inputs` records T's coin inputs in `spender_of_inputs`. +/// 2. A preconfirmation for T arrives. Because T was already removed from +/// `tx_id_to_storage_id`, `process_preconfirmed_committed_transaction` +/// takes the `else` branch and must still save T's inputs into +/// `tentative_spent` so they can be rolled back later. +/// 3. The canonical block omits T → `rollback_preconfirmed_transaction` must +/// clear those coin-input keys from `spent_inputs`. +/// 4. 
Re-inserting T must succeed (inputs no longer marked spent). +#[tokio::test] +async fn extracted_tx_inputs_freed_after_preconf_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Insert and wait for the tx to be accepted. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // Extract the tx (simulating local block production). + // This calls `maybe_spend_inputs` and removes the tx from storage. + let extracted = service + .shared + .extract_transactions_for_block(Constraints { + minimal_gas_price: 0, + max_gas: u64::MAX, + maximum_txs: u16::MAX, + maximum_block_size: u32::MAX, + excluded_contracts: Default::default(), + }) + .unwrap(); + assert_eq!(extracted.len(), 1, "expected exactly one extracted tx"); + + // Preconfirmation arrives for the already-extracted tx. + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // When — import an empty canonical block at height 1 (tx is absent). + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — coin inputs must have been freed; re-inserting T must succeed. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + service.stop_and_await().await.unwrap(); +} + +/// Stale preconfirmations at an older height are cleaned up when a later +/// block is imported, even if the heights don't match exactly. 
+#[tokio::test] +async fn stale_preconfs_at_older_height_cleaned_up_by_later_block() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + + let (output_a, unset_input_a) = universe.create_output_and_input(); + let tx_parent = universe.build_script_transaction(None, Some(vec![output_a]), 1); + let tx_parent_id = tx_parent.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Preconfirmation at height 1 (but the block producer crashes). + universe.send_preconfirmation( + tx_parent_id, + make_preconf_success(tx_parent_id, 1, output_a), + ); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Insert a dependent tx while the preconf outputs are "live". + let input_a = unset_input_a.into_input(UtxoId::new(tx_parent_id, 0)); + let tx_child = universe.build_script_transaction(Some(vec![input_a]), None, 2); + let tx_child_id = tx_child.id(&Default::default()); + + service.shared.insert(tx_child).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_child_id]) + .await; + + // When — a block at height 2 arrives (skipping height 1). The preconf for + // height 1 was never resolved, so it must be rolled back. + block_sender.send(make_block_import(2, &[])).await.unwrap(); + + // Then — tx_child (which depended on the stale preconf output) is removed. + universe + .await_expected_tx_statuses(vec![tx_child_id], |_, status| { + matches!( + status, + fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) + ) + }) + .await + .unwrap(); + + service.stop_and_await().await.unwrap(); +} + +/// A late preconfirmation arriving after the referenced canonical block has +/// already been imported must be silently discarded. +/// +/// Sequence: +/// 1. 
Node imports canonical block at height H (tx T is absent from it). +/// 2. T is inserted into the pool. +/// 3. A delayed preconfirmation for T arrives claiming height H. +/// 4. Because H <= current_canonical_height the preconfirmation is ignored: +/// T must remain in the pool (not committed out) and spent_inputs must +/// not be mutated. +#[tokio::test] +async fn late_preconf_below_canonical_height_is_ignored() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + let tx = universe.build_script_transaction(None, None, 10); + let tx_id = tx.id(&Default::default()); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Import a canonical block at height 1 that does NOT include the tx. + // After this the node's canonical height is 1. + block_sender.send(make_block_import(1, &[])).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Insert T after the block is imported. + service.shared.insert(tx.clone()).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_id]) + .await; + + // When — a delayed preconfirmation for T arrives at height 1 + // (already at or below the canonical tip). + universe.send_preconfirmation(tx_id, make_preconf_success_no_outputs(tx_id, 1)); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Then — the preconfirmation must be ignored. + // If it were applied, `process_preconfirmed_committed_transaction` would + // remove T from the pool. T must still be present. 
+ let found = service.shared.find(vec![tx_id]).await.unwrap(); + assert!( + found[0].is_some(), + "tx should still be in pool — late preconfirmation must have been ignored" + ); + + service.stop_and_await().await.unwrap(); +} + +/// Regression test: a pool tx admitted via a preconfirmed contract creation must +/// be evicted when the canonical block omits the contract-creating preconfirmation. +/// +/// Sequence: +/// 1. Preconfirmation for tx P arrives carrying `Output::ContractCreated { C }`. +/// `extracted_outputs` now reports `contract_exists(C) == true`. +/// 2. Tx D with `Input::Contract(C)` is inserted into the pool. +/// It passes validation because `extracted_outputs.contract_exists(C)`. +/// D has no graph dependency on any in-pool creator of C. +/// 3. An empty canonical block is imported at height 1 (P is absent). +/// Rollback clears C from `extracted_outputs`. +/// 4. D must be squeezed out — it was only valid because of the now-stale +/// preconfirmed contract creation. +#[tokio::test] +async fn contract_dependent_tx_removed_on_preconf_rollback() { + // Given + let (block_sender, block_receiver) = tokio::sync::mpsc::channel(10); + let mut universe = TestPoolUniverse::default(); + + // Compute the contract id that P will advertise as created. + // P is never inserted into the pool — it arrives only via preconfirmation. + let contract_code = vec![1u8, 2, 3]; + let contract: fuel_core_types::fuel_tx::Contract = contract_code.into(); + let contract_id = fuel_core_types::fuel_tx::Contract::id( + &Default::default(), + &contract.root(), + &Default::default(), + ); + let p_tx_id = fuel_core_types::fuel_tx::TxId::from([0xABu8; 32]); + + let service = universe.build_service( + None, + Some(MockImporter::with_block_provider(block_receiver)), + ); + service.start_and_await().await.unwrap(); + + // Preconfirmation for P arrives (with ContractCreated output). 
+ universe.send_preconfirmation( + p_tx_id, + make_preconf_with_contract_created(p_tx_id, 1, contract_id), + ); + + // Give the worker time to register the preconfirmed contract in extracted_outputs. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Insert D which uses the preconfirmed contract as an input. + // The mock DB has no record of this contract, so admission relies solely on + // extracted_outputs.contract_exists(contract_id). + // A script tx with Input::Contract at index 0 also needs Output::Contract(0). + let contract_input = create_contract_input(p_tx_id, 0, contract_id); + let contract_output = Output::contract(0, Default::default(), Default::default()); + let tx_d = universe.build_script_transaction( + Some(vec![contract_input]), + Some(vec![contract_output]), + 5, + ); + let tx_d_id = tx_d.id(&Default::default()); + + service.shared.insert(tx_d).await.unwrap(); + universe + .await_expected_tx_statuses_submitted(vec![tx_d_id]) + .await; + + let found = service.shared.find(vec![tx_d_id]).await.unwrap(); + assert!(found[0].is_some(), "tx_d should be in the pool"); + + // When — import an empty canonical block at height 1 (P was never included). + block_sender.send(make_block_import(1, &[])).await.unwrap(); + + // Then — D must be squeezed out because the contract it relied on never landed. 
+ universe + .await_expected_tx_statuses(vec![tx_d_id], |_, status| { + matches!( + status, + fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) + ) + }) + .await + .unwrap(); + + let found = service.shared.find(vec![tx_d_id]).await.unwrap(); + assert!( + found[0].is_none(), + "tx_d should have been removed from the pool" + ); + + service.stop_and_await().await.unwrap(); +} diff --git a/crates/services/txpool_v2/src/tests/universe.rs b/crates/services/txpool_v2/src/tests/universe.rs index c1a7603aed2..d3039c26073 100644 --- a/crates/services/txpool_v2/src/tests/universe.rs +++ b/crates/services/txpool_v2/src/tests/universe.rs @@ -6,6 +6,8 @@ use std::{ sync::Arc, }; +use fuel_core_types::services::transaction_status::PreConfirmationStatus; + use crate::{ GasPrice, Service, @@ -453,6 +455,12 @@ impl TestPoolUniverse { self.pool.clone().unwrap() } + /// Send a preconfirmation update directly to the pool worker. + pub fn send_preconfirmation(&self, tx_id: TxId, status: PreConfirmationStatus) { + self.mock_tx_status_manager + .send_preconfirmation(tx_id, status); + } + pub fn setup_coin(&mut self) -> (Coin, Input) { let input = self.random_predicate(AssetId::BASE, TEST_COIN_AMOUNT, None); From 2ef3dfb2e0f683157cb7e5b1dfe656a0eaf621f3 Mon Sep 17 00:00:00 2001 From: Hannes Karppila Date: Fri, 17 Apr 2026 15:25:19 +0300 Subject: [PATCH 2/7] Bump versions --- Cargo.lock | 136 +++++++++--------- Cargo.toml | 62 ++++---- .../upgradable-executor/src/executor.rs | 4 +- 3 files changed, 101 insertions(+), 101 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d13789b3ef2..e00419b6d15 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3529,7 +3529,7 @@ dependencies = [ [[package]] name = "fuel-core" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "assert_matches", @@ -3563,7 +3563,7 @@ dependencies = [ "fuel-core-trace", "fuel-core-tx-status-manager", "fuel-core-txpool", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", 
"fuel-core-upgradable-executor", "futures", "hex", @@ -3620,7 +3620,7 @@ dependencies = [ "fuel-core-storage", "fuel-core-sync", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "hex", "itertools 0.12.1", @@ -3643,11 +3643,11 @@ dependencies = [ [[package]] name = "fuel-core-bft" -version = "0.47.3" +version = "0.47.4" [[package]] name = "fuel-core-bin" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "aws-config", @@ -3662,7 +3662,7 @@ dependencies = [ "fuel-core-poa", "fuel-core-shared-sequencer", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "hex", "humantime", "itertools 0.12.1", @@ -3685,7 +3685,7 @@ dependencies = [ [[package]] name = "fuel-core-block-aggregator-api" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -3693,7 +3693,7 @@ dependencies = [ "enum-iterator", "fuel-core-services", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "num_enum", "postcard", @@ -3710,7 +3710,7 @@ dependencies = [ [[package]] name = "fuel-core-chain-config" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "bech32", @@ -3718,7 +3718,7 @@ dependencies = [ "educe", "fuel-core-chain-config", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "insta", "itertools 0.12.1", "parquet", @@ -3736,7 +3736,7 @@ dependencies = [ [[package]] name = "fuel-core-chaos-test" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "clap", @@ -3744,7 +3744,7 @@ dependencies = [ "fuel-core-chain-config", "fuel-core-poa", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "humantime", "rand 0.8.5", @@ -3758,14 +3758,14 @@ dependencies = [ [[package]] name = "fuel-core-client" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "base64 0.22.1", "cynic", "derive_more 0.99.20", "eventsource-client", - "fuel-core-types 0.47.3", + 
"fuel-core-types 0.47.4", "futures", "hex", "hyper-rustls 0.24.2", @@ -3783,23 +3783,23 @@ dependencies = [ [[package]] name = "fuel-core-client-bin" -version = "0.47.3" +version = "0.47.4" dependencies = [ "clap", "fuel-core-client", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "serde_json", "tokio", ] [[package]] name = "fuel-core-compression" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "enum_dispatch", "fuel-core-compression", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "paste", "postcard", "proptest", @@ -3812,7 +3812,7 @@ dependencies = [ [[package]] name = "fuel-core-compression-service" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -3823,7 +3823,7 @@ dependencies = [ "fuel-core-metrics", "fuel-core-services", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "paste", "rand 0.8.5", @@ -3838,30 +3838,30 @@ dependencies = [ [[package]] name = "fuel-core-consensus-module" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "fuel-core-chain-config", "fuel-core-poa", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "test-case", ] [[package]] name = "fuel-core-database" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "derive_more 0.99.20", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", ] [[package]] name = "fuel-core-e2e-client" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "assert_cmd", @@ -3869,7 +3869,7 @@ dependencies = [ "fuel-core-chain-config", "fuel-core-client", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "hex", "humantime-serde", @@ -3887,13 +3887,13 @@ dependencies = [ [[package]] name = "fuel-core-executor" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "fuel-core-storage", "fuel-core-syscall", "fuel-core-trace", - "fuel-core-types 0.47.3", + 
"fuel-core-types 0.47.4", "parking_lot", "serde", "sha2 0.10.9", @@ -3902,7 +3902,7 @@ dependencies = [ [[package]] name = "fuel-core-gas-price-service" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -3910,7 +3910,7 @@ dependencies = [ "fuel-core-metrics", "fuel-core-services", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-gas-price-algorithm", "futures", "mockito", @@ -3930,14 +3930,14 @@ dependencies = [ [[package]] name = "fuel-core-importer" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "derive_more 0.99.20", "fuel-core-metrics", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "mockall", "rayon", "test-case", @@ -3947,18 +3947,18 @@ dependencies = [ [[package]] name = "fuel-core-keygen" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "clap", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "libp2p-identity", "serde", ] [[package]] name = "fuel-core-keygen-bin" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "atty", @@ -3971,7 +3971,7 @@ dependencies = [ [[package]] name = "fuel-core-metrics" -version = "0.47.3" +version = "0.47.4" dependencies = [ "once_cell", "parking_lot", @@ -3986,7 +3986,7 @@ dependencies = [ [[package]] name = "fuel-core-p2p" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -3995,7 +3995,7 @@ dependencies = [ "fuel-core-services", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "hex", "hickory-resolver", @@ -4019,12 +4019,12 @@ dependencies = [ [[package]] name = "fuel-core-parallel-executor" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "fuel-core-executor", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-core-upgradable-executor", "futures", "rand 0.8.5", @@ -4033,7 +4033,7 @@ dependencies = [ 
[[package]] name = "fuel-core-poa" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4041,7 +4041,7 @@ dependencies = [ "fuel-core-services", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "mockall", "rand 0.8.5", "serde", @@ -4055,7 +4055,7 @@ dependencies = [ [[package]] name = "fuel-core-producer" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4063,7 +4063,7 @@ dependencies = [ "fuel-core-producer", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "mockall", "proptest", "rand 0.8.5", @@ -4074,7 +4074,7 @@ dependencies = [ [[package]] name = "fuel-core-relayer" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4088,7 +4088,7 @@ dependencies = [ "fuel-core-services", "fuel-core-storage", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "mockall", "once_cell", @@ -4106,7 +4106,7 @@ dependencies = [ [[package]] name = "fuel-core-services" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4122,7 +4122,7 @@ dependencies = [ [[package]] name = "fuel-core-shared-sequencer" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4130,7 +4130,7 @@ dependencies = [ "cosmos-sdk-proto", "cosmrs", "fuel-core-services", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-sequencer-proto", "futures", "postcard", @@ -4146,13 +4146,13 @@ dependencies = [ [[package]] name = "fuel-core-storage" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "derive_more 0.99.20", "enum-iterator", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-vm 0.65.0", "impl-tools", "itertools 0.12.1", @@ -4170,13 +4170,13 @@ dependencies = [ [[package]] name = "fuel-core-sync" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", 
"async-trait", "fuel-core-services", "fuel-core-trace", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "mockall", "rand 0.8.5", @@ -4189,9 +4189,9 @@ dependencies = [ [[package]] name = "fuel-core-syscall" -version = "0.47.3" +version = "0.47.4" dependencies = [ - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "parking_lot", "tracing", ] @@ -4221,7 +4221,7 @@ dependencies = [ "fuel-core-storage", "fuel-core-trace", "fuel-core-txpool", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-core-upgradable-executor", "futures", "hex", @@ -4250,7 +4250,7 @@ dependencies = [ [[package]] name = "fuel-core-trace" -version = "0.47.3" +version = "0.47.4" dependencies = [ "ctor", "fork", @@ -4266,13 +4266,13 @@ dependencies = [ [[package]] name = "fuel-core-tx-status-manager" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", "fuel-core-metrics", "fuel-core-services", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "mockall", "parking_lot", @@ -4287,7 +4287,7 @@ dependencies = [ [[package]] name = "fuel-core-txpool" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "async-trait", @@ -4298,7 +4298,7 @@ dependencies = [ "fuel-core-syscall", "fuel-core-trace", "fuel-core-txpool", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "lru 0.13.0", "mockall", @@ -4329,7 +4329,7 @@ dependencies = [ [[package]] name = "fuel-core-types" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "aws-config", @@ -4353,13 +4353,13 @@ dependencies = [ [[package]] name = "fuel-core-upgradable-executor" -version = "0.47.3" +version = "0.47.4" dependencies = [ "anyhow", "derive_more 0.99.20", "fuel-core-executor", "fuel-core-storage", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "fuel-core-wasm-executor", "futures", "parking_lot", @@ -4370,13 +4370,13 @@ dependencies = [ [[package]] name = "fuel-core-wasm-executor" -version = "0.47.3" +version = "0.47.4" 
dependencies = [ "anyhow", "fuel-core-executor", "fuel-core-storage", "fuel-core-types 0.35.0", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "postcard", "proptest", @@ -4447,7 +4447,7 @@ dependencies = [ [[package]] name = "fuel-gas-price-algorithm" -version = "0.47.3" +version = "0.47.4" dependencies = [ "proptest", "rand 0.8.5", @@ -10102,7 +10102,7 @@ dependencies = [ "fuel-core-storage", "fuel-core-trace", "fuel-core-txpool", - "fuel-core-types 0.47.3", + "fuel-core-types 0.47.4", "futures", "itertools 0.12.1", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 8f99ba359ef..5f73e704dd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ keywords = ["blockchain", "cryptocurrencies", "fuel-vm", "vm"] license = "BUSL-1.1" repository = "https://github.com/FuelLabs/fuel-core" rust-version = "1.90.0" -version = "0.47.3" +version = "0.47.4" [workspace.dependencies] @@ -80,36 +80,36 @@ educe = { version = "0.6", default-features = false, features = ["Eq", "PartialE enum-iterator = "1.2" enum_dispatch = "0.3.13" # Workspace members -fuel-core = { version = "0.47.3", path = "./crates/fuel-core", default-features = false } -fuel-core-bin = { version = "0.47.3", path = "./bin/fuel-core" } -fuel-core-chain-config = { version = "0.47.3", path = "./crates/chain-config", default-features = false } -fuel-core-client = { version = "0.47.3", path = "./crates/client" } -fuel-core-compression = { version = "0.47.3", path = "./crates/compression" } -fuel-core-compression-service = { version = "0.47.3", path = "./crates/services/compression" } -fuel-core-consensus-module = { version = "0.47.3", path = "./crates/services/consensus_module" } -fuel-core-database = { version = "0.47.3", path = "./crates/database" } -fuel-core-executor = { version = "0.47.3", path = "./crates/services/executor", default-features = false } -fuel-core-gas-price-service = { version = "0.47.3", path = "crates/services/gas_price_service" } -fuel-core-importer = { version = 
"0.47.3", path = "./crates/services/importer" } -fuel-core-keygen = { version = "0.47.3", path = "./crates/keygen" } -fuel-core-metrics = { version = "0.47.3", path = "./crates/metrics" } -fuel-core-p2p = { version = "0.47.3", path = "./crates/services/p2p" } -fuel-core-parallel-executor = { version = "0.47.3", path = "./crates/services/parallel-executor" } -fuel-core-poa = { version = "0.47.3", path = "./crates/services/consensus_module/poa" } -fuel-core-producer = { version = "0.47.3", path = "./crates/services/producer" } -fuel-core-relayer = { version = "0.47.3", path = "./crates/services/relayer" } -fuel-core-services = { version = "0.47.3", path = "./crates/services" } -fuel-core-shared-sequencer = { version = "0.47.3", path = "crates/services/shared-sequencer" } -fuel-core-storage = { version = "0.47.3", path = "./crates/storage", default-features = false } -fuel-core-sync = { version = "0.47.3", path = "./crates/services/sync" } -fuel-core-syscall = { version = "0.47.3", path = "./crates/syscall", default-features = false } -fuel-core-trace = { version = "0.47.3", path = "./crates/trace" } -fuel-core-tx-status-manager = { version = "0.47.3", path = "./crates/services/tx_status_manager" } -fuel-core-txpool = { version = "0.47.3", path = "./crates/services/txpool_v2" } -fuel-core-types = { version = "0.47.3", path = "./crates/types", default-features = false } -fuel-core-upgradable-executor = { version = "0.47.3", path = "./crates/services/upgradable-executor" } -fuel-core-wasm-executor = { version = "0.47.3", path = "./crates/services/upgradable-executor/wasm-executor", default-features = false } -fuel-gas-price-algorithm = { version = "0.47.3", path = "crates/fuel-gas-price-algorithm" } +fuel-core = { version = "0.47.4", path = "./crates/fuel-core", default-features = false } +fuel-core-bin = { version = "0.47.4", path = "./bin/fuel-core" } +fuel-core-chain-config = { version = "0.47.4", path = "./crates/chain-config", default-features = false } 
+fuel-core-client = { version = "0.47.4", path = "./crates/client" } +fuel-core-compression = { version = "0.47.4", path = "./crates/compression" } +fuel-core-compression-service = { version = "0.47.4", path = "./crates/services/compression" } +fuel-core-consensus-module = { version = "0.47.4", path = "./crates/services/consensus_module" } +fuel-core-database = { version = "0.47.4", path = "./crates/database" } +fuel-core-executor = { version = "0.47.4", path = "./crates/services/executor", default-features = false } +fuel-core-gas-price-service = { version = "0.47.4", path = "crates/services/gas_price_service" } +fuel-core-importer = { version = "0.47.4", path = "./crates/services/importer" } +fuel-core-keygen = { version = "0.47.4", path = "./crates/keygen" } +fuel-core-metrics = { version = "0.47.4", path = "./crates/metrics" } +fuel-core-p2p = { version = "0.47.4", path = "./crates/services/p2p" } +fuel-core-parallel-executor = { version = "0.47.4", path = "./crates/services/parallel-executor" } +fuel-core-poa = { version = "0.47.4", path = "./crates/services/consensus_module/poa" } +fuel-core-producer = { version = "0.47.4", path = "./crates/services/producer" } +fuel-core-relayer = { version = "0.47.4", path = "./crates/services/relayer" } +fuel-core-services = { version = "0.47.4", path = "./crates/services" } +fuel-core-shared-sequencer = { version = "0.47.4", path = "crates/services/shared-sequencer" } +fuel-core-storage = { version = "0.47.4", path = "./crates/storage", default-features = false } +fuel-core-sync = { version = "0.47.4", path = "./crates/services/sync" } +fuel-core-syscall = { version = "0.47.4", path = "./crates/syscall", default-features = false } +fuel-core-trace = { version = "0.47.4", path = "./crates/trace" } +fuel-core-tx-status-manager = { version = "0.47.4", path = "./crates/services/tx_status_manager" } +fuel-core-txpool = { version = "0.47.4", path = "./crates/services/txpool_v2" } +fuel-core-types = { version = "0.47.4", path = 
"./crates/types", default-features = false } +fuel-core-upgradable-executor = { version = "0.47.4", path = "./crates/services/upgradable-executor" } +fuel-core-wasm-executor = { version = "0.47.4", path = "./crates/services/upgradable-executor/wasm-executor", default-features = false } +fuel-gas-price-algorithm = { version = "0.47.4", path = "crates/fuel-gas-price-algorithm" } # Fuel dependencies fuel-vm-private = { version = "0.65.0", package = "fuel-vm", default-features = false } diff --git a/crates/services/upgradable-executor/src/executor.rs b/crates/services/upgradable-executor/src/executor.rs index 4900386adce..a9340f1be62 100644 --- a/crates/services/upgradable-executor/src/executor.rs +++ b/crates/services/upgradable-executor/src/executor.rs @@ -262,8 +262,8 @@ impl Executor { ("0-45-1", 30), ("0-46-0", 31), // We are skipping 0-47-0 because it was not published. - // 0-47-3 has the same STF version as 0-47-1 - ("0-47-3", LATEST_STATE_TRANSITION_VERSION), + // 0-47-4 has the same STF version as 0-47-1 + ("0-47-4", LATEST_STATE_TRANSITION_VERSION), ]; pub fn new( From d0f7951a0ec59841b9986f54f818f7f261570c0b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 17 Apr 2026 12:33:19 +0000 Subject: [PATCH 3/7] Generate changelog for v.0.47.4 --- .changes/changed/3138.md | 1 - .changes/changed/3203.md | 1 - .changes/changed/3225.md | 1 - .changes/fixed/3124.md | 1 - .changes/fixed/3264.md | 1 - CHANGELOG.md | 11 +++++++++++ 6 files changed, 11 insertions(+), 5 deletions(-) delete mode 100644 .changes/changed/3138.md delete mode 100644 .changes/changed/3203.md delete mode 100644 .changes/changed/3225.md delete mode 100644 .changes/fixed/3124.md delete mode 100644 .changes/fixed/3264.md diff --git a/.changes/changed/3138.md b/.changes/changed/3138.md deleted file mode 100644 index 981c7d1d63a..00000000000 --- a/.changes/changed/3138.md +++ /dev/null @@ -1 +0,0 @@ -Migrate CI from BuildJet to WarpBuild runners, update GitHub Actions to latest versions, 
and use pre-built binaries for cargo-nextest and cargo-audit. \ No newline at end of file diff --git a/.changes/changed/3203.md b/.changes/changed/3203.md deleted file mode 100644 index b1c1afc2af0..00000000000 --- a/.changes/changed/3203.md +++ /dev/null @@ -1 +0,0 @@ -Add lease port for PoA adapter to allow multiple producers to be live but only one leader. \ No newline at end of file diff --git a/.changes/changed/3225.md b/.changes/changed/3225.md deleted file mode 100644 index 9b7e7cbb882..00000000000 --- a/.changes/changed/3225.md +++ /dev/null @@ -1 +0,0 @@ -PoA quorum and HA failover fixes: Redis leader lease adapter improvements, write_block.lua HEIGHT_EXISTS check, sub-quorum block repair, Prometheus metrics, and chaos test harness. diff --git a/.changes/fixed/3124.md b/.changes/fixed/3124.md deleted file mode 100644 index 88a33bab305..00000000000 --- a/.changes/fixed/3124.md +++ /dev/null @@ -1 +0,0 @@ -Using Debian Bookworm as the runtime base image for Docker builds. This is the same base image as the Rust builder images. Keeping the images in-sync will help prevent runtime dependency mismatch issues. diff --git a/.changes/fixed/3264.md b/.changes/fixed/3264.md deleted file mode 100644 index 30edc35c336..00000000000 --- a/.changes/fixed/3264.md +++ /dev/null @@ -1 +0,0 @@ -Rollback stale preconfirmations in the mempool when the canonical block at that height omits the preconfirmed transactions, restoring spent inputs and removing dependent transactions. diff --git a/CHANGELOG.md b/CHANGELOG.md index a07a9f91c18..cf2df05c001 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,17 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased (see .changes folder)] +## [Version 0.47.4] + +### Changed +- [3138](https://github.com/FuelLabs/fuel-core/pull/3138): Migrate CI from BuildJet to WarpBuild runners, update GitHub Actions to latest versions, and use pre-built binaries for cargo-nextest and cargo-audit. 
+- [3203](https://github.com/FuelLabs/fuel-core/pull/3203): Add lease port for PoA adapter to allow multiple producers to be live but only one leader. +- [3225](https://github.com/FuelLabs/fuel-core/pull/3225): PoA quorum and HA failover fixes: Redis leader lease adapter improvements, write_block.lua HEIGHT_EXISTS check, sub-quorum block repair, Prometheus metrics, and chaos test harness. + +### Fixed +- [3124](https://github.com/FuelLabs/fuel-core/pull/3124): Using Debian Bookworm as the runtime base image for Docker builds. This is the same base image as the Rust builder images. Keeping the images in-sync will help prevent runtime dependency mismatch issues. +- [3264](https://github.com/FuelLabs/fuel-core/pull/3264): Rollback stale preconfirmations in the mempool when the canonical block at that height omits the preconfirmed transactions, restoring spent inputs and removing dependent transactions. + ## [Version 0.47.1] ### Fixed From 45efc18397f5a266b9c4195d921dee524c01878d Mon Sep 17 00:00:00 2001 From: Mitchell Turner Date: Fri, 17 Apr 2026 15:33:13 -0600 Subject: [PATCH 4/7] fix v0.47.4 CI (#3270) ## Linked Issues/PRs ## Description ## Checklist - [ ] Breaking changes are clearly marked as such in the PR description and changelog - [ ] New behavior is reflected in tests - [ ] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [ ] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging 
the PR will do this) - [ ] Someone else? --- .cargo/audit.toml | 13 +++++++++++++ crates/services/txpool_v2/src/pool.rs | 10 ++++++---- .../txpool_v2/src/tests/tests_preconf_rollback.rs | 6 +++--- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 373fc7fea54..34123189dc2 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -7,4 +7,17 @@ ignore = [ "RUSTSEC-2026-0020", # wasmtime advisory, pending upgrade "RUSTSEC-2026-0021", # wasmtime advisory, pending upgrade "RUSTSEC-2026-0049", # temporary CI ignore; fix in master + "RUSTSEC-2026-0085", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0086", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0087", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0088", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0089", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0091", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0092", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0093", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0094", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0095", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0096", # wasmtime advisory, pending upgrade + "RUSTSEC-2026-0098", # rustls-webpki via legacy transitive dependencies + "RUSTSEC-2026-0099", # rustls-webpki via legacy transitive dependencies ] diff --git a/crates/services/txpool_v2/src/pool.rs b/crates/services/txpool_v2/src/pool.rs index 4eafee20e7a..66f67b3815d 100644 --- a/crates/services/txpool_v2/src/pool.rs +++ b/crates/services/txpool_v2/src/pool.rs @@ -779,8 +779,9 @@ where .into_iter() .map(|data| { let dependent_tx_id = data.transaction.id(); - let tx_status = - statuses::SqueezedOut::new(reason.clone(), dependent_tx_id); + let tx_status = statuses::SqueezedOut { + reason: reason.clone(), + }; (dependent_tx_id, tx_status) }) .collect(); @@ -814,8 +815,9 @@ where .into_iter() .map(|data| { let dependent_tx_id = 
data.transaction.id(); - let tx_status = - statuses::SqueezedOut::new(reason.clone(), dependent_tx_id); + let tx_status = statuses::SqueezedOut { + reason: reason.clone(), + }; (dependent_tx_id, tx_status) }) .collect(); diff --git a/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs b/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs index fa46ce799d9..9e730ef142b 100644 --- a/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs +++ b/crates/services/txpool_v2/src/tests/tests_preconf_rollback.rs @@ -244,7 +244,7 @@ async fn dependents_of_preconfirmed_tx_removed_on_rollback() { // Then — tx_child depends on a now-stale preconfirmed output; it must be // squeezed out. universe - .await_expected_tx_statuses(vec![tx_child_id], |_, status| { + .await_expected_tx_statuses(vec![tx_child_id], |status| { matches!( status, fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) @@ -411,7 +411,7 @@ async fn stale_preconfs_at_older_height_cleaned_up_by_later_block() { // Then — tx_child (which depended on the stale preconf output) is removed. universe - .await_expected_tx_statuses(vec![tx_child_id], |_, status| { + .await_expected_tx_statuses(vec![tx_child_id], |status| { matches!( status, fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) @@ -546,7 +546,7 @@ async fn contract_dependent_tx_removed_on_preconf_rollback() { // Then — D must be squeezed out because the contract it relied on never landed. 
universe - .await_expected_tx_statuses(vec![tx_d_id], |_, status| { + .await_expected_tx_statuses(vec![tx_d_id], |status| { matches!( status, fuel_core_types::services::transaction_status::TransactionStatus::SqueezedOut(_) From 4e4ea2b98b87dbdb14054f1b70cc57cc4fae2c9c Mon Sep 17 00:00:00 2001 From: Mitchell Turner Date: Fri, 17 Apr 2026 15:33:26 -0600 Subject: [PATCH 5/7] =?UTF-8?q?(cherry-pick)=20fix:=20group=20reconciliati?= =?UTF-8?q?on=20votes=20by=20block=5Fid=20to=20resolve=20same-block=20dea?= =?UTF-8?q?=E2=80=A6=20(#3271)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …dlock (#3269) ## Summary - Fixes a PoA reconciliation deadlock observed on devnet 2026-04-17 where the same block ended up on all 6 Redis nodes with three different epochs, causing permanent livelock - `unreconciled_blocks` now groups votes by `block_id` only, tracking max epoch as a tiebreaker. Identical blocks written during re-promotion storms count toward quorum. - Added a regression test that reproduces the exact production error string ## The bug During re-promotion storms (two pods racing for leadership), the same block can be written to different Redis nodes with different epochs. The old vote grouping `(epoch, block_id)` fragmented these identical blocks into separate vote groups: ``` Node state (same block_id, different epoch stamps): 1a-0, 1a-1, 1b-1: epoch 268 → vote group A, count=3 1b-0: epoch 269 → vote group B, count=1 1c-0, 1c-1: epoch 270 → vote group C, count=2 ← max-epoch winner Required quorum: 4. Winner count: 2 → repair attempted. Repair writes the winner to all 6 nodes → HEIGHT_EXISTS on every node (each has SOME entry at that height) → Written=0 → total=2 < quorum. Permanent livelock. 
``` ## The fix Group by `block_id` alone; track max epoch per block_id as the tiebreaker when block_ids genuinely differ: ```rust // Before HashMap::<(u64, BlockId), (usize, SealedBlock)> vote_key = (*epoch, block.entity.id()) winner = max_by_key(epoch) // After HashMap::<BlockId, (u64, usize, SealedBlock)> // (max_epoch, count, block) vote_key = block.entity.id() winner = max_by_key(max_epoch) ``` **Behavior change:** - Same block with multiple epochs → single vote group → counts as a single block on N nodes → reconciles directly without repair (this fixes the deadlock) - Genuinely different blocks at same height → picks higher-epoch block → same behavior as before ## Test plan - [x] New test `leader_state__when_same_block_has_different_epochs_across_nodes_then_reconciles_without_repair` reproduces the exact production error without the fix (`"Backlog unresolved at height 1: repair failed to reach quorum"`) and passes with it - [x] All 9 existing `leader_state__*` tests still pass - [ ] Deploy to devnet and verify the stuck authority recovers Please go to the `Preview` tab and select the appropriate sub-template: * [Classic PR](?expand=1&template=default.md) * [Bump version](?expand=1&template=bump_version.md) --------- Co-authored-by: Brandon Kite --- .changes/fixed/3271.md | 1 + .../service/adapters/consensus_module/poa.rs | 96 +++++++++++++++++-- 2 files changed, 91 insertions(+), 6 deletions(-) create mode 100644 .changes/fixed/3271.md diff --git a/.changes/fixed/3271.md b/.changes/fixed/3271.md new file mode 100644 index 00000000000..b4d662ce787 --- /dev/null +++ b/.changes/fixed/3271.md @@ -0,0 +1 @@ +Fix PoA reconciliation deadlock when the same block exists on all Redis nodes but with different epochs. `unreconciled_blocks` now groups votes by `block_id` only (tracking max epoch as tiebreaker), so identical blocks written during re-promotion storms count toward quorum.
diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index 71438b52558..364263df3e5 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -797,20 +797,29 @@ impl RedisLeaderLeaseAdapter { break; } + // Group votes by block_id only (not epoch). The same block can + // be written to different nodes with different epochs during + // re-promotion storms — but if the block_id matches, it's the + // same block and all copies count toward quorum. We track the + // max epoch per block_id as the tiebreaker for fork resolution + // when block_ids genuinely differ. let votes = blocks_by_node .iter() .filter_map(|blocks_by_height| blocks_by_height.get(&current_height)) .flat_map(|blocks_by_epoch| blocks_by_epoch.iter()) .fold( - HashMap::<(u64, BlockId), (usize, SealedBlock)>::new(), + HashMap::<BlockId, (u64, usize, SealedBlock)>::new(), |mut votes, (epoch, block)| { - let vote_key = (*epoch, block.entity.id()); + let vote_key = block.entity.id(); match votes.get_mut(&vote_key) { - Some((count, _)) => { + Some((max_epoch, count, _)) => { *count = count.saturating_add(1); + if *epoch > *max_epoch { + *max_epoch = *epoch; + } } None => { - votes.insert(vote_key, (1, block.clone())); + votes.insert(vote_key, (*epoch, 1, block.clone())); } } votes @@ -819,8 +828,8 @@ impl RedisLeaderLeaseAdapter { let winner = votes .into_iter() - .max_by_key(|((epoch, _), _)| *epoch) - .map(|(_, (count, block))| (count, block)); + .max_by_key(|(_, (max_epoch, _, _))| *max_epoch) + .map(|(_, (_, count, block))| (count, block)); if let Some((count, block)) = winner { if self.quorum_reached(count) { @@ -1565,6 +1574,81 @@ mod tests { ); } + /// Reproduces the devnet deadlock from April 17, 2026. + /// + /// The same block was written to all 3 nodes during re-promotion storms, + /// so each node has the same block_id but with different epoch metadata.
+ /// The old `(epoch, block_id)` vote grouping fragmented these into + /// separate vote groups, with the max-epoch group having a count below + /// quorum. Repair then failed because every node returned HEIGHT_EXISTS. + /// + /// With the fix (grouping by block_id only), all copies of the same + /// block count toward quorum regardless of epoch metadata — so this + /// state resolves without repair. + #[tokio::test(flavor = "multi_thread")] + async fn leader_state__when_same_block_has_different_epochs_across_nodes_then_reconciles_without_repair() + { + // given: same block on all 3 nodes, but with different epochs + // (as happens when re-promotion writes race during production) + let redis_a = RedisTestServer::spawn(); + let redis_b = RedisTestServer::spawn(); + let redis_c = RedisTestServer::spawn(); + let lease_key = "poa:test:same-block-different-epochs".to_string(); + let stream_key = format!("{lease_key}:block:stream"); + let adapter = new_test_adapter( + vec![ + redis_a.redis_url(), + redis_b.redis_url(), + redis_c.redis_url(), + ], + lease_key, + ); + assert!( + adapter + .acquire_lease_if_free() + .await + .expect("acquire should succeed"), + "adapter should acquire lease" + ); + + // Same block (same data, same block_id) on all 3 nodes, but each + // with a different epoch. This simulates what happens when the + // original leader was re-promoted repeatedly during a race, + // writing the same block content each time with a bumped epoch. 
+ let block = poa_block_at_time(1, 10); + let block_data = postcard::to_allocvec(&block).expect("should serialize"); + + let redis_a_client = + redis::Client::open(redis_a.redis_url()).expect("redis a client"); + let redis_b_client = + redis::Client::open(redis_b.redis_url()).expect("redis b client"); + let redis_c_client = + redis::Client::open(redis_c.redis_url()).expect("redis c client"); + let mut conn_a = redis_a_client.get_connection().expect("redis a conn"); + let mut conn_b = redis_b_client.get_connection().expect("redis b conn"); + let mut conn_c = redis_c_client.get_connection().expect("redis c conn"); + + // Same block_id, three different epochs + append_stream_block(&mut conn_a, &stream_key, 1, &block_data, 5); + append_stream_block(&mut conn_b, &stream_key, 1, &block_data, 7); + append_stream_block(&mut conn_c, &stream_key, 1, &block_data, 9); + + // when: leader reconciles + let leader_state = adapter + .leader_state(1.into()) + .await + .expect("leader_state should succeed"); + + // then: the block is reconciled directly (no repair needed). Without + // the fix, the old logic would have split the 3 copies into 3 vote + // groups and tried to repair the max-epoch group (count=1), which + // would deadlock because every node returns HEIGHT_EXISTS. 
+ assert!( + matches!(leader_state, LeaderState::UnreconciledBlocks(ref blocks) if blocks.len() == 1), + "Expected block to be reconciled from quorum across mixed epochs, got {leader_state:?}" + ); + } + #[tokio::test(flavor = "multi_thread")] async fn leader_state__when_same_height_entry_exists_on_less_than_quorum_nodes_then_repairs_it() { From 5f8f2d7e472840d95f15a28cee88f07df0d84ced Mon Sep 17 00:00:00 2001 From: Mitchell Turner Date: Fri, 17 Apr 2026 16:27:43 -0600 Subject: [PATCH 6/7] (cherrypick) fix: prevent PoA leader deadlock after reconciliation import (#3261) (#3274) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit cherry-pick https://github.com/FuelLabs/fuel-core/pull/3261 ## Summary - Fixes a deadlock in the PoA service that caused a 30-minute block production outage on testnet (April 9, 2026) - After a FENCING_ERROR, reconciliation imports a block via `execute_and_commit` which marks it as `Source::Network`. The SyncTask sees this and transitions from `Synced` → `NotSynced`. On the next iteration, `ensure_synced()` blocks forever — the leader can't produce while blocked, and the SyncTask needs a locally-produced block to recover. Classic deadlock. - Fix: add a reconciliation watermark (`Arc`) shared between `MainTask` and `SyncTask`. Before importing reconciliation blocks, `MainTask` sets the watermark to the max height. `SyncTask` treats blocks at heights ≤ the watermark as locally produced, staying `Synced`. ## Details **Root cause chain:** 1. `importer.rs:584-585` — `execute_and_commit` always uses `ImportResult::new_from_network()` 2. `sync.rs:186-203` — SyncTask transitions `Synced → NotSynced` on non-local block with height > current 3. `service.rs:501-521` — `ensure_synced()` blocks on `sync_state.changed()` when `NotSynced` 4. 
Deadlock: leader blocked in `ensure_synced()`, SyncTask waiting for locally-produced block that can never arrive **Why a watermark:** A bool flag has a race condition — the SyncTask may not poll the broadcast channel until after the flag is cleared. The watermark encodes a permanent fact ("all blocks up to height N were reconciled") that never needs clearing. **Files changed (all within `fuel-core-poa`):** - `sync.rs` — Add `reconciliation_watermark` field, check it in block handler - `service.rs` — Create shared watermark, set via `fetch_max` during reconciliation - `service_test.rs` — Add deadlock reproduction test ## Test plan - [x] `sync_task__network_block_at_reconciliation_height_causes_not_synced_without_watermark` — confirms bug mechanism (network block → NotSynced) - [x] `sync_task__network_block_within_watermark_stays_synced` — verifies watermark prevents NotSynced; blocks above watermark still trigger it - [x] `main_task__reconciliation_import_does_not_deadlock_leader` — full service-level deadlock reproduction (fails without fix, passes with) - [x] All 51 existing `fuel-core-poa` tests pass --------- Please go to the `Preview` tab and select the appropriate sub-template: * [Classic PR](?expand=1&template=default.md) * [Bump version](?expand=1&template=bump_version.md) --------- Co-authored-by: Brandon Kite Co-authored-by: Green Baneling Co-authored-by: Hannes Karppila <2204863+Dentosal@users.noreply.github.com> --- .changes/fixed/3274.md | 1 + .../consensus_module/poa/src/service.rs | 18 +++ .../consensus_module/poa/src/service_test.rs | 150 +++++++++++++++++- .../services/consensus_module/poa/src/sync.rs | 142 ++++++++++++++++- 4 files changed, 306 insertions(+), 5 deletions(-) create mode 100644 .changes/fixed/3274.md diff --git a/.changes/fixed/3274.md b/.changes/fixed/3274.md new file mode 100644 index 00000000000..4629dfdbbae --- /dev/null +++ b/.changes/fixed/3274.md @@ -0,0 +1 @@ +Fix PoA leader deadlock after reconciliation import where 
`ensure_synced()` blocked forever because `execute_and_commit` marked reconciliation blocks as `Source::Network`, causing the SyncTask to transition to `NotSynced`. diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 210408c1b12..ac929ffcd06 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -143,6 +143,9 @@ pub struct MainTask { /// externally controlled start of block production block_production_ready_signal: BlockProductionReadySignal, reconciliation_port: RP, + /// Shared with SyncTask — blocks at heights <= this watermark were + /// imported via reconciliation and should not trigger NotSynced. + reconciliation_watermark: Arc, } impl MainTask @@ -183,12 +186,15 @@ where .. } = config; + let reconciliation_watermark = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let sync_task = SyncTask::new( peer_connections_stream, min_connected_reserved_peers, time_until_synced, block_stream, last_block, + Arc::clone(&reconciliation_watermark), ); let sync_task_handle = ServiceRunner::new(sync_task); @@ -213,6 +219,7 @@ where production_timeout, block_production_ready_signal, reconciliation_port, + reconciliation_watermark, } } @@ -628,6 +635,17 @@ where continue; } + // Set watermark to this block's height so SyncTask + // doesn't transition to NotSynced when it sees the + // broadcast. execute_and_commit marks blocks as + // Source::Network, which would otherwise cause a + // Synced → NotSynced transition and deadlock + // ensure_synced(). 
+ self.reconciliation_watermark.fetch_max( + u32::from(block_height), + std::sync::atomic::Ordering::Release, + ); + match self.block_importer.execute_and_commit(block).await { Ok(()) => { self.last_height = block_height; diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 23db1e2c49e..8b62bb32f03 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -47,9 +47,12 @@ use fuel_core_types::{ fuel_tx::*, fuel_types::BlockHeight, secrecy::Secret, - services::executor::{ - ExecutionResult, - UncommittedResult, + services::{ + block_importer::BlockImportInfo, + executor::{ + ExecutionResult, + UncommittedResult, + }, }, signer::SignMode, tai64::{ @@ -940,3 +943,144 @@ async fn consensus_service__run__will_produce_blocks_with_ready_signal() { let produced_block = block_receiver.recv().await.unwrap(); assert!(matches!(produced_block, FakeProducedBlock::New(_, _))); } + +/// Reproduces the deadlock from the April 9, 2026 testnet outage. +/// +/// After a FENCING_ERROR, reconciliation imports a block via +/// `execute_and_commit` which marks it as `Source::Network`. The SyncTask +/// sees this non-local block and transitions from Synced → NotSynced. +/// On the next `run()` iteration, `ensure_synced()` blocks forever +/// because the leader can't produce locally-sourced blocks while blocked. +/// +/// This test uses a `FakeReconciliationPort` that returns +/// `UnreconciledBlocks` on the first call (simulating reconciliation after +/// fencing error), then switches to `ReconciledLeader`. The +/// `MockBlockImporter::execute_and_commit` broadcasts a `Source::Network` +/// block into the block_stream, triggering the SyncTask's NotSynced +/// transition. Without the watermark fix, the service deadlocks and +/// never produces a block. 
+#[tokio::test] +async fn main_task__reconciliation_import_does_not_deadlock_leader() { + // given: a PoA service with Trigger::Interval + let config = Config { + trigger: Trigger::Interval { + block_time: Duration::from_millis(10), + }, + signer: SignMode::Key(test_signing_key()), + metrics: false, + min_connected_reserved_peers: 0, + time_until_synced: Duration::ZERO, + ..Default::default() + }; + + let (block_producer, mut block_receiver) = FakeBlockProducer::new(); + + // Use an mpsc channel to feed both execute_and_commit results and + // the SyncTask's block_stream. This simulates what the real importer + // does when `execute_and_commit` commits a block and broadcasts it. + let (block_import_sender, block_import_receiver) = + tokio::sync::mpsc::channel::<BlockImportInfo>(16); + + let mut block_importer = MockBlockImporter::default(); + block_importer.expect_commit_result().returning(|_| Ok(())); + + // When execute_and_commit is called for the reconciliation block, + // send it as Source::Network — this is what the real importer + // does (ImportResult::new_from_network at importer.rs:585). + let sender_for_import = block_import_sender.clone(); + block_importer + .expect_execute_and_commit() + .returning(move |block| { + let header = block.entity.header().clone(); + let _ = sender_for_import.try_send(BlockImportInfo::new_from_network(header)); + Ok(()) + }); + + // The block_stream feeds the SyncTask — wrap the mpsc receiver. + // Use Option+Mutex to allow moving out of the FnMut closure.
+ let receiver_cell = Arc::new(StdMutex::new(Some(block_import_receiver))); + block_importer.expect_block_stream().returning(move || { + let rx = receiver_cell + .lock() + .unwrap() + .take() + .expect("block_stream called more than once"); + Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx)) + }); + + block_importer + .expect_latest_block_height() + .returning(|| Ok(Some(BlockHeight::from(0u32)))); + + let txpool = MockTransactionPool::no_tx_updates(); + let p2p_port = generate_p2p_port(); + let predefined_blocks = InMemoryPredefinedBlocks::new(HashMap::new()); + let time = TestTime::at_unix_epoch(); + let watch = time.watch(); + + // Create a reconciliation port that returns UnreconciledBlocks once, + // then switches to ReconciledLeader for subsequent calls. + let block = block_for_height(2); + let consensus = FakeBlockSigner { succeeds: true } + .seal_block(&block) + .await + .unwrap(); + let unreconciled = LeaderState::UnreconciledBlocks(vec![SealedBlock { + entity: block, + consensus, + }]); + + let reconciliation_port = FakeReconciliationPort::with_state(Ok(unreconciled)); + let reconciliation_state = reconciliation_port.state.clone(); + + let task = MainTask::new( + &BlockHeader::new_block(BlockHeight::from(1u32), watch.now()), + config, + txpool, + block_producer, + block_importer, + p2p_port, + FakeBlockSigner { succeeds: true }.into(), + predefined_blocks, + watch, + FakeBlockProductionReadySignal, + reconciliation_port, + ); + + // when: start the service + let service = ServiceRunner::new(task); + service.start_and_await().await.unwrap(); + + // Give time for the reconciliation block to be imported. + // After import, switch to ReconciledLeader so the service can + // attempt normal block production. 
+ tokio::task::yield_now().await; + time::advance(Duration::from_millis(20)).await; + tokio::task::yield_now().await; + + // Switch reconciliation port to ReconciledLeader + { + let mut state = reconciliation_state.lock().unwrap(); + *state = Ok(LeaderState::ReconciledLeader); + } + + // then: try to receive a produced block within a timeout. + // Without the fix, ensure_synced() deadlocks and no block is produced. + let receive_timeout = tokio::spawn(async move { + time::timeout(Duration::from_millis(500), block_receiver.recv()).await + }); + time::advance(Duration::from_millis(501)).await; + tokio::task::yield_now().await; + let receive_result = receive_timeout.await.unwrap(); + + let _ = service.stop_and_await().await; + + // This assertion fails without the watermark fix — the service + // deadlocks in ensure_synced() and never produces a block. + assert!( + receive_result.is_ok(), + "Expected block production after reconciliation, but the service \ + deadlocked in ensure_synced() — this is the bug from the \ + April 9, 2026 testnet outage" + ); +} diff --git a/crates/services/consensus_module/poa/src/sync.rs b/crates/services/consensus_module/poa/src/sync.rs index bd66f794628..5c3364038d5 100644 --- a/crates/services/consensus_module/poa/src/sync.rs +++ b/crates/services/consensus_module/poa/src/sync.rs @@ -1,5 +1,11 @@ use std::{ - sync::Arc, + sync::{ + Arc, + atomic::{ + AtomicU32, + Ordering, + }, + }, time::Duration, }; @@ -52,6 +58,10 @@ pub struct SyncTask { state_receiver: watch::Receiver, inner_state: InnerSyncState, timer: Option, + /// Blocks at heights <= this watermark were imported via reconciliation + /// by the leader and should not trigger Synced → NotSynced transitions. + /// Set by MainTask via `fetch_max`, monotonically increasing, never cleared. 
+ reconciliation_watermark: Arc, } impl SyncTask { @@ -61,6 +71,7 @@ impl SyncTask { time_until_synced: Duration, block_stream: BoxStream, block_header: &BlockHeader, + reconciliation_watermark: Arc, ) -> Self { let inner_state = InnerSyncState::from_config( min_connected_reserved_peers, @@ -92,6 +103,7 @@ impl SyncTask { state_receiver, inner_state, timer, + reconciliation_watermark, } } @@ -184,7 +196,11 @@ impl RunnableTask for SyncTask { self.restart_timer(); } InnerSyncState::Synced { block_header, has_sufficient_peers } if new_block_height > block_header.height() => { - if block_info.is_locally_produced() { + let watermark = self.reconciliation_watermark.load(Ordering::Acquire); + let is_reconciliation = watermark > 0 + && u32::from(*new_block_height) <= watermark; + + if block_info.is_locally_produced() || is_reconciliation { self.inner_state = InnerSyncState::Synced { block_header: block_info.block_header.clone(), has_sufficient_peers: *has_sufficient_peers @@ -278,6 +294,7 @@ impl InnerSyncState { } #[allow(clippy::arithmetic_side_effects)] +#[allow(non_snake_case)] #[cfg(test)] mod tests { use super::*; @@ -359,6 +376,7 @@ mod tests { time_until_synced, block_stream, &Default::default(), + Arc::new(AtomicU32::new(0)), ); (sync_task, watcher, tx) @@ -598,4 +616,124 @@ mod tests { )); matches!(*sync_task.state_receiver.borrow(), SyncState::Synced(_)); } + + /// Reproduces the deadlock root cause: a network-sourced block (from + /// reconciliation via `execute_and_commit`) arrives while the SyncTask + /// is in Synced state. Without the watermark fix, this transitions + /// the SyncTask to NotSynced, which deadlocks the leader's + /// `ensure_synced()` call. 
+ #[tokio::test] + async fn sync_task__network_block_at_reconciliation_height_causes_not_synced_without_watermark() + { + // given: a SyncTask that starts in Synced state (min_peers=0, time=ZERO) + let connections_stream = MockStream::::new(vec![]).into_boxed(); + let block_stream = MockStream::::new(vec![]).into_boxed(); + + let (tx, shutdown) = + tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher: StateWatcher = shutdown.into(); + + // Watermark is 0 (not set) — simulates the pre-fix state + let watermark = Arc::new(AtomicU32::new(0)); + + let mut sync_task = SyncTask::new( + connections_stream, + 0, // min_connected_reserved_peers + Duration::ZERO, + block_stream, + &BlockHeader::new_block(5u32.into(), Tai64::now()), + watermark, + ); + + // Verify we start in Synced state + assert!( + matches!(*sync_task.state_receiver.borrow(), SyncState::Synced(_)), + "SyncTask should start Synced with min_peers=0 and time_until_synced=ZERO" + ); + + // when: a Source::Network block arrives at height 6 (> current height 5) + // This is what happens when reconciliation imports a block via + // execute_and_commit, which always uses ImportResult::new_from_network + let network_block_stream = + MockStream::new(vec![BlockHeader::new_block(6u32.into(), Tai64::now())]) + .map(BlockImportInfo::new_from_network) + .into_boxed(); + sync_task.block_stream = network_block_stream; + + let _ = sync_task.run(&mut watcher).await; + + // then: SyncTask transitions to NotSynced — THIS IS THE BUG + // The leader's ensure_synced() will now block forever because + // it can't produce locally-produced blocks while blocked. 
+ assert_eq!( + SyncState::NotSynced, + *sync_task.state_receiver.borrow(), + "Without watermark fix, a network-sourced reconciliation block \ + causes NotSynced — this deadlocks the leader" + ); + + drop(tx); + } + + /// Verifies the watermark fix: when the reconciliation watermark covers + /// the block height, a network-sourced block should NOT trigger NotSynced. + #[tokio::test] + async fn sync_task__network_block_within_watermark_stays_synced() { + // given: a SyncTask in Synced state with watermark set to height 6 + let connections_stream = MockStream::::new(vec![]).into_boxed(); + let block_stream = MockStream::::new(vec![]).into_boxed(); + + let (tx, shutdown) = + tokio::sync::watch::channel(fuel_core_services::State::Started); + let mut watcher: StateWatcher = shutdown.into(); + + let watermark = Arc::new(AtomicU32::new(6)); + + let mut sync_task = SyncTask::new( + connections_stream, + 0, + Duration::ZERO, + block_stream, + &BlockHeader::new_block(5u32.into(), Tai64::now()), + watermark, + ); + + assert!(matches!( + *sync_task.state_receiver.borrow(), + SyncState::Synced(_) + )); + + // when: a Source::Network block at height 6 (within watermark) + let network_block_stream = + MockStream::new(vec![BlockHeader::new_block(6u32.into(), Tai64::now())]) + .map(BlockImportInfo::new_from_network) + .into_boxed(); + sync_task.block_stream = network_block_stream; + + let _ = sync_task.run(&mut watcher).await; + + // then: should stay Synced because watermark covers height 6 + assert!( + matches!(*sync_task.state_receiver.borrow(), SyncState::Synced(_)), + "With watermark=6, a network block at height 6 should NOT trigger NotSynced" + ); + + // when: a Source::Network block at height 7 (ABOVE watermark) + let network_block_stream = + MockStream::new(vec![BlockHeader::new_block(7u32.into(), Tai64::now())]) + .map(BlockImportInfo::new_from_network) + .into_boxed(); + sync_task.block_stream = network_block_stream; + + let _ = sync_task.run(&mut watcher).await; + + // 
then: should transition to NotSynced (watermark doesn't protect above its value) + assert_eq!( + SyncState::NotSynced, + *sync_task.state_receiver.borrow(), + "A network block above the watermark should still trigger NotSynced" + ); + + drop(tx); + } } From 6241f5c3b97639ea81f7ea0f591633fb367195ae Mon Sep 17 00:00:00 2001 From: Mitchell Turner Date: Fri, 17 Apr 2026 16:27:56 -0600 Subject: [PATCH 7/7] (cherrypick) Improve redis publish performance (#3272) (#3273) ## Linked Issues/PRs Cherrypick https://github.com/FuelLabs/fuel-core/pull/3272 ## Description ## Checklist - [ ] Breaking changes are clearly marked as such in the PR description and changelog - [ ] New behavior is reflected in tests - [ ] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [ ] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? 
Please go to the `Preview` tab and select the appropriate sub-template: * [Classic PR](?expand=1&template=default.md) * [Bump version](?expand=1&template=bump_version.md) --- .changes/fixed/3273.md | 1 + .../write_block.lua | 20 ++++++-- .../service/adapters/consensus_module/poa.rs | 49 ++++++++++++++----- 3 files changed, 54 insertions(+), 16 deletions(-) create mode 100644 .changes/fixed/3273.md diff --git a/.changes/fixed/3273.md b/.changes/fixed/3273.md new file mode 100644 index 00000000000..89dd2032571 --- /dev/null +++ b/.changes/fixed/3273.md @@ -0,0 +1 @@ +Improve performance of redis block publish by making more parallel and optimizing the lua code \ No newline at end of file diff --git a/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua b/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua index fe70fc8f6c3..4b4febdb0aa 100644 --- a/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua +++ b/crates/fuel-core/redis_leader_lease_adapter_scripts/write_block.lua @@ -39,16 +39,28 @@ end -- the new leader to fail on the orphan's node, but it can still reach -- quorum on the remaining nodes. If the orphan blocks quorum entirely, -- the leader must reconcile via the read path instead. +local posted_height = tonumber(ARGV[3]) local existing = redis.call("XREVRANGE", KEYS[1], "+", "-") +local stop_scan = false for _, entry in ipairs(existing) do local fields = entry[2] for i = 1, #fields, 2 do - if fields[i] == "height" and fields[i + 1] == ARGV[3] then - return redis.error_reply( - "HEIGHT_EXISTS: Block at height " .. ARGV[3] .. " already in stream" - ) + if fields[i] == "height" then + local entry_height = tonumber(fields[i + 1]) + if entry_height == posted_height then + return redis.error_reply( + "HEIGHT_EXISTS: Block at height " .. ARGV[3] .. 
" already in stream" ) + end + if entry_height ~= nil and entry_height < posted_height then + stop_scan = true + break + end end end + if stop_scan then + break + end end -- 5) Persist block entry. diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index 364263df3e5..9589599639d 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -991,6 +991,33 @@ impl RedisLeaderLeaseAdapter { } } + fn publish_block_on_all_nodes( + &self, + epoch: u64, + block: &SealedBlock, + block_data: &[u8], + ) -> Vec<anyhow::Result<WriteBlockResult>> { + std::thread::scope(|scope| { + let handles = self + .redis_nodes + .iter() + .map(|redis_node| { + scope.spawn(move || { + self.publish_block_on_node(redis_node, epoch, block, block_data) + }) + }) + .collect::<Vec<_>>(); + + handles + .into_iter() + .map(|handle| match handle.join() { + Ok(result) => result, + Err(_) => Err(anyhow!("Redis publish worker panicked")), + }) + .collect() + }) + } + /// Repropose a sub-quorum block to all Redis nodes to reach quorum. /// Called during reconciliation when a block exists on some nodes but /// below quorum — possibly from a leader that published and committed @@ -1029,8 +1056,8 @@ impl RedisLeaderLeaseAdapter { // block at this height, but it might be a different block from // a competing leader's partial write. 
let mut total_with_block = pre_existing_count; - for redis_node in &self.redis_nodes { - match self.publish_block_on_node(redis_node, epoch, block, &block_data) { + for result in self.publish_block_on_all_nodes(epoch, block, &block_data) { + match result { Ok(WriteBlockResult::Written) => { total_with_block = total_with_block.saturating_add(1); } @@ -1220,16 +1247,14 @@ impl BlockReconciliationWritePort for RedisLeaderLeaseAdapter { }; let block_data = postcard::to_allocvec(block)?; let successes = self - .redis_nodes - .iter() - .map(|redis_node| { - match self.publish_block_on_node(redis_node, epoch, block, &block_data) { - Ok(WriteBlockResult::Written) => true, - Ok(_) => false, - Err(err) => { - tracing::debug!("Redis publish on node failed: {err}"); - false - } + .publish_block_on_all_nodes(epoch, block, &block_data) + .into_iter() + .map(|result| match result { + Ok(WriteBlockResult::Written) => true, + Ok(_) => false, + Err(err) => { + tracing::debug!("Redis publish on node failed: {err}"); + false } }) .filter(|success| *success)