Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/fixed/3268.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Mempool preconfs: split collision manager removal into committed vs evicted paths to defer contract_users cleanup until after rollback
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 23 additions & 12 deletions crates/services/txpool_v2/src/collision_manager/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,33 +329,31 @@ where
}

fn on_removed_transaction(&mut self, transaction: &PoolTransaction) {
self.on_committed_transaction(transaction);
self.remove_from_contract_users(transaction);
}

fn on_committed_transaction(&mut self, transaction: &PoolTransaction) {
if let PoolTransaction::Blob(checked_tx, _) = transaction {
let blob_id = checked_tx.transaction().blob_id();
self.blobs_users.remove(blob_id);
}
let tx_id = transaction.id();
for input in transaction.inputs() {
match input {
Input::CoinSigned(CoinSigned { utxo_id, .. })
| Input::CoinPredicate(CoinPredicate { utxo_id, .. }) => {
// remove coin
self.coins_spenders.remove(utxo_id);
}
Input::MessageCoinSigned(MessageCoinSigned { nonce, .. })
| Input::MessageCoinPredicate(MessageCoinPredicate { nonce, .. })
| Input::MessageDataSigned(MessageDataSigned { nonce, .. })
| Input::MessageDataPredicate(MessageDataPredicate { nonce, .. }) => {
// remove message
self.messages_spenders.remove(nonce);
}
Input::Contract(ContractInput { contract_id, .. }) => {
if let Some(users) = self.contract_users.get_mut(contract_id) {
users.retain(|id| id != &tx_id);
if users.is_empty() {
self.contract_users.remove(contract_id);
}
}
}
// Deliberately skipped: contract_users cleanup is deferred to
// remove_from_contract_users, which the caller invokes after
// all preconfirmation rollback for the block is complete.
Input::Contract(_) => {}
}
}
for output in transaction.outputs().iter() {
Expand All @@ -365,10 +363,23 @@ where
| Output::Variable { .. }
| Output::Contract(_) => {}
Output::ContractCreated { contract_id, .. } => {
// remove contract
self.contracts_creators.remove(contract_id);
}
};
}
}

fn remove_from_contract_users(&mut self, transaction: &PoolTransaction) {
let tx_id = transaction.id();
for input in transaction.inputs() {
if let Input::Contract(ContractInput { contract_id, .. }) = input
&& let Some(users) = self.contract_users.get_mut(contract_id)
{
users.retain(|id| id != &tx_id);
if users.is_empty() {
self.contract_users.remove(contract_id);
}
}
}
}
Comment thread
cursor[bot] marked this conversation as resolved.
}
19 changes: 18 additions & 1 deletion crates/services/txpool_v2/src/collision_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ pub trait CollisionManager {
store_entry: &StorageData,
);

/// Inform the collision manager that a transaction was removed.
/// Inform the collision manager that a transaction was removed from the pool
/// for any reason other than canonical-block commitment (e.g. eviction, TTL
/// expiry, rollback-driven eviction). Cleans up all tracking state including
/// the `contract_users` entry for this transaction.
fn on_removed_transaction(&mut self, transaction: &PoolTransaction);

/// Inform the collision manager that a transaction was committed to a
/// canonical block and has been removed from the pool. Like
/// `on_removed_transaction` but intentionally **skips** the removal of
/// this transaction's ID from `contract_users`. The caller is responsible
/// for invoking `remove_from_contract_users` later — after any
/// preconfirmation-rollback work is complete — so that rollback logic can
/// still observe the full set of contract dependents.
fn on_committed_transaction(&mut self, transaction: &PoolTransaction);

/// Remove the transaction's ID from every `contract_users` entry it
/// contributed to. Call this once per committed transaction, after all
/// rollback logic for the relevant block height has finished.
fn remove_from_contract_users(&mut self, transaction: &PoolTransaction);
}
60 changes: 55 additions & 5 deletions crates/services/txpool_v2/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,13 @@ where
/// Process a preconfirmed transaction as committed while recording its spent
/// inputs so they can be rolled back later if the transaction is not
/// included in the canonical block.
pub fn process_preconfirmed_committed_transaction(&mut self, tx_id: TxId) {
///
/// Returns the pool transaction if it was present in the pool (so that the
/// caller can defer the `contract_users` cleanup until after rollback).
pub fn process_preconfirmed_committed_transaction(
&mut self,
tx_id: TxId,
) -> Option<ArcPoolTx> {
// If the tx was already extracted for local block production it is no
// longer in `tx_id_to_storage_id`, so the branch below that calls
// `record_tentative_spend` will be skipped. Preserve the input keys
Expand All @@ -445,7 +451,7 @@ where
// Invariant violation. Panic in tests, log in production.
debug_assert!(false, "Storage data not found for the transaction");
tracing::warn!("Storage data not found for the transaction.");
return;
return None;
};
self.extracted_outputs
.new_extracted_transaction(&transaction.transaction);
Expand All @@ -455,7 +461,8 @@ where
.record_tentative_spend(tx_id, transaction.transaction.inputs());
self.spent_inputs
.spend_inputs(tx_id, transaction.transaction.inputs());
self.update_components_and_caches_on_removal(iter::once(&transaction));
let pool_tx = transaction.transaction.clone();
self.update_components_and_caches_on_committed(iter::once(&transaction));

let mut new_executable_transaction = false;
for dependent in dependents {
Expand All @@ -470,16 +477,28 @@ where
if new_executable_transaction {
self.new_executable_txs_notifier.send_replace(());
}

self.update_stats();
return Some(pool_tx);
}

self.update_stats();
None
}

/// Process committed transactions:
/// - Remove transaction but keep its dependents and the dependents become executables.
/// - Notify about possible new executable transactions.
pub fn process_committed_transactions(&mut self, tx_ids: impl Iterator<Item = TxId>) {
///
/// Returns the pool transactions that were actually present in the pool (and
/// thus removed). The caller must invoke `remove_from_contract_users` for
/// each returned transaction once all preconfirmation rollback is complete.
pub fn process_committed_transactions(
&mut self,
tx_ids: impl Iterator<Item = TxId>,
) -> Vec<ArcPoolTx> {
let mut transactions_to_promote = vec![];
let mut committed_pool_txs = vec![];
for tx_id in tx_ids {
self.spent_inputs.spend_inputs_by_tx_id(tx_id);
if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
Expand All @@ -496,7 +515,8 @@ where
.new_extracted_transaction(&transaction.transaction);
self.spent_inputs
.spend_inputs(tx_id, transaction.transaction.inputs());
self.update_components_and_caches_on_removal(iter::once(&transaction));
committed_pool_txs.push(transaction.transaction.clone());
self.update_components_and_caches_on_committed(iter::once(&transaction));

for dependent in dependents {
if !self.storage.has_dependencies(&dependent) {
Expand Down Expand Up @@ -528,6 +548,7 @@ where
}

self.update_stats();
committed_pool_txs
}

/// Check if the pool has enough space to store a transaction.
Expand Down Expand Up @@ -848,6 +869,35 @@ where
self.register_transaction_counts();
}

/// Like `update_components_and_caches_on_removal` but uses
/// `on_committed_transaction`, which intentionally skips the `contract_users`
/// cleanup. Callers must invoke `remove_from_contract_users` for each
/// committed transaction once all preconfirmation rollback is complete.
fn update_components_and_caches_on_committed<'a>(
&mut self,
removed_transactions: impl Iterator<Item = &'a StorageData>,
) {
for storage_entry in removed_transactions {
let tx = &storage_entry.transaction;
self.current_gas = self.current_gas.saturating_sub(tx.max_gas());
self.current_bytes_size = self
.current_bytes_size
.saturating_sub(tx.metered_bytes_size());
self.tx_id_to_storage_id.remove(&tx.id());
self.collision_manager.on_committed_transaction(tx);
self.selection_algorithm
.on_removed_transaction(storage_entry);
}
self.register_transaction_counts();
}

/// Remove `transaction` from the `contract_users` index. Call this for
/// every committed transaction after preconfirmation rollback is complete.
pub fn remove_from_contract_users(&mut self, transaction: &PoolTransaction) {
self.collision_manager
.remove_from_contract_users(transaction);
}

#[cfg(test)]
pub fn assert_integrity(&self, mut expected_txs: HashSet<TxId>) {
for tx in &self.tx_id_to_storage_id {
Expand Down
42 changes: 39 additions & 3 deletions crates/services/txpool_v2/src/pool_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl PoolWorkerInterface {
pool: tx_pool,
view_provider,
tentative_preconfs: BTreeMap::new(),
tentative_preconf_pool_txs: BTreeMap::new(),
current_canonical_height: initial_block_height,
};

Expand Down Expand Up @@ -278,6 +279,11 @@ pub(super) struct PoolWorker<View, TxStatusManager> {
/// Used to roll back stale preconfirmations when the canonical block at
/// that height does not include those transactions.
tentative_preconfs: BTreeMap<BlockHeight, HashSet<TxId>>,
/// Pool transactions that were removed from the pool when their
/// preconfirmation arrived and need deferred `contract_users` cleanup.
/// Keyed by the tentative block height so cleanup can be applied after
/// rollback reconciliation for each height in `process_block`.
tentative_preconf_pool_txs: BTreeMap<BlockHeight, Vec<ArcPoolTx>>,
/// The height of the last canonical block imported by this node.
/// Used to discard late preconfirmations whose tentative block height
/// is already at or below the canonical tip.
Expand Down Expand Up @@ -508,7 +514,10 @@ where
.map(|tx_status| tx_status.id)
.collect();

self.pool
// Collect pool transactions removed by block commitment so we can
// clean up their `contract_users` entries *after* rollback (see below).
let committed_pool_txs = self
.pool
.process_committed_transactions(confirmed_tx_ids.iter().copied());

block_result.tx_status.iter().for_each(|tx_status| {
Expand Down Expand Up @@ -546,6 +555,21 @@ where
}
}
}
// Now that rollback for this height is complete, clean up the
// `contract_users` entries for preconfirmed txs that were committed
// (removed from pool) when their preconfirmation arrived. Deferring
// until here ensures rollback could still observe those entries.
if let Some(deferred_txs) = self.tentative_preconf_pool_txs.remove(&height) {
for tx in &deferred_txs {
self.pool.remove_from_contract_users(tx);
}
}
}

// Clean up `contract_users` for pool txs committed directly by the
// canonical block (deferred from process_committed_transactions above).
for tx in &committed_pool_txs {
self.pool.remove_from_contract_users(tx);
}

let resolved_txs = self.pending_pool.new_known_txs(
Expand Down Expand Up @@ -602,8 +626,14 @@ where

let (outputs, block_height) = match &status {
PreConfirmationStatus::Success(status) => {
self.pool.process_preconfirmed_committed_transaction(tx_id);
let pool_tx = self.pool.process_preconfirmed_committed_transaction(tx_id);
let height = status.tx_pointer.block_height();
if let Some(tx) = pool_tx {
self.tentative_preconf_pool_txs
.entry(height)
.or_default()
.push(tx);
}
if let Some(outputs) = &status.resolved_outputs {
(outputs, Some(height))
} else {
Expand All @@ -616,8 +646,14 @@ where
}
}
PreConfirmationStatus::Failure(status) => {
self.pool.process_preconfirmed_committed_transaction(tx_id);
let pool_tx = self.pool.process_preconfirmed_committed_transaction(tx_id);
let height = status.tx_pointer.block_height();
if let Some(tx) = pool_tx {
self.tentative_preconf_pool_txs
.entry(height)
.or_default()
.push(tx);
}
if let Some(outputs) = &status.resolved_outputs {
(outputs, Some(height))
} else {
Expand Down
Loading