diff --git a/crates/services/txpool_v2/src/pool_worker.rs b/crates/services/txpool_v2/src/pool_worker.rs index a50de4c507b..2a1725995db 100644 --- a/crates/services/txpool_v2/src/pool_worker.rs +++ b/crates/services/txpool_v2/src/pool_worker.rs @@ -19,6 +19,8 @@ use std::{ sync::Arc, time::SystemTime, }; +#[cfg(test)] +use std::collections::HashSet; use tokio::{ sync::{ broadcast, @@ -229,6 +231,11 @@ pub(super) enum PoolReadRequest { max_txs: usize, response_channel: oneshot::Sender>, }, + #[cfg(test)] + AssertIntegrity { + expected_tx_ids: Vec, + response_channel: oneshot::Sender<()>, + }, } #[allow(clippy::upper_case_acronyms)] @@ -340,6 +347,13 @@ where } => { self.get_non_existing_txs(tx_ids, response_channel); } + #[cfg(test)] + PoolReadRequest::AssertIntegrity { + expected_tx_ids, + response_channel, + } => { + self.assert_integrity(expected_tx_ids, response_channel); + } } } } @@ -599,6 +613,19 @@ where } } + #[cfg(test)] + fn assert_integrity( + &mut self, + expected_tx_ids: Vec, + response_channel: oneshot::Sender<()>, + ) { + self.pool + .assert_integrity(expected_tx_ids.into_iter().collect::>()); + if response_channel.send(()).is_err() { + tracing::error!("Failed to send assert_integrity result"); + } + } + fn has_enough_space_in_pools(&self, tx: &ArcPoolTx) -> bool { let tx_gas = tx.max_gas(); let bytes_size = tx.metered_bytes_size(); diff --git a/crates/services/txpool_v2/src/shared_state.rs b/crates/services/txpool_v2/src/shared_state.rs index 765bb93002e..5cd8a85e003 100644 --- a/crates/services/txpool_v2/src/shared_state.rs +++ b/crates/services/txpool_v2/src/shared_state.rs @@ -137,6 +137,23 @@ impl SharedState { .map_err(|_| Error::ServiceCommunicationFailed) } + #[cfg(test)] + pub async fn assert_integrity(&self, expected_tx_ids: Vec) -> Result<(), Error> { + let (response_channel, result_receiver) = oneshot::channel(); + + self.request_read_sender + .send(PoolReadRequest::AssertIntegrity { + expected_tx_ids, + response_channel, + }) + .await + .map_err(|_| Error::ServiceCommunicationFailed)?; + + result_receiver + .await + .map_err(|_| Error::ServiceCommunicationFailed) + } + /// Get a notifier that is notified when new executable transactions are added to the pool. pub fn get_new_executable_txs_notifier(&self) -> watch::Receiver<()> { self.new_executable_txs_notifier.subscribe() diff --git a/crates/services/txpool_v2/src/tests/tests_service.rs b/crates/services/txpool_v2/src/tests/tests_service.rs index 3c18de9b6df..3b715fef978 100644 --- a/crates/services/txpool_v2/src/tests/tests_service.rs +++ b/crates/services/txpool_v2/src/tests/tests_service.rs @@ -65,6 +65,14 @@ async fn test_find() { .unwrap(); universe.await_expected_tx_statuses_submitted(ids).await; + service + .shared + .assert_integrity(vec![ + tx1.id(&Default::default()), + tx2.id(&Default::default()), + ]) + .await + .unwrap(); // When let out = service @@ -86,6 +94,17 @@ async fn test_find() { service.stop_and_await().await.unwrap(); } +#[tokio::test] +async fn test_service_assert_integrity_handles_empty_pool() { + let universe = TestPoolUniverse::default(); + let service = universe.build_service(None, None); + service.start_and_await().await.unwrap(); + + service.shared.assert_integrity(vec![]).await.unwrap(); + + service.stop_and_await().await.unwrap(); +} + #[tokio::test] async fn test_prune_transactions() { const TIMEOUT: u64 = 3;