diff --git a/core/src/repair/mod.rs b/core/src/repair/mod.rs index 1de489ea3421cb..b3ada3173839b5 100644 --- a/core/src/repair/mod.rs +++ b/core/src/repair/mod.rs @@ -16,3 +16,4 @@ pub mod result; pub mod serve_repair; pub mod serve_repair_service; pub(crate) mod standard_repair_handler; +pub(crate) mod xdp_sender; diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index 2423fff0419825..abf2e21a269f2a 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -16,6 +16,7 @@ use { self, REPAIR_PEERS_CACHE_CAPACITY, RepairPeers, RepairProtocol, RepairRequestHeader, ServeRepair, ShredRepairType, }, + xdp_sender::RepairXdpSender, }, }, bytes::Bytes, @@ -445,6 +446,7 @@ impl RepairService { repair_info: RepairInfo, outstanding_requests: Arc>, repair_service_channels: RepairServiceChannels, + xdp_sender: Option, ) -> Self { let t_repair = { let blockstore = blockstore.clone(); @@ -460,6 +462,7 @@ impl RepairService { repair_service_channels.repair_channels, repair_info, &outstanding_requests, + xdp_sender.as_ref(), ) }) .unwrap() @@ -624,6 +627,7 @@ impl RepairService { repair_info: &RepairInfo, outstanding_requests: &RwLock, repair_socket: &UdpSocket, + xdp_sender: Option<&RepairXdpSender>, repair_protocol: Protocol, repair_metrics: &mut RepairMetrics, ) { @@ -656,15 +660,23 @@ impl RepairService { let mut batch_send_repairs_elapsed = Measure::start("batch_send_repairs_elapsed"); if !batch.is_empty() { let num_pkts = batch.len(); - let batch = batch.iter().map(|(bytes, addr)| (bytes, addr)); - match batch_send(repair_socket, batch) { - Ok(()) => (), - Err(SendPktsError::IoError(err, num_failed)) => { - error!( - "{} batch_send failed to send {num_failed}/{num_pkts} packets first error \ - {err:?}", - identity_keypair.pubkey() - ); + if let Some(xdp) = xdp_sender { + for (i, (bytes, addr)) in batch.into_iter().enumerate() { + if let Err(e) = xdp.try_send(i, addr, Bytes::from(bytes)) { + warn!("repair xdp send failed: {e:?}"); + } + } + } else { + let batch = batch.iter().map(|(bytes, addr)| (bytes, addr)); + match batch_send(repair_socket, batch) { + Ok(()) => (), + Err(SendPktsError::IoError(err, num_failed)) => { + error!( + "{} batch_send failed to send {num_failed}/{num_pkts} packets first \ + error {err:?}", + identity_keypair.pubkey() + ); + } } } } @@ -681,6 +693,7 @@ impl RepairService { repair_tracker: &mut RepairTracker, outstanding_requests: &RwLock, repair_socket: &UdpSocket, + xdp_sender: Option<&RepairXdpSender>, ) { let RepairChannels { repair_request_quic_sender, @@ -739,6 +752,7 @@ impl RepairService { repair_info, outstanding_requests, repair_socket, + xdp_sender, serve_repair::get_repair_protocol(root_bank.cluster_type()), repair_metrics, ); @@ -751,6 +765,7 @@ impl RepairService { repair_channels: RepairChannels, repair_info: RepairInfo, outstanding_requests: &RwLock, + xdp_sender: Option<&RepairXdpSender>, ) { let (sharable_banks, migration_status) = { let bank_forks_r = repair_info.bank_forks.read().unwrap(); @@ -786,6 +801,7 @@ impl RepairService { &mut repair_tracker, outstanding_requests, repair_socket, + xdp_sender, ); repair_tracker.repair_metrics.maybe_report(); sleep(Duration::from_millis(REPAIR_MS)); @@ -937,6 +953,7 @@ impl RepairService { slot: u64, shred_index: u64, repair_socket: &UdpSocket, + xdp_sender: Option<&RepairXdpSender>, outstanding_repair_requests: Arc>, ) { let mut repair_peers = vec![]; @@ -969,6 +986,7 @@ impl RepairService { slot, shred_index, repair_socket, + xdp_sender, outstanding_repair_requests.clone(), ); } @@ -981,6 +999,7 @@ impl RepairService { slot: u64, shred_index: u64, repair_socket: &UdpSocket, + xdp_sender: Option<&RepairXdpSender>, outstanding_repair_requests: Arc>, ) { // Setup repair request @@ -1002,16 +1021,21 @@ impl RepairService { let packet_buf = ServeRepair::repair_proto_to_bytes(&request_proto, &identity_keypair).unwrap(); - // Prepare packet batch to send - let reqs = [(&packet_buf, address)]; - - // Send packet batch - match batch_send(repair_socket, reqs) { - Ok(()) => { - debug!("successfully sent repair request to {pubkey} / {address}!"); + if let Some(xdp) = xdp_sender { + if let Err(e) = xdp.try_send(0, address, Bytes::from(packet_buf)) { + warn!("repair xdp send to {pubkey} ({address}) failed: {e:?}"); + } else { + debug!("successfully sent repair request via XDP to {pubkey} / {address}!"); } - Err(SendPktsError::IoError(err, _num_failed)) => { - error!("batch_send failed to send packet - error = {err:?}"); + } else { + let reqs = [(&packet_buf, address)]; + match batch_send(repair_socket, reqs) { + Ok(()) => { + debug!("successfully sent repair request to {pubkey} / {address}!"); + } + Err(SendPktsError::IoError(err, _num_failed)) => { + error!("batch_send failed to send packet - error = {err:?}"); + } } } } @@ -1264,6 +1288,7 @@ mod test { slot, shred_index, &sender, + None, outstanding_repair_requests, ); diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index ac5fc4b8e78b8d..295a4bff412716 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -14,6 +14,7 @@ use { repair_service::{OutstandingShredRepairs, REPAIR_MS, RepairStats}, request_response::RequestResponse, result::{Error, RepairVerifyError, Result}, + xdp_sender::RepairXdpSender, }, }, agave_votor_messages::migration::MigrationStatus, diff --git a/core/src/repair/xdp_sender.rs b/core/src/repair/xdp_sender.rs new file mode 100644 index 00000000000000..075c7edae633da --- /dev/null +++ b/core/src/repair/xdp_sender.rs @@ -0,0 +1,33 @@ +use { + agave_xdp::transmitter as tx, bytes::Bytes, crossbeam_channel::TrySendError, + std::net::SocketAddrV4, +}; + +/// Convenience wrapper around [`tx::XdpSender`] for the repair path. +/// +/// Like turbine, repair always sends from a fixed source address, so we store +/// it once and attach it to every packet automatically. +#[derive(Clone)] +pub struct RepairXdpSender { + sender: tx::XdpSender, + src_addr: SocketAddrV4, +} + +impl RepairXdpSender { + pub fn new(sender: tx::XdpSender, src_addr: SocketAddrV4) -> Self { + Self { sender, src_addr } + } + + #[inline] + pub fn try_send( + &self, + sender_index: usize, + addr: impl Into, + payload: Bytes, + ) -> Result<(), TrySendError> { + self.sender.try_send( + sender_index, + tx::BytesTxPacket::new(self.src_addr, addr, payload), + ) + } +} diff --git a/core/src/tvu.rs b/core/src/tvu.rs index e2c10df1efd4dd..4b59104ef1cca6 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -17,7 +17,10 @@ use { consensus::{Tower, tower_storage::TowerStorage}, cost_update_service::CostUpdateService, drop_bank_service::DropBankService, - repair::repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels}, + repair::{ + repair_service::{OutstandingShredRepairs, RepairInfo, RepairServiceChannels}, + xdp_sender::RepairXdpSender, + }, replay_stage::{ReplayReceivers, ReplaySenders, ReplayStage, ReplayStageConfig}, shred_fetch_stage::{SHRED_FETCH_CHANNEL_SIZE, ShredFetchStage}, voting_service::VotingService, @@ -140,6 +143,7 @@ pub struct TvuConfig { pub shred_sigverify_threads: NonZeroUsize, pub bls_sigverify_threads: NonZeroUsize, pub turbine_xdp_sender: Option, + pub repair_xdp_sender: Option, } impl Default for TvuConfig { @@ -155,6 +159,7 @@ impl Default for TvuConfig { shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), bls_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), turbine_xdp_sender: None, + repair_xdp_sender: None, } } } @@ -428,6 +433,7 @@ impl Tvu { window_service_channels, leader_schedule_cache.clone(), outstanding_repair_requests, + tvu_config.repair_xdp_sender, ) }; diff --git a/core/src/validator.rs b/core/src/validator.rs index 2e9dbedf19ae27..f2e7299a415b1b 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -22,6 +22,7 @@ use { quic_endpoint::{RepairQuicAsyncSenders, RepairQuicSenders, RepairQuicSockets}, repair_handler::RepairHandlerType, serve_repair_service::ServeRepairService, + xdp_sender::RepairXdpSender, }, resource_limits::{ResourceLimitError, adjust_nofile_limit}, sample_performance_service::SamplePerformanceService, @@ -675,7 +676,7 @@ impl Validator { socket_addr_space: SocketAddrSpace, tpu_config: ValidatorTpuConfig, admin_rpc_service_post_init: Arc>>, - xdp_builder_with_src_addr: Option<(TransmitterBuilder, SocketAddrV4)>, + xdp_builder_with_src_addr: Option<(TransmitterBuilder, SocketAddrV4, SocketAddrV4)>, ) -> Result { let exit = Arc::new(AtomicBool::new(false)); Self::new_with_exit( @@ -712,7 +713,7 @@ impl Validator { socket_addr_space: SocketAddrSpace, tpu_config: ValidatorTpuConfig, admin_rpc_service_post_init: Arc>>, - xdp_builder_with_src_addr: Option<(TransmitterBuilder, SocketAddrV4)>, + xdp_builder_with_src_addr: Option<(TransmitterBuilder, SocketAddrV4, SocketAddrV4)>, exit: Arc, ) -> Result { #[cfg(debug_assertions)] @@ -1583,12 +1584,20 @@ impl Validator { // This channel backing up indicates a serious problem in votor let (votor_event_sender, votor_event_receiver) = bounded(1000); - let (xdp_transmitter, turbine_xdp_sender) = - if let Some((xdp_transmit_builder, src_addr)) = xdp_builder_with_src_addr { + let (xdp_transmitter, turbine_xdp_sender, repair_xdp_sender) = + if let Some((xdp_transmit_builder, turbine_src_addr, repair_src_addr)) = + xdp_builder_with_src_addr + { let (rtx, sender) = xdp_transmit_builder.build(); - (Some(rtx), Some(XdpSender::new(sender, src_addr))) + // Use protocol-specific source ports so turbine and repair replies route correctly. + let repair_sender = RepairXdpSender::new(sender.clone(), repair_src_addr); + ( + Some(rtx), + Some(XdpSender::new(sender, turbine_src_addr)), + Some(repair_sender), + ) } else { - (None, None) + (None, None, None) }; // disable all2all tests if not allowed for a given cluster type @@ -1647,6 +1656,7 @@ impl Validator { shred_sigverify_threads: config.tvu_shred_sigverify_threads, bls_sigverify_threads: config.tvu_bls_sigverify_threads, turbine_xdp_sender: turbine_xdp_sender.clone(), + repair_xdp_sender, }, &max_slots, block_metadata_notifier, diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 836904d5847a4b..f6c4e54eef9953 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -5,8 +5,11 @@ use { crate::{ completed_data_sets_service::CompletedDataSetsSender, - repair::repair_service::{ - OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels, + repair::{ + repair_service::{ + OutstandingShredRepairs, RepairInfo, RepairService, RepairServiceChannels, + }, + xdp_sender::RepairXdpSender, }, result::{Error, Result}, }, @@ -267,6 +270,7 @@ impl WindowService { window_service_channels: WindowServiceChannels, leader_schedule_cache: Arc, outstanding_repair_requests: Arc>, + repair_xdp_sender: Option, ) -> WindowService { let cluster_info = repair_info.cluster_info.clone(); let bank_forks = repair_info.bank_forks.clone(); @@ -287,6 +291,7 @@ impl WindowService { repair_info, outstanding_repair_requests, repair_service_channels, + repair_xdp_sender, ); let (duplicate_sender, duplicate_receiver) = unbounded(); diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 43b7bd2e9378b1..1eef0dc92088ff 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -640,6 +640,7 @@ impl AdminRpc for AdminRpcImpl { slot, shred_index, &post_init.repair_socket, + None, post_init.outstanding_repair_requests.clone(), ); Ok(()) diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index a7b1ba19197fa6..53c8e04abfc3b9 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -341,7 +341,13 @@ pub fn execute( std::net::SocketAddrV4, }; - let src_port = node.sockets.retransmit_sockets[0] + let turbine_src_port = node.sockets.retransmit_sockets[0] + .local_addr() + .expect("failed to get local address") + .port(); + let repair_src_port = node + .sockets + .repair .local_addr() .expect("failed to get local address") .port(); @@ -363,7 +369,8 @@ pub fn execute( ( TransmitterBuilder::new(xdp_config, exit.clone()) .expect("failed to create xdp transmitter"), - SocketAddrV4::new(src_ip, src_port), + SocketAddrV4::new(src_ip, turbine_src_port), + SocketAddrV4::new(src_ip, repair_src_port), ) });