Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/fixed/3264.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Rollback stale preconfirmations in the mempool when the canonical block at that height omits the preconfirmed transactions, restoring spent inputs and removing dependent transactions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/services/txpool_v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tracing = { workspace = true }
[dev-dependencies]
fuel-core-storage = { workspace = true, features = ["std", "test-helpers"] }
fuel-core-trace = { path = "../../trace" }
fuel-core-txpool = { path = ".", features = ["test-helpers"] }
mockall = { workspace = true }
proptest = { workspace = true }
rand = { workspace = true }
Expand Down
34 changes: 32 additions & 2 deletions crates/services/txpool_v2/src/collision_manager/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use fuel_core_types::{
CoinPredicate,
CoinSigned,
},
contract::Contract as ContractInput,
message::{
MessageCoinPredicate,
MessageCoinSigned,
Expand Down Expand Up @@ -57,6 +58,9 @@ pub struct BasicCollisionManager<StorageIndex> {
coins_spenders: BTreeMap<UtxoId, StorageIndex>,
/// Contract -> Transaction that currently create the contract
contracts_creators: HashMap<ContractId, StorageIndex>,
/// Contract -> Transactions (by TxId) that currently use the contract as an input.
/// Symmetric to `contracts_creators`; used to evict dependents during rollback.
contract_users: HashMap<ContractId, Vec<TxId>>,
/// Blob -> Transaction that currently create the blob
blobs_users: HashMap<BlobId, StorageIndex>,
}
Expand All @@ -67,6 +71,7 @@ impl<StorageIndex> BasicCollisionManager<StorageIndex> {
messages_spenders: HashMap::new(),
coins_spenders: BTreeMap::new(),
contracts_creators: HashMap::new(),
contract_users: HashMap::new(),
Comment thread
cursor[bot] marked this conversation as resolved.
blobs_users: HashMap::new(),
}
}
Expand Down Expand Up @@ -174,6 +179,17 @@ where
.collect()
}

fn get_contract_users(&self, contract_id: &ContractId) -> Vec<TxId> {
self.contract_users
.get(contract_id)
.cloned()
.unwrap_or_default()
}

fn contract_created_in_pool(&self, contract_id: &ContractId) -> bool {
self.contracts_creators.contains_key(contract_id)
}

fn find_collisions(
&self,
transaction: &PoolTransaction,
Expand Down Expand Up @@ -248,6 +264,7 @@ where
let blob_id = checked_tx.transaction().blob_id();
self.blobs_users.insert(*blob_id, storage_id);
}
let tx_id = store_entry.transaction.id();
for input in store_entry.transaction.inputs() {
match input {
Input::CoinSigned(CoinSigned { utxo_id, .. })
Expand All @@ -262,7 +279,12 @@ where
// insert message
self.messages_spenders.insert(*nonce, storage_id);
}
_ => {}
Input::Contract(ContractInput { contract_id, .. }) => {
self.contract_users
.entry(*contract_id)
.or_default()
.push(tx_id);
}
}
}
for output in store_entry.transaction.outputs().iter() {
Expand All @@ -284,6 +306,7 @@ where
let blob_id = checked_tx.transaction().blob_id();
self.blobs_users.remove(blob_id);
}
let tx_id = transaction.id();
for input in transaction.inputs() {
match input {
Input::CoinSigned(CoinSigned { utxo_id, .. })
Expand All @@ -298,7 +321,14 @@ where
// remove message
self.messages_spenders.remove(nonce);
}
_ => {}
Input::Contract(ContractInput { contract_id, .. }) => {
if let Some(users) = self.contract_users.get_mut(contract_id) {
users.retain(|id| id != &tx_id);
if users.is_empty() {
self.contract_users.remove(contract_id);
}
}
}
Comment on lines +351 to +358
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think removing transaction IDs here, you will not have access to them during rollback_preconfirmed_transaction. Because we remove it when a transaction is included in the block during production, while you call rollback_preconfirmed_transaction after block is done, so you will not have access to them. It only should affect authority, I guess, so maybe it is fine. I think sentries should still be able to work with it. But maybe we want to move this clean up int oprocess_block

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up: #3268

}
}
for output in transaction.outputs().iter() {
Expand Down
12 changes: 12 additions & 0 deletions crates/services/txpool_v2/src/collision_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use fuel_core_types::{
};
use std::collections::HashMap;

use fuel_core_types::fuel_tx::ContractId;

use crate::storage::StorageData;

pub mod basic;
Expand All @@ -27,6 +29,16 @@ pub trait CollisionManager {
/// Get spenders of coins UTXO created by a transaction ID.
fn get_coins_spenders(&self, tx_creator_id: &TxId) -> Vec<Self::StorageIndex>;

/// Get the IDs of pool transactions that have `Input::Contract(contract_id)`.
/// Used during preconfirmation rollback to evict pool txs that were admitted
/// only because a preconfirmed tx temporarily created the contract.
fn get_contract_users(&self, contract_id: &ContractId) -> Vec<TxId>;

/// Returns true if a currently in-pool transaction creates `contract_id`.
/// Used during rollback to skip eviction when the contract is still available
/// via a pool tx (independent of the rolled-back preconfirmation).
fn contract_created_in_pool(&self, contract_id: &ContractId) -> bool;

/// Inform the collision manager that a transaction was stored.
fn on_stored_transaction(
&mut self,
Expand Down
18 changes: 17 additions & 1 deletion crates/services/txpool_v2/src/extracted_outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ impl ExtractedOutputs {
for (utxo_id, output) in outputs {
match output {
Output::ContractCreated { contract_id, .. } => {
self.contract_created.insert(*contract_id, *utxo_id.tx_id());
let tx_id = *utxo_id.tx_id();
self.contract_created.insert(*contract_id, tx_id);
// Track the reverse mapping so cleanup via new_executed_transaction works.
self.contract_created_by_tx
.entry(tx_id)
.or_default()
.push(*contract_id);
}
Output::Coin {
to,
Expand Down Expand Up @@ -131,6 +137,16 @@ impl ExtractedOutputs {
self.new_executed_transaction(tx_id);
}

/// Returns the contract IDs created by `tx_id`, if any.
/// Call this **before** [`new_skipped_transaction`] / [`new_executed_transaction`]
/// if the caller needs the list for cleanup.
pub fn contracts_created_by(&self, tx_id: &TxId) -> &[ContractId] {
self.contract_created_by_tx
.get(tx_id)
.map(Vec::as_slice)
.unwrap_or(&[])
}

pub fn new_executed_transaction(&mut self, tx_id: &TxId) {
let contract_ids = self.contract_created_by_tx.remove(tx_id);
if let Some(contract_ids) = contract_ids {
Expand Down
3 changes: 3 additions & 0 deletions crates/services/txpool_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ mod spent_inputs;
mod tests;
#[cfg(test)]
fuel_core_trace::enable_tracing!();
// Needed to activate the `test-helpers` feature flag for integration tests.
#[cfg(test)]
use fuel_core_txpool as _;

use fuel_core_types::fuel_asm::Word;
pub use pool::TxPoolStats;
Expand Down
132 changes: 132 additions & 0 deletions crates/services/txpool_v2/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,57 @@ where
self.tx_id_to_storage_id.keys()
}

/// Process a preconfirmed transaction as committed while recording its spent
/// inputs so they can be rolled back later if the transaction is not
/// included in the canonical block.
pub fn process_preconfirmed_committed_transaction(&mut self, tx_id: TxId) {
// If the tx was already extracted for local block production it is no
// longer in `tx_id_to_storage_id`, so the branch below that calls
// `record_tentative_spend` will be skipped. Preserve the input keys
// now, before `spend_inputs_by_tx_id` drains `spender_of_inputs[T]`.
if !self.tx_id_to_storage_id.contains_key(&tx_id) {
self.spent_inputs.move_spender_to_tentative(tx_id);
}
self.spent_inputs.spend_inputs_by_tx_id(tx_id);
if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
let dependents: Vec<S::StorageIndex> =
self.storage.get_direct_dependents(storage_id).collect();
let Some(transaction) = self.storage.remove_transaction(storage_id) else {
debug_assert!(false, "Storage data not found for the transaction");
tracing::warn!(
"Storage data not found for the transaction during \
`process_preconfirmed_committed_transaction`."
);
return;
};
self.extracted_outputs
.new_extracted_transaction(&transaction.transaction);
// Save the inputs before spending them permanently, so we can roll
// them back if the canonical block omits this transaction.
self.spent_inputs
.record_tentative_spend(tx_id, transaction.transaction.inputs());
self.spent_inputs
.spend_inputs(tx_id, transaction.transaction.inputs());
self.update_components_and_caches_on_removal(iter::once(&transaction));

let mut new_executable_transaction = false;
for dependent in dependents {
if !self.storage.has_dependencies(&dependent)
&& let Some(storage_data) = self.storage.get(&dependent)
{
self.selection_algorithm
.new_executable_transaction(dependent, storage_data);
new_executable_transaction = true;
}
}
if new_executable_transaction {
self.new_executable_txs_notifier.send_replace(());
}
}

self.update_stats();
}

/// Process committed transactions:
/// - Remove transaction but keep its dependents and the dependents become executables.
/// - Notify about possible new executable transactions.
Expand Down Expand Up @@ -685,6 +736,87 @@ where
self.update_stats();
}

/// Rollback a preconfirmed transaction that was not included in the canonical block.
///
/// This clears the preconfirmation-derived outputs and removes any pool transactions
/// that depend on those outputs, since those inputs no longer exist on-chain.
pub fn rollback_preconfirmed_transaction(&mut self, tx_id: TxId) {
// Capture contracts created by this preconfirmation BEFORE clearing
// extracted_outputs, so we can evict any pool txs that were admitted
// only because of the now-stale temporary contract existence.
let created_contracts: Vec<_> =
self.extracted_outputs.contracts_created_by(&tx_id).to_vec();

// Remove preconfirmed outputs so dependents can't use them.
self.extracted_outputs.new_skipped_transaction(&tx_id);
// Allow the transaction itself to be re-submitted.
self.spent_inputs.unspend_preconfirmed(tx_id);
Comment thread
cursor[bot] marked this conversation as resolved.

let reason = format!(
"Preconfirmed parent transaction {tx_id} was not included in the canonical block"
);

// Remove any pool transactions that used the preconfirmed coin outputs as inputs.
let coin_dependents = self.collision_manager.get_coins_spenders(&tx_id);
if !coin_dependents.is_empty() {
for dependent in coin_dependents {
let removed = self
.storage
.remove_transaction_and_dependents_subtree(dependent);
self.update_components_and_caches_on_removal(removed.iter());
let removed_txs: Vec<_> = removed
.into_iter()
.map(|data| {
let dependent_tx_id = data.transaction.id();
let tx_status =
statuses::SqueezedOut::new(reason.clone(), dependent_tx_id);
(dependent_tx_id, tx_status)
})
.collect();
if !removed_txs.is_empty() {
self.tx_status_manager.squeezed_out_txs(removed_txs);
}
}
}

// Remove any pool transactions that used a preconfirmed contract creation
// as an input and were admitted only via `extracted_outputs` (no in-pool
// graph dependency). Skip eviction when the contract is still available
// through an independent in-pool creator.
for contract_id in created_contracts {
if self
.collision_manager
.contract_created_in_pool(&contract_id)
{
continue;
}
let user_tx_ids = self.collision_manager.get_contract_users(&contract_id);
for user_tx_id in user_tx_ids {
let Some(&storage_id) = self.tx_id_to_storage_id.get(&user_tx_id) else {
continue;
};
let removed = self
.storage
.remove_transaction_and_dependents_subtree(storage_id);
self.update_components_and_caches_on_removal(removed.iter());
let removed_txs: Vec<_> = removed
.into_iter()
.map(|data| {
let dependent_tx_id = data.transaction.id();
let tx_status =
statuses::SqueezedOut::new(reason.clone(), dependent_tx_id);
(dependent_tx_id, tx_status)
})
.collect();
if !removed_txs.is_empty() {
self.tx_status_manager.squeezed_out_txs(removed_txs);
}
}
}

self.update_stats();
}

fn check_blob_does_not_exist(
tx: &PoolTransaction,
persistent_storage: &impl TxPoolPersistentStorage,
Expand Down
Loading
Loading