From dcfb3b8186f4b1740faaf38106fa23910d3737ff Mon Sep 17 00:00:00 2001 From: metalurgical <97008724+metalurgical@users.noreply.github.com> Date: Mon, 20 Apr 2026 22:48:09 +0200 Subject: [PATCH 1/3] driver: parallel unsupported order detection Make unsupported order detection read-only and execute it in parallel with sorting and data fetching. Apply filtering after update_orders to preserve existing ordering. --- crates/driver/src/domain/competition/mod.rs | 59 +++++----- .../domain/competition/risk_detector/mod.rs | 104 +++++++++--------- crates/driver/src/domain/quote.rs | 10 +- 3 files changed, 92 insertions(+), 81 deletions(-) diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index a466456516..0f46d13381 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -33,7 +33,7 @@ use { time::Instant, }, tokio::{sync::mpsc, task}, - tracing::{Instrument, instrument}, + tracing::{Instrument}, }; pub mod auction; @@ -326,17 +326,24 @@ impl Competition { let cow_amm_orders = tasks.cow_amm_orders.await; auction.orders.extend(cow_amm_orders.iter().cloned()); + // Clone orders so unsupported detection can run in parallel while + // preserving the existing auction ownership model. + let orders_for_filtering = auction.orders.clone(); + let unsupported_uids_future = self.risk_detector.unsupported_order_uids(&orders_for_filtering); + let sort_orders_future = Self::run_blocking_with_timer("sort_orders", move || { // Use spawn_blocking() because a lot of CPU bound computations are happening // and we don't want to block the runtime for too long. Self::sort_orders(auction, solver_address, order_sorting_strategies) }); - // We can sort the orders and fetch auction data in parallel - let (auction, balances, app_data) = - tokio::join!(sort_orders_future, tasks.balances, tasks.app_data); + // We can sort the orders, fetch auction data, and detect unsupported + // orders in parallel. + let (auction, balances, app_data, unsupported_uids) = tokio::join!( + sort_orders_future,tasks.balances,tasks.app_data,unsupported_uids_future + ); - let auction = Self::run_blocking_with_timer("update_orders", move || { + let mut auction = Self::run_blocking_with_timer("update_orders", move || { // Same as before with sort_orders, we use spawn_blocking() because a lot of CPU // bound computations are happening and we want to avoid blocking // the runtime. @@ -344,16 +351,28 @@ impl Competition { }) .await; - // We can run bad token filtering and liquidity fetching in parallel - let (liquidity, auction) = tokio::join!( - async { - match self.solver.liquidity() { - solver::Liquidity::Fetch => tasks.liquidity.await, - solver::Liquidity::Skip => Arc::new(Vec::new()), - } - }, - self.without_unsupported_orders(auction) - ); + let filter_auction = async move { + if !self.solver.config().flashloans_enabled { + auction.orders.retain(|o| o.app_data.flashloan().is_none()); + } + + if !unsupported_uids.is_empty() { + auction + .orders + .retain(|order| !unsupported_uids.contains(&order.uid)); + } + + auction + }; + + let fetch_liquidity = async { + match self.solver.liquidity() { + solver::Liquidity::Fetch => tasks.liquidity.await, + solver::Liquidity::Skip => Arc::new(Vec::new()), + } + }; + + let (auction, liquidity) = tokio::join!(filter_auction, fetch_liquidity); let elapsed = start.elapsed(); metrics::get() @@ -902,16 +921,6 @@ impl Competition { } Ok(()) } - - #[instrument(skip_all)] - async fn without_unsupported_orders(&self, mut auction: Auction) -> Auction { - if !self.solver.config().flashloans_enabled { - auction.orders.retain(|o| o.app_data.flashloan().is_none()); - } - self.risk_detector - .filter_unsupported_orders_in_auction(auction) - .await - } } const MAX_SOLUTIONS_TO_MERGE: usize = 10; diff --git a/crates/driver/src/domain/competition/risk_detector/mod.rs b/crates/driver/src/domain/competition/risk_detector/mod.rs index 357f0b6e04..953870e536 100644 --- a/crates/driver/src/domain/competition/risk_detector/mod.rs +++ b/crates/driver/src/domain/competition/risk_detector/mod.rs @@ -16,11 +16,12 @@ //! we were not able to predict issues with orders and pre-emptively //! filter them out of the auction. use { - crate::domain::competition::{Auction, order::Uid}, + crate::domain::competition::{order::Uid}, eth_domain_types as eth, futures::{StreamExt, stream::FuturesUnordered}, - std::{collections::HashMap, fmt, time::Instant}, + std::{collections::{HashMap, HashSet}, fmt, time::Instant}, }; +use crate::domain::competition::Order; pub mod bad_orders; pub mod bad_tokens; @@ -80,72 +81,73 @@ impl Detector { self } - /// Removes all unsupported orders from the auction. - pub async fn filter_unsupported_orders_in_auction(&self, mut auction: Auction) -> Auction { + pub async fn unsupported_order_uids(&self, orders: &[Order]) -> HashSet { let now = Instant::now(); - // reuse the original allocation - let supported_orders = std::mem::take(&mut auction.orders); let mut token_quality_checks = FuturesUnordered::new(); let mut removed_uids = Vec::new(); - let mut supported_orders: Vec<_> = supported_orders - .into_iter() - .filter(|order| { - self.metrics - .as_ref() - .map(|metrics| metrics.get_quality(&order.uid, now)) - .is_none_or(|q| q != Quality::Unsupported) - }) - .filter_map(|order| { - let sell = self.get_token_quality(order.sell.token, now); - let buy = self.get_token_quality(order.buy.token, now); - match (sell, buy) { - // both tokens supported => keep order - (Quality::Supported, Quality::Supported) => Some(order), - // at least 1 token unsupported => drop order - (Quality::Unsupported, _) | (_, Quality::Unsupported) => { - removed_uids.push(order.uid); - None - } - // sell token quality is unknown => keep order if token is supported - (Quality::Unknown, _) => { - let Some(detector) = &self.simulation_detector else { - // we can't determine quality => assume order is good - return Some(order); - }; - let check_tokens_fut = async move { - let quality = detector.determine_sell_token_quality(&order, now).await; - (order, quality) - }; - token_quality_checks.push(check_tokens_fut); - None - } - // buy token quality is unknown => keep order (because we can't - // determine quality and assume it's good) - (_, Quality::Unknown) => Some(order), + orders.iter().for_each(|order| { + if self + .metrics + .as_ref() + .map(|metrics| metrics.get_quality(&order.uid, now)) + .is_some_and(|q| q == Quality::Unsupported) + { + removed_uids.push(order.uid); + return; + } + + let sell = self.get_token_quality(order.sell.token, now); + let buy = self.get_token_quality(order.buy.token, now); + + match (sell, buy) { + // at least 1 token unsupported => drop order + (Quality::Unsupported, _) | (_, Quality::Unsupported) => { + removed_uids.push(order.uid); } - }) - .collect(); - while let Some((order, quality)) = token_quality_checks.next().await { - if quality == Quality::Supported { - supported_orders.push(order); - } else { - removed_uids.push(order.uid); + // sell token quality is unknown => keep order if token is supported + (Quality::Unknown, _) => { + let Some(detector) = &self.simulation_detector else { + // we can't determine quality => assume order is good + return; + }; + + let check_tokens_fut = async move { + let quality = detector.determine_sell_token_quality(order, now).await; + (order.uid, quality) + }; + token_quality_checks.push(check_tokens_fut); + } + + // both tokens supported => keep order + (Quality::Supported, Quality::Supported) => {} + + // buy token quality is unknown => keep order (because we can't + // determine quality and assume it's good) + (_, Quality::Unknown) => {} + } + }); + + while let Some((uid, quality)) = token_quality_checks.next().await { + if quality != Quality::Supported { + removed_uids.push(uid); } } - auction.orders = supported_orders; if !removed_uids.is_empty() { - tracing::debug!(orders = ?removed_uids, "ignored orders with unsupported tokens"); + tracing::debug!( + orders = ?removed_uids, + "ignored orders with unsupported tokens" + ); } if let Some(detector) = &self.simulation_detector { detector.evict_outdated_entries(); } - auction + removed_uids.into_iter().collect() } /// Updates the tokens quality metric for successful operation. diff --git a/crates/driver/src/domain/quote.rs b/crates/driver/src/domain/quote.rs index 4acc4946ba..736de64aec 100644 --- a/crates/driver/src/domain/quote.rs +++ b/crates/driver/src/domain/quote.rs @@ -141,13 +141,13 @@ impl Order { } solver::Liquidity::Skip => Default::default(), }; - - let auction = self + let mut auction = self .fake_auction(eth, tokens, solver.quote_using_limit_orders()) .await?; - let auction = risk_detector - .filter_unsupported_orders_in_auction(auction) - .await; + let unsupported_uids = risk_detector.unsupported_order_uids(&auction.orders).await; + if !unsupported_uids.is_empty() { + auction.orders.retain(|order| !unsupported_uids.contains(&order.uid)); + } if auction.orders.is_empty() { return Err(QuotingFailed::UnsupportedToken.into()); } From c290e4681b6a5bfd89535486402eeedcde536bf3 Mon Sep 17 00:00:00 2001 From: metalurgical <97008724+metalurgical@users.noreply.github.com> Date: Wed, 22 Apr 2026 09:19:40 +0200 Subject: [PATCH 2/3] driver(tests): unsupported_order_uids test cases --- .../domain/competition/risk_detector/mod.rs | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/crates/driver/src/domain/competition/risk_detector/mod.rs b/crates/driver/src/domain/competition/risk_detector/mod.rs index 953870e536..3eea71e0cb 100644 --- a/crates/driver/src/domain/competition/risk_detector/mod.rs +++ b/crates/driver/src/domain/competition/risk_detector/mod.rs @@ -184,3 +184,135 @@ impl fmt::Debug for Detector { .finish() } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + use eth_domain_types::TokenAmount; + use super::*; + use crate::{infra::solver, + util, + domain::competition::{Order, + order::{signature, + BuyTokenBalance, + Kind, + Partial, + SellTokenBalance, + Side, + Signature, + Uid} + } + }; + + // Helper to create a mock order purely for test + fn order( + uid: Uid, + signer: eth::Address, + sell_token: eth::TokenAddress, + buy_token: eth::TokenAddress, + valid_to: u32, + ) -> Order { + Order { + uid, + receiver: Some(signer), + created: util::Timestamp(0), + valid_to: util::Timestamp(valid_to), + buy: eth::Asset { + token: buy_token, + amount: TokenAmount::from(1), + }, + sell: eth::Asset { + token: sell_token, + amount: TokenAmount::from(1), + }, + side: Side::Sell, + kind: Kind::Limit, + app_data: Default::default(), + partial: Partial::No, + pre_interactions: vec![], + post_interactions: vec![], + sell_token_balance: SellTokenBalance::Erc20, + buy_token_balance: BuyTokenBalance::Erc20, + signature: Signature { + scheme: signature::Scheme::PreSign, + data: Default::default(), + signer, + }, + protocol_fees: Default::default(), + quote: Default::default(), + } + } + + // Helper to create a mock UID purely for test + fn uid(n: u8, signer: eth::Address, valid_to: u32) -> Uid { + let order_hash = eth::B256::from([n; 32]); + Uid::from_parts(order_hash, signer, valid_to) + } + + #[tokio::test] + async fn unsupported_order_uids_empty_returns_empty() { + let detector = Detector::new(Default::default()); + let removed = detector.unsupported_order_uids(&[]).await; + assert!(removed.is_empty()); + } + + #[tokio::test] + async fn all_supported_orders_are_kept() { + let signer = eth::Address::from_slice(&[1; 20]); + let sell_token = eth::Address::from_slice(&[2; 20]).into(); + let buy_token = eth::Address::from_slice(&[3; 20]).into(); + + let detector = Detector::new(Default::default()); + + let removed = detector + .unsupported_order_uids(&[order(uid(1, signer, u32::MAX), signer, sell_token, buy_token, u32::MAX)]) + .await; + + assert!(removed.is_empty()); + } + + #[tokio::test] + async fn unsupported_order_uids_returns_only_unsupported_orders() { + fn addr(n: u8) -> eth::Address { + eth::Address::from_slice(&[n; 20]) + } + + let valid_to = u32::MAX; + + let orders = vec![ + order(uid(1, addr(6), valid_to), addr(6), addr(1).into(), addr(2).into(), valid_to), // metrics bad + order(uid(2, addr(7), valid_to), addr(7), addr(3).into(), addr(2).into(), valid_to), // token bad + order(uid(3, addr(8), valid_to), addr(8), addr(1).into(), addr(2).into(), valid_to), // token supported + order(uid(4, addr(9), valid_to), addr(9), addr(4).into(), addr(2).into(), valid_to), // unknown sell + order(uid(5, addr(10), valid_to), addr(10), addr(1).into(), addr(5).into(), valid_to), // unknown buy + ]; + + let metrics_uid = orders[0].uid; + let token_uid = orders[1].uid; + + let metrics_detector = bad_orders::metrics::Detector::new( + 0.5, + 2, + false, + Duration::from_secs(60), + Duration::from_secs(60), + Duration::from_secs(60), + solver::Name("test_solver".into()), + ); + + let mut detector_config = HashMap::new(); + detector_config.insert(addr(3).into(), Quality::Unsupported); + + let mut detector = Detector::new(detector_config); + detector.with_metrics_detector(metrics_detector); + + // Simulate repeated metrics failure for order with metrics_uid + detector.encoding_failed(&[metrics_uid]); + detector.encoding_failed(&[metrics_uid]); + detector.encoding_failed(&[metrics_uid]); + + let removed = detector.unsupported_order_uids(&orders).await; + + assert_eq!(removed, HashSet::from([metrics_uid, token_uid])); + } +} \ No newline at end of file From e7497d5b3f913f887fe160133bd7788b3f59b875 Mon Sep 17 00:00:00 2001 From: metalurgical <97008724+metalurgical@users.noreply.github.com> Date: Wed, 22 Apr 2026 09:41:03 +0200 Subject: [PATCH 3/3] lint: run cargo fmt --- crates/driver/src/domain/competition/mod.rs | 11 +- .../domain/competition/risk_detector/mod.rs | 101 +++++++++++++----- crates/driver/src/domain/quote.rs | 4 +- 3 files changed, 84 insertions(+), 32 deletions(-) diff --git a/crates/driver/src/domain/competition/mod.rs b/crates/driver/src/domain/competition/mod.rs index 6d87df7c0d..7724a1dc0a 100644 --- a/crates/driver/src/domain/competition/mod.rs +++ b/crates/driver/src/domain/competition/mod.rs @@ -33,7 +33,7 @@ use { time::Instant, }, tokio::{sync::mpsc, task}, - tracing::{Instrument}, + tracing::Instrument, }; pub mod auction; @@ -329,7 +329,9 @@ impl Competition { // Clone orders so unsupported detection can run in parallel while // preserving the existing auction ownership model. let orders_for_filtering = auction.orders.clone(); - let unsupported_uids_future = self.risk_detector.unsupported_order_uids(&orders_for_filtering); + let unsupported_uids_future = self + .risk_detector + .unsupported_order_uids(&orders_for_filtering); let sort_orders_future = Self::run_blocking_with_timer("sort_orders", move || { // Use spawn_blocking() because a lot of CPU bound computations are happening @@ -340,7 +342,10 @@ impl Competition { // We can sort the orders, fetch auction data, and detect unsupported // orders in parallel. let (auction, balances, app_data, unsupported_uids) = tokio::join!( - sort_orders_future,tasks.balances,tasks.app_data,unsupported_uids_future + sort_orders_future, + tasks.balances, + tasks.app_data, + unsupported_uids_future ); let mut auction = Self::run_blocking_with_timer("update_orders", move || { diff --git a/crates/driver/src/domain/competition/risk_detector/mod.rs b/crates/driver/src/domain/competition/risk_detector/mod.rs index 3eea71e0cb..b8fb867b8a 100644 --- a/crates/driver/src/domain/competition/risk_detector/mod.rs +++ b/crates/driver/src/domain/competition/risk_detector/mod.rs @@ -16,12 +16,15 @@ //! we were not able to predict issues with orders and pre-emptively //! filter them out of the auction. use { - crate::domain::competition::{order::Uid}, + crate::domain::competition::{Order, order::Uid}, eth_domain_types as eth, futures::{StreamExt, stream::FuturesUnordered}, - std::{collections::{HashMap, HashSet}, fmt, time::Instant}, + std::{ + collections::{HashMap, HashSet}, + fmt, + time::Instant, + }, }; -use crate::domain::competition::Order; pub mod bad_orders; pub mod bad_tokens; @@ -138,9 +141,9 @@ impl Detector { if !removed_uids.is_empty() { tracing::debug!( - orders = ?removed_uids, - "ignored orders with unsupported tokens" - ); + orders = ?removed_uids, + "ignored orders with unsupported tokens" + ); } if let Some(detector) = &self.simulation_detector { @@ -187,21 +190,27 @@ impl fmt::Debug for Detector { #[cfg(test)] mod tests { - use std::time::Duration; - use eth_domain_types::TokenAmount; - use super::*; - use crate::{infra::solver, - util, - domain::competition::{Order, - order::{signature, - BuyTokenBalance, - Kind, - Partial, - SellTokenBalance, - Side, - Signature, - Uid} - } + use { + super::*, + crate::{ + domain::competition::{ + Order, + order::{ + BuyTokenBalance, + Kind, + Partial, + SellTokenBalance, + Side, + Signature, + Uid, + signature, + }, + }, + infra::solver, + util, + }, + eth_domain_types::TokenAmount, + std::time::Duration, }; // Helper to create a mock order purely for test @@ -265,7 +274,13 @@ mod tests { let detector = Detector::new(Default::default()); let removed = detector - .unsupported_order_uids(&[order(uid(1, signer, u32::MAX), signer, sell_token, buy_token, u32::MAX)]) + .unsupported_order_uids(&[order( + uid(1, signer, u32::MAX), + signer, + sell_token, + buy_token, + u32::MAX, + )]) .await; assert!(removed.is_empty()); @@ -280,11 +295,41 @@ mod tests { let valid_to = u32::MAX; let orders = vec![ - order(uid(1, addr(6), valid_to), addr(6), addr(1).into(), addr(2).into(), valid_to), // metrics bad - order(uid(2, addr(7), valid_to), addr(7), addr(3).into(), addr(2).into(), valid_to), // token bad - order(uid(3, addr(8), valid_to), addr(8), addr(1).into(), addr(2).into(), valid_to), // token supported - order(uid(4, addr(9), valid_to), addr(9), addr(4).into(), addr(2).into(), valid_to), // unknown sell - order(uid(5, addr(10), valid_to), addr(10), addr(1).into(), addr(5).into(), valid_to), // unknown buy + order( + uid(1, addr(6), valid_to), + addr(6), + addr(1).into(), + addr(2).into(), + valid_to, + ), // metrics bad + order( + uid(2, addr(7), valid_to), + addr(7), + addr(3).into(), + addr(2).into(), + valid_to, + ), // token bad + order( + uid(3, addr(8), valid_to), + addr(8), + addr(1).into(), + addr(2).into(), + valid_to, + ), // token supported + order( + uid(4, addr(9), valid_to), + addr(9), + addr(4).into(), + addr(2).into(), + valid_to, + ), // unknown sell + order( + uid(5, addr(10), valid_to), + addr(10), + addr(1).into(), + addr(5).into(), + valid_to, + ), // unknown buy ]; let metrics_uid = orders[0].uid; @@ -315,4 +360,4 @@ mod tests { assert_eq!(removed, HashSet::from([metrics_uid, token_uid])); } -} \ No newline at end of file +} diff --git a/crates/driver/src/domain/quote.rs b/crates/driver/src/domain/quote.rs index 736de64aec..89a9c5eb69 100644 --- a/crates/driver/src/domain/quote.rs +++ b/crates/driver/src/domain/quote.rs @@ -146,7 +146,9 @@ impl Order { .await?; let unsupported_uids = risk_detector.unsupported_order_uids(&auction.orders).await; if !unsupported_uids.is_empty() { - auction.orders.retain(|order| !unsupported_uids.contains(&order.uid)); + auction + .orders + .retain(|order| !unsupported_uids.contains(&order.uid)); } if auction.orders.is_empty() { return Err(QuotingFailed::UnsupportedToken.into());