Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/fixed/3264.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Rollback stale preconfirmations in the mempool when the canonical block at that height omits the preconfirmed transactions, restoring spent inputs and removing dependent transactions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/services/txpool_v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tracing = { workspace = true }
[dev-dependencies]
fuel-core-storage = { workspace = true, features = ["std", "test-helpers"] }
fuel-core-trace = { path = "../../trace" }
fuel-core-txpool = { path = ".", features = ["test-helpers"] }
mockall = { workspace = true }
proptest = { workspace = true }
rand = { workspace = true }
Expand Down
8 changes: 7 additions & 1 deletion crates/services/txpool_v2/src/extracted_outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ impl ExtractedOutputs {
for (utxo_id, output) in outputs {
match output {
Output::ContractCreated { contract_id, .. } => {
self.contract_created.insert(*contract_id, *utxo_id.tx_id());
let tx_id = *utxo_id.tx_id();
self.contract_created.insert(*contract_id, tx_id);
// Track the reverse mapping so cleanup via new_executed_transaction works.
self.contract_created_by_tx
.entry(tx_id)
.or_default()
.push(*contract_id);
}
Output::Coin {
to,
Expand Down
3 changes: 3 additions & 0 deletions crates/services/txpool_v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ mod spent_inputs;
mod tests;
#[cfg(test)]
fuel_core_trace::enable_tracing!();
// Needed to activate the `test-helpers` feature flag for integration tests.
#[cfg(test)]
use fuel_core_txpool as _;

use fuel_core_types::fuel_asm::Word;
pub use pool::TxPoolStats;
Expand Down
83 changes: 83 additions & 0 deletions crates/services/txpool_v2/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,50 @@ where
self.tx_id_to_storage_id.keys()
}

/// Process a preconfirmed transaction as committed while recording its spent
/// inputs so they can be rolled back later if the transaction is not
/// included in the canonical block.
pub fn process_preconfirmed_committed_transaction(&mut self, tx_id: TxId) {
self.spent_inputs.spend_inputs_by_tx_id(tx_id);
if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
let dependents: Vec<S::StorageIndex> =
self.storage.get_direct_dependents(storage_id).collect();
let Some(transaction) = self.storage.remove_transaction(storage_id) else {
debug_assert!(false, "Storage data not found for the transaction");
tracing::warn!(
"Storage data not found for the transaction during \
`process_preconfirmed_committed_transaction`."
);
return;
};
self.extracted_outputs
.new_extracted_transaction(&transaction.transaction);
// Save the inputs before spending them permanently, so we can roll
// them back if the canonical block omits this transaction.
self.spent_inputs
.record_tentative_spend(tx_id, transaction.transaction.inputs());
self.spent_inputs
.spend_inputs(tx_id, transaction.transaction.inputs());
self.update_components_and_caches_on_removal(iter::once(&transaction));

let mut new_executable_transaction = false;
for dependent in dependents {
if !self.storage.has_dependencies(&dependent)
&& let Some(storage_data) = self.storage.get(&dependent)
{
self.selection_algorithm
.new_executable_transaction(dependent, storage_data);
new_executable_transaction = true;
}
}
if new_executable_transaction {
self.new_executable_txs_notifier.send_replace(());
}
}

self.update_stats();
}

/// Process committed transactions:
/// - Remove transaction but keep its dependents and the dependents become executables.
/// - Notify about possible new executable transactions.
Expand Down Expand Up @@ -685,6 +729,45 @@ where
self.update_stats();
}

/// Rollback a preconfirmed transaction that was not included in the canonical block.
///
/// This clears the preconfirmation-derived outputs and removes any pool transactions
/// that depend on those outputs, since those inputs no longer exist on-chain.
pub fn rollback_preconfirmed_transaction(&mut self, tx_id: TxId) {
// Remove preconfirmed outputs so dependents can't use them.
self.extracted_outputs.new_skipped_transaction(&tx_id);
// Allow the transaction itself to be re-submitted.
self.spent_inputs.unspend_preconfirmed(tx_id);
Comment thread
cursor[bot] marked this conversation as resolved.

// Remove any pool transactions that used the preconfirmed outputs as inputs.
let coin_dependents = self.collision_manager.get_coins_spenders(&tx_id);
if !coin_dependents.is_empty() {
let reason = format!(
"Preconfirmed parent transaction {tx_id} was not included in the canonical block"
);
for dependent in coin_dependents {
let removed = self
.storage
.remove_transaction_and_dependents_subtree(dependent);
self.update_components_and_caches_on_removal(removed.iter());
let removed_txs: Vec<_> = removed
.into_iter()
.map(|data| {
let dependent_tx_id = data.transaction.id();
let tx_status =
statuses::SqueezedOut::new(reason.clone(), dependent_tx_id);
(dependent_tx_id, tx_status)
})
.collect();
if !removed_txs.is_empty() {
self.tx_status_manager.squeezed_out_txs(removed_txs);
}
}
}

self.update_stats();
}

fn check_blob_does_not_exist(
tx: &PoolTransaction,
persistent_storage: &impl TxPoolPersistentStorage,
Expand Down
85 changes: 76 additions & 9 deletions crates/services/txpool_v2/src/pool_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use fuel_core_types::{
},
};
use std::{
iter,
collections::{
BTreeMap,
HashSet,
},
ops::Deref,
sync::Arc,
time::SystemTime,
Expand Down Expand Up @@ -134,6 +137,7 @@ impl PoolWorkerInterface {
pending_pool: PendingPool::new(tx_pool.config.pending_pool_tx_ttl),
pool: tx_pool,
view_provider,
tentative_preconfs: BTreeMap::new(),
};

tokio_runtime.block_on(async {
Expand Down Expand Up @@ -268,6 +272,10 @@ pub(super) struct PoolWorker<View, TxStatusManager> {
pending_pool: PendingPool,
view_provider: Arc<dyn AtomicView<LatestView = View>>,
notification_sender: Sender<PoolNotification>,
/// Tracks preconfirmed transaction IDs by their tentative block height.
/// Used to roll back stale preconfirmations when the canonical block at
/// that height does not include those transactions.
tentative_preconfs: BTreeMap<BlockHeight, HashSet<TxId>>,
}

impl<View, TxStatusManager> PoolWorker<View, TxStatusManager>
Expand Down Expand Up @@ -485,16 +493,54 @@ where
}

fn process_block(&mut self, block_result: SharedImportResult) {
self.pool.process_committed_transactions(
block_result.tx_status.iter().map(|tx_status| tx_status.id),
);
let block_height = *block_result.sealed_block.entity.header().height();

let confirmed_tx_ids: HashSet<TxId> = block_result
.tx_status
.iter()
.map(|tx_status| tx_status.id)
.collect();

self.pool
.process_committed_transactions(confirmed_tx_ids.iter().copied());

block_result.tx_status.iter().for_each(|tx_status| {
self.pool
.extracted_outputs
.new_executed_transaction(&tx_status.id);
});

// Reconcile tentative preconfirmations for all heights up to and including
// the imported block height. Any preconfirmed tx that is absent from the
// canonical block must have its state rolled back so those inputs/outputs
// do not remain stale in the mempool.
let stale_heights: Vec<BlockHeight> = self
.tentative_preconfs
.range(..=block_height)
.map(|(h, _)| *h)
.collect();

for height in stale_heights {
if let Some(tentative_txs) = self.tentative_preconfs.remove(&height) {
for tx_id in tentative_txs {
if confirmed_tx_ids.contains(&tx_id) {
// Tx was included in the canonical block — confirm the
// tentative spend record so it won't be rolled back.
self.pool.spent_inputs.confirm_tentative_spend(&tx_id);
} else {
tracing::debug!(
"Rolling back stale preconfirmation for tx {} \
(tentative block {}, canonical block {})",
tx_id,
height,
block_height,
);
self.pool.rollback_preconfirmed_transaction(tx_id);
}
}
}
}
Comment thread
cursor[bot] marked this conversation as resolved.

let resolved_txs = self.pending_pool.new_known_txs(
block_result
.sealed_block
Expand Down Expand Up @@ -524,20 +570,31 @@ where
tx_id: TxId,
status: PreConfirmationStatus,
) {
let outputs = match &status {
let (outputs, block_height) = match &status {
PreConfirmationStatus::Success(status) => {
self.pool.process_committed_transactions(iter::once(tx_id));
self.pool.process_preconfirmed_committed_transaction(tx_id);
let height = status.tx_pointer.block_height();
if let Some(outputs) = &status.resolved_outputs {
outputs
(outputs, Some(height))
} else {
// Still track the height so block import can clean up spent_inputs.
self.tentative_preconfs
.entry(height)
.or_default()
.insert(tx_id);
return;
}
}
PreConfirmationStatus::Failure(status) => {
self.pool.process_committed_transactions(iter::once(tx_id));
self.pool.process_preconfirmed_committed_transaction(tx_id);
let height = status.tx_pointer.block_height();
if let Some(outputs) = &status.resolved_outputs {
outputs
(outputs, Some(height))
} else {
self.tentative_preconfs
.entry(height)
.or_default()
.insert(tx_id);
return;
}
}
Expand All @@ -546,6 +603,16 @@ where
return;
}
};

// Track the tentative block height so the preconfirmed outputs can be
// rolled back if the canonical block at that height omits this tx.
if let Some(height) = block_height {
self.tentative_preconfs
.entry(height)
.or_default()
.insert(tx_id);
}

// All of this can be useful in case that we didn't know about the transaction
let resolved = self
.pending_pool
Expand Down
41 changes: 41 additions & 0 deletions crates/services/txpool_v2/src/spent_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ pub struct SpentInputs {
/// transaction spent it. Later, this information can be used to unspent
/// or fully spend the input.
spender_of_inputs: HashMap<TxId, Vec<InputKey>>,
/// Inputs permanently spent during preconfirmation processing, saved so
/// they can be rolled back if the preconfirmation turns out to be stale.
tentative_spent: HashMap<TxId, Vec<InputKey>>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any chance that his will grow indefinitely?

Seems fine to me. It's not good to test internal values anyway, but worth considering if this is a "leak" in any way.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, unless blocks are not getting imported at all. These are always cleared for when the associated block height gets imported.

}

impl SpentInputs {
pub fn new(capacity: NonZeroUsize) -> Self {
Self {
spent_inputs: LruCache::new(capacity),
spender_of_inputs: HashMap::new(),
tentative_spent: HashMap::new(),
}
}

Expand Down Expand Up @@ -114,6 +118,43 @@ impl SpentInputs {
pub fn is_spent_tx(&self, tx: &TxId) -> bool {
self.spent_inputs.contains(&InputKey::Tx(*tx))
}

/// Record inputs that were permanently spent during preconfirmation processing.
/// The saved keys can later be rolled back via [`unspend_preconfirmed`].
pub fn record_tentative_spend(&mut self, tx_id: TxId, inputs: &[Input]) {
let keys: Vec<InputKey> = inputs
.iter()
.filter_map(|input| {
if input.is_coin() {
input.utxo_id().cloned().map(InputKey::Utxo)
} else if input.is_message() {
input.nonce().cloned().map(InputKey::Message)
} else {
None
}
})
.collect();
self.tentative_spent.insert(tx_id, keys);
}

/// Remove the tentative-spend record for a confirmed transaction, preventing
/// a spurious rollback. Called when the preconfirmed tx is included in the
/// canonical block.
pub fn confirm_tentative_spend(&mut self, tx_id: &TxId) {
self.tentative_spent.remove(tx_id);
}

/// Removes the tx entry and any individually-tracked UTXO/message inputs
/// from spent inputs, allowing the same inputs to be re-used.
/// Used when rolling back a stale preconfirmation.
pub fn unspend_preconfirmed(&mut self, tx_id: TxId) {
self.spent_inputs.pop(&InputKey::Tx(tx_id));
if let Some(saved_keys) = self.tentative_spent.remove(&tx_id) {
for key in saved_keys {
self.spent_inputs.pop(&key);
}
}
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions crates/services/txpool_v2/src/tests/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ impl MockTxStatusManager {
tx,
}
}

/// Send a preconfirmation update to the pool worker.
pub fn send_preconfirmation(&self, tx_id: TxId, status: PreConfirmationStatus) {
let _ = self.tx_preconfirmations_update_sender.send((tx_id, status));
}
}

impl ports::TxStatusManager for MockTxStatusManager {
Expand Down
1 change: 1 addition & 0 deletions crates/services/txpool_v2/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod stability_test;
mod tests_p2p;
mod tests_pending_pool;
mod tests_pool;
mod tests_preconf_rollback;
mod tests_service;
mod tx_status_manager_integration;
mod universe;
Loading
Loading