diff --git a/fuzz/fuzz_targets/handle.rs b/fuzz/fuzz_targets/handle.rs index b37f84692..fa82b437d 100644 --- a/fuzz/fuzz_targets/handle.rs +++ b/fuzz/fuzz_targets/handle.rs @@ -5,17 +5,16 @@ use std::{ borrow::Cow, io::{Cursor, Write}, net::IpAddr, + sync::{Arc, RwLock}, time::Duration, }; use libfuzzer_sys::fuzz_target; use ntp_proto::{ - test_cookie, - v5::{BloomFilter, ServerId}, - EncryptResult, ExtensionField, ExtensionHeaderVersion, FilterAction, FilterList, - HandleInnerData, KeySetProvider, NtpClock, NtpDuration, NtpLeapIndicator, NtpTimestamp, - NtpVersion, ReferenceId, Server, ServerConfig, ServerReason, ServerResponse, ServerStatHandler, - SystemSnapshot, TimeSnapshot, + test_cookie, v5::BloomFilter, EncryptResult, ExtensionField, ExtensionHeaderVersion, + FilterAction, FilterList, HandleInnerData, KeySetProvider, NtpClock, NtpDuration, + NtpLeapIndicator, NtpServerInfo, NtpSnapshot, NtpTimestamp, NtpVersion, ReferenceId, Server, + ServerConfig, ServerReason, ServerResponse, ServerStatHandler, TimeSnapshot, }; use rand::{rngs::StdRng, set_thread_rng, SeedableRng}; @@ -93,7 +92,7 @@ fuzz_target!(|parts: ( let ip = IpAddr::from(parts.2); - let mut server = Server::new( + let mut server = Server::new_internal( ServerConfig { denylist, allowlist, @@ -105,10 +104,12 @@ fuzz_target!(|parts: ( TestClock { cur: NtpTimestamp::from_seconds_nanos_since_ntp_era(100, 0), }, - SystemSnapshot { - stratum: 1, - reference_id: ReferenceId::NONE, - accumulated_steps_threshold: Some(NtpDuration::from_seconds(0.0)), + Arc::new(RwLock::new(NtpServerInfo { + ntp_snapshot: NtpSnapshot { + stratum: 1, + reference_id: ReferenceId::NONE, + bloom_filter: BloomFilter::new(), + }, time_snapshot: TimeSnapshot { precision: NtpDuration::from_seconds(0.00001), root_delay: NtpDuration::from_seconds(0.01), @@ -119,10 +120,9 @@ fuzz_target!(|parts: ( root_variance_cubic: 0.0, leap_indicator: NtpLeapIndicator::NoWarning, accumulated_steps: NtpDuration::from_seconds(0.0), + accumulated_steps_threshold: None, }, - bloom_filter: BloomFilter::new(), - server_id: ServerId::new(&mut rand::thread_rng()), - }, + })), keyset, ); diff --git a/ntp-proto/src/algorithm/kalman/mod.rs b/ntp-proto/src/algorithm/kalman/mod.rs index f7e2bb78f..e5465743d 100644 --- a/ntp-proto/src/algorithm/kalman/mod.rs +++ b/ntp-proto/src/algorithm/kalman/mod.rs @@ -388,7 +388,11 @@ impl InternalTimeSyncController for KalmanClockController { algo_config, freq_offset, desired_freq: 0.0, - timedata: TimeSnapshot::default(), + timedata: TimeSnapshot { + accumulated_steps_threshold: synchronization_config + .accumulated_step_panic_threshold, + ..TimeSnapshot::default() + }, in_startup: true, }) } @@ -553,7 +557,6 @@ mod tests { offset: NtpDuration::from_seconds(1700.0 + noise), localtime: algo.clock.current_time, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -801,7 +804,6 @@ mod tests { offset: NtpDuration::from_seconds(1700.0 + noise), localtime: algo.clock.current_time, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -856,7 +858,6 @@ mod tests { offset: NtpDuration::from_seconds(-3600.0 + noise), localtime: algo.clock.current_time, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, diff --git a/ntp-proto/src/algorithm/kalman/source.rs b/ntp-proto/src/algorithm/kalman/source.rs index fdb33483e..6895dd8e7 100644 --- a/ntp-proto/src/algorithm/kalman/source.rs +++ b/ntp-proto/src/algorithm/kalman/source.rs @@ -621,7 +621,6 @@ impl // Always update the root_delay, root_dispersion, leap second status and stratum, as they always represent the most accurate state. self.last_measurement.root_delay = measurement.root_delay; self.last_measurement.root_dispersion = measurement.root_dispersion; - self.last_measurement.stratum = measurement.stratum; self.last_measurement.leap = measurement.leap; if measurement.localtime.is_before(self.state.time) { @@ -1019,7 +1018,6 @@ mod tests { offset: NtpDuration::from_seconds(20e-3), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1037,7 +1035,6 @@ mod tests { offset: NtpDuration::from_seconds(20e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1067,7 +1064,6 @@ mod tests { offset: NtpDuration::from_seconds(20e-3), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1086,7 +1082,6 @@ mod tests { offset: NtpDuration::from_seconds(20e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1116,7 +1111,6 @@ mod tests { offset: NtpDuration::from_seconds(20e-3), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1135,7 +1129,6 @@ mod tests { offset: NtpDuration::from_seconds(20e-3), localtime: base + NtpDuration::from_seconds(2800.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1171,7 +1164,6 @@ mod tests { offset: NtpDuration::from_seconds(20e-3), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1209,7 +1201,6 @@ mod tests { offset: NtpDuration::from_seconds(20e-3), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1239,7 +1230,6 @@ mod tests { offset: NtpDuration::from_seconds(20e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1287,7 +1277,6 @@ mod tests { offset: NtpDuration::from_seconds(-20e-3), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1317,7 +1306,6 @@ mod tests { offset: NtpDuration::from_seconds(-20e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1389,7 +1377,6 @@ mod tests { offset: NtpDuration::from_seconds(0.4), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1453,7 +1440,6 @@ mod tests { offset: NtpDuration::from_seconds(0.4), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1471,7 +1457,6 @@ mod tests { offset: NtpDuration::from_seconds(-0.3), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1509,7 +1494,6 @@ mod tests { offset: NtpDuration::from_seconds(0.48), localtime: base + NtpDuration::from_seconds(1.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1533,7 +1517,6 @@ mod tests { offset: NtpDuration::from_seconds(0.49), localtime: base + NtpDuration::from_seconds(2.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1557,7 +1540,6 @@ mod tests { offset: NtpDuration::from_seconds(0.50), localtime: base + NtpDuration::from_seconds(3.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1581,7 +1563,6 @@ mod tests { offset: NtpDuration::from_seconds(-0.49), localtime: base + NtpDuration::from_seconds(4.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1605,7 +1586,6 @@ mod tests { offset: NtpDuration::from_seconds(-0.48), localtime: base + NtpDuration::from_seconds(5.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1629,7 +1609,6 @@ mod tests { offset: NtpDuration::from_seconds(-0.47), localtime: base + NtpDuration::from_seconds(6.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1653,7 +1632,6 @@ mod tests { offset: NtpDuration::from_seconds(-0.46), localtime: base + NtpDuration::from_seconds(7.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1677,7 +1655,6 @@ mod tests { offset: NtpDuration::from_seconds(-0.45), localtime: base + NtpDuration::from_seconds(8.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1728,7 +1705,6 @@ mod tests { offset: NtpDuration::from_seconds(0.0), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1764,7 +1740,6 @@ mod tests { offset: NtpDuration::from_seconds(0.0), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1838,7 +1813,6 @@ mod tests { offset: NtpDuration::from_seconds(0e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1862,7 +1836,6 @@ mod tests { offset: NtpDuration::from_seconds(1e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1886,7 +1859,6 @@ mod tests { offset: NtpDuration::from_seconds(2e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1910,7 +1882,6 @@ mod tests { offset: NtpDuration::from_seconds(3e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1934,7 +1905,6 @@ mod tests { offset: NtpDuration::from_seconds(4e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1958,7 +1928,6 @@ mod tests { offset: NtpDuration::from_seconds(5e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -1982,7 +1951,6 @@ mod tests { offset: NtpDuration::from_seconds(6e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2006,7 +1974,6 @@ mod tests { offset: NtpDuration::from_seconds(7e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2068,7 +2035,6 @@ mod tests { offset: NtpDuration::from_seconds(4e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2092,7 +2058,6 @@ mod tests { offset: NtpDuration::from_seconds(5e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2116,7 +2081,6 @@ mod tests { offset: NtpDuration::from_seconds(6e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2140,7 +2104,6 @@ mod tests { offset: NtpDuration::from_seconds(7e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2165,7 +2128,6 @@ mod tests { offset: NtpDuration::from_seconds(4e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2189,7 +2151,6 @@ mod tests { offset: NtpDuration::from_seconds(5e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2213,7 +2174,6 @@ mod tests { offset: NtpDuration::from_seconds(6e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2237,7 +2197,6 @@ mod tests { offset: NtpDuration::from_seconds(7e-3), localtime: base + NtpDuration::from_seconds(1000.0), - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2295,7 +2254,6 @@ mod tests { offset: NtpDuration::from_seconds(0.0), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, @@ -2420,7 +2378,6 @@ mod tests { offset: NtpDuration::from_seconds(0.0), localtime: base, - stratum: 0, root_delay: NtpDuration::default(), root_dispersion: NtpDuration::default(), leap: NtpLeapIndicator::NoWarning, diff --git a/ntp-proto/src/algorithm/mod.rs b/ntp-proto/src/algorithm/mod.rs index e69d272e0..bbe225a89 100644 --- a/ntp-proto/src/algorithm/mod.rs +++ b/ntp-proto/src/algorithm/mod.rs @@ -137,7 +137,6 @@ pub struct InternalMeasurement { pub offset: NtpDuration, pub localtime: NtpTimestamp, - pub stratum: u8, pub root_delay: NtpDuration, pub root_dispersion: NtpDuration, pub leap: NtpLeapIndicator, @@ -151,7 +150,6 @@ pub struct Measurement { pub sender_ts: NtpTimestamp, pub receiver_ts: NtpTimestamp, - pub stratum: u8, pub root_delay: NtpDuration, pub root_dispersion: NtpDuration, pub leap: NtpLeapIndicator, @@ -172,6 +170,9 @@ pub trait TimeSyncController: Sized + Send + Sync + 'static { ) -> Result::Error>; /// Take control of the clock (should not be done in new!) + /// + /// Should be callable multiple times, with subsequent calls not + /// doing anything. fn take_control(&self) -> Result<(), ::Error>; /// Create a new source with given identity @@ -184,11 +185,6 @@ pub trait TimeSyncController: Sized + Send + Sync + 'static { measurement_noise_estimate: f64, period: Option, ) -> Self::OneWaySourceController; - /// Notify the controller that a previous source has gone - fn remove_source(&self, id: ClockId); - /// Notify the controller that the status of a source (whether - /// or not it is usable for synchronization) has changed. - fn source_update(&self, id: ClockId, usable: bool); /// Current synchronization state fn synchronization_state(&self) -> (TimeSnapshot, Vec); /// Run the internal watchdog and messaging. @@ -198,13 +194,16 @@ pub trait TimeSyncController: Sized + Send + Sync + 'static { pub struct TimeSyncControllerWrapper { inner: Mutex, #[expect(clippy::type_complexity)] - messages_for_system: - Mutex>>, - messages_for_system_sender: tokio::sync::mpsc::UnboundedSender<(ClockId, T::SourceMessage)>, + messages_for_system: Mutex< + Option)>>, + >, + messages_for_system_sender: + tokio::sync::mpsc::UnboundedSender<(ClockId, WrapperMessage)>, oneway_sources: Mutex>>>, twoway_sources: Mutex>>>, snapshot: Mutex, used_sources: Mutex>, + has_taken_control: Mutex, } impl TimeSyncController for TimeSyncControllerWrapper { @@ -229,11 +228,17 @@ impl TimeSyncController for TimeSyncControllerWra twoway_sources: Mutex::new(Vec::new()), snapshot: Mutex::new(TimeSnapshot::default()), used_sources: Mutex::new(Vec::new()), + has_taken_control: Mutex::new(false), }) } fn take_control(&self) -> Result<(), ::Error> { - self.inner.lock().unwrap().take_control() + let mut has_taken_control = self.has_taken_control.lock().unwrap(); + if !*has_taken_control { + self.inner.lock().unwrap().take_control()?; + *has_taken_control = true; + } + Ok(()) } fn add_source(&self, id: ClockId, source_config: SourceConfig) -> Self::NtpSourceController { @@ -276,14 +281,6 @@ impl TimeSyncController for TimeSyncControllerWra wrapper } - fn remove_source(&self, id: ClockId) { - self.inner.lock().unwrap().remove_source(id); - } - - fn source_update(&self, id: ClockId, usable: bool) { - self.inner.lock().unwrap().source_update(id, usable); - } - fn synchronization_state(&self) -> (TimeSnapshot, Vec) { ( *self.snapshot.lock().unwrap(), @@ -297,23 +294,33 @@ impl TimeSyncController for TimeSyncControllerWra loop { tokio::select! { Some((clock_id, message)) = messages_for_system.recv() => { - let update = self.inner.lock().unwrap().source_message(clock_id, message); - if let Some(source_message) = update.source_message { - for source in self.oneway_sources.lock().unwrap().iter().filter_map(Weak::upgrade) { - source.lock().unwrap().handle_message(source_message.clone()); - } - for source in self.twoway_sources.lock().unwrap().iter().filter_map(Weak::upgrade) { - source.lock().unwrap().handle_message(source_message.clone()); - } - } - if let Some(time_snapshot) = update.time_snapshot { - *self.snapshot.lock().unwrap() = time_snapshot; - } - if let Some(used_sources) = update.used_sources { - *self.used_sources.lock().unwrap() = used_sources; - } - if let Some(next_update) = update.next_update { - sleeper.as_mut().reset(tokio::time::Instant::now() + next_update); + match message { + WrapperMessage::SourceMessage(message) => { + let update = self.inner.lock().unwrap().source_message(clock_id, message); + if let Some(source_message) = update.source_message { + for source in self.oneway_sources.lock().unwrap().iter().filter_map(Weak::upgrade) { + source.lock().unwrap().handle_message(source_message.clone()); + } + for source in self.twoway_sources.lock().unwrap().iter().filter_map(Weak::upgrade) { + source.lock().unwrap().handle_message(source_message.clone()); + } + } + if let Some(time_snapshot) = update.time_snapshot { + *self.snapshot.lock().unwrap() = time_snapshot; + } + if let Some(used_sources) = update.used_sources { + *self.used_sources.lock().unwrap() = used_sources; + } + if let Some(next_update) = update.next_update { + sleeper.as_mut().reset(tokio::time::Instant::now() + next_update); + } + }, + WrapperMessage::UsabilityChange(usable) => { + self.inner.lock().unwrap().source_update(clock_id, usable); + }, + WrapperMessage::Dropped => { + self.inner.lock().unwrap().remove_source(clock_id); + }, } }, _ = sleeper.as_mut() => { @@ -344,15 +351,32 @@ impl TimeSyncController for TimeSyncControllerWra pub trait SourceController: Sized + Send + 'static { fn handle_measurement(&mut self, measurement: Measurement); + fn set_usable(&mut self, usable: bool); + fn desired_poll_interval(&self) -> PollInterval; fn observe(&self) -> ObservableSourceTimedata; } +enum WrapperMessage { + SourceMessage(SourceMessage), + UsabilityChange(bool), + Dropped, +} + pub struct OneWaySourceControllerWrapper> { id: ClockId, inner: Arc>, - messages_for_system: tokio::sync::mpsc::UnboundedSender<(ClockId, T::SourceMessage)>, + messages_for_system: + tokio::sync::mpsc::UnboundedSender<(ClockId, WrapperMessage)>, +} + +impl> Drop for OneWaySourceControllerWrapper { + fn drop(&mut self) { + self.messages_for_system + .send((self.id, WrapperMessage::Dropped)) + .ok(); + } } impl> SourceController @@ -368,17 +392,24 @@ impl> SourceController // Remote (which is the send timestamp) - local (which is the receive timestamp) offset: measurement.sender_ts - measurement.receiver_ts, localtime: measurement.receiver_ts, - stratum: measurement.stratum, root_delay: measurement.root_delay, root_dispersion: measurement.root_dispersion, leap: measurement.leap, precision: measurement.precision, }) { - self.messages_for_system.send((self.id, message)).ok(); + self.messages_for_system + .send((self.id, WrapperMessage::SourceMessage(message))) + .ok(); } } + fn set_usable(&mut self, usable: bool) { + self.messages_for_system + .send((self.id, WrapperMessage::UsabilityChange(usable))) + .ok(); + } + fn desired_poll_interval(&self) -> PollInterval { self.inner.lock().unwrap().desired_poll_interval() } @@ -394,7 +425,18 @@ pub struct TwoWaySourceControllerWrapper< id: ClockId, inner: Arc>, last_outgoing_measurement: Option, - messages_for_system: tokio::sync::mpsc::UnboundedSender<(ClockId, T::SourceMessage)>, + messages_for_system: + tokio::sync::mpsc::UnboundedSender<(ClockId, WrapperMessage)>, +} + +impl> Drop + for TwoWaySourceControllerWrapper +{ + fn drop(&mut self) { + self.messages_for_system + .send((self.id, WrapperMessage::Dropped)) + .ok(); + } } impl> SourceController @@ -420,18 +462,25 @@ impl> SourceControll + (measurement.sender_ts - measurement.receiver_ts)) / 2, localtime: measurement.receiver_ts, - stratum: measurement.stratum, root_delay: measurement.root_delay, root_dispersion: measurement.root_dispersion, leap: measurement.leap, precision: measurement.precision, }) { - self.messages_for_system.send((self.id, message)).ok(); + self.messages_for_system + .send((self.id, WrapperMessage::SourceMessage(message))) + .ok(); } } } + fn set_usable(&mut self, usable: bool) { + self.messages_for_system + .send((self.id, WrapperMessage::UsabilityChange(usable))) + .ok(); + } + fn desired_poll_interval(&self) -> PollInterval { self.inner.lock().unwrap().desired_poll_interval() } @@ -526,7 +575,6 @@ mod tests { receiver_id: ClockId(1), sender_ts: NtpTimestamp::from_fixed_int(0), receiver_ts: NtpTimestamp::from_fixed_int(1), - stratum: 0, root_delay: NtpDuration::from_fixed_int(0), root_dispersion: NtpDuration::from_fixed_int(0), leap: NtpLeapIndicator::NoWarning, @@ -537,7 +585,6 @@ mod tests { receiver_id: ClockId::SYSTEM, sender_ts: NtpTimestamp::from_fixed_int(2), receiver_ts: NtpTimestamp::from_fixed_int(3), - stratum: 0, root_delay: NtpDuration::from_fixed_int(0), root_dispersion: NtpDuration::from_fixed_int(0), leap: NtpLeapIndicator::NoWarning, diff --git a/ntp-proto/src/lib.rs b/ntp-proto/src/lib.rs index 24d64d17e..aa20529c3 100644 --- a/ntp-proto/src/lib.rs +++ b/ntp-proto/src/lib.rs @@ -278,10 +278,12 @@ mod exports { pub use super::source::source_snapshot; pub use super::source::{ AcceptSynchronizationError, NtpSource, NtpSourceAction, NtpSourceActionIterator, - NtpSourceSnapshot, NtpSourceUpdate, ObservableSourceState, OneWaySource, - OneWaySourceSnapshot, OneWaySourceUpdate, ProtocolVersion, Reach, SourceNtsData, + NtpSourceSnapshot, ObservableSourceState, OneWaySource, ProtocolVersion, Reach, + SourceNtsData, + }; + pub use super::system::{ + NtpManager, NtpServerInfo, NtpSnapshot, SourceType, SystemSnapshot, TimeSnapshot, }; - pub use super::system::{System, SystemSnapshot, TimeSnapshot}; #[cfg(feature = "__internal-fuzz")] pub use super::time_types::fuzz_duration_from_seconds; diff --git a/ntp-proto/src/packet/mod.rs b/ntp-proto/src/packet/mod.rs index 6d9f8d446..64d7959e5 100644 --- a/ntp-proto/src/packet/mod.rs +++ b/ntp-proto/src/packet/mod.rs @@ -9,7 +9,7 @@ use crate::{ identifiers::ReferenceId, io::NonBlockingWrite, keyset::{DecodedServerCookie, KeySet}, - system::SystemSnapshot, + system::NtpServerInfo, time_types::{NtpDuration, NtpTimestamp, PollInterval}, }; @@ -234,24 +234,24 @@ impl NtpHeaderV3V4 { } fn timestamp_response( - system: &SystemSnapshot, + server_info: &NtpServerInfo, input: Self, recv_timestamp: NtpTimestamp, clock: &C, ) -> Self { Self { mode: NtpAssociationMode::Server, - stratum: system.stratum, + stratum: server_info.ntp_snapshot.stratum, origin_timestamp: input.transmit_timestamp, receive_timestamp: recv_timestamp, - reference_id: system.reference_id, + reference_id: server_info.ntp_snapshot.reference_id, poll: input.poll, - precision: system.time_snapshot.precision.log2(), - root_delay: system.time_snapshot.root_delay, - root_dispersion: system.time_snapshot.root_dispersion(recv_timestamp), + precision: server_info.time_snapshot.precision.log2(), + root_delay: server_info.time_snapshot.root_delay, + root_dispersion: server_info.time_snapshot.root_dispersion(recv_timestamp), // Timestamp must be last to make it as accurate as possible. transmit_timestamp: clock.now().expect("Failed to read time"), - leap: system.time_snapshot.leap_indicator, + leap: server_info.time_snapshot.leap_indicator, reference_timestamp: recv_timestamp.truncated_second_bits(7), } } @@ -621,7 +621,7 @@ impl<'a> NtpPacket<'a> { } pub fn timestamp_response( - system: &SystemSnapshot, + server_info: NtpServerInfo, input: Self, recv_timestamp: NtpTimestamp, clock: &C, @@ -629,7 +629,7 @@ impl<'a> NtpPacket<'a> { match &input.header { NtpHeader::V3(header) => NtpPacket { header: NtpHeader::V3(NtpHeaderV3V4::timestamp_response( - system, + &server_info, *header, recv_timestamp, clock, @@ -639,7 +639,7 @@ impl<'a> NtpPacket<'a> { }, NtpHeader::V4(header) => { let mut response_header = - NtpHeaderV3V4::timestamp_response(system, *header, recv_timestamp, clock); + NtpHeaderV3V4::timestamp_response(&server_info, *header, recv_timestamp, clock); // Respond with the upgrade timestamp (NTP5NTP5) iff the input had it and the packet // had the correct draft identification @@ -667,7 +667,7 @@ impl<'a> NtpPacket<'a> { NtpHeader::V5(header) => NtpPacket { // TODO deduplicate extension handling with V4 header: NtpHeader::V5(v5::NtpHeaderV5::timestamp_response( - system, + &server_info, *header, recv_timestamp, clock, @@ -684,7 +684,8 @@ impl<'a> NtpPacket<'a> { .filter_map(|ef| match ef { uid @ ExtensionField::UniqueIdentifier(_) => Some(uid), ExtensionField::ReferenceIdRequest(req) => { - let response = req.to_response(&system.bloom_filter)?; + let response = + req.to_response(&server_info.ntp_snapshot.bloom_filter)?; Some(ExtensionField::ReferenceIdResponse(response).into_owned()) } _ => None, @@ -712,7 +713,7 @@ impl<'a> NtpPacket<'a> { #[allow(clippy::too_many_lines)] pub fn nts_timestamp_response( - system: &SystemSnapshot, + server_info: NtpServerInfo, input: Self, recv_timestamp: NtpTimestamp, clock: &C, @@ -723,7 +724,7 @@ impl<'a> NtpPacket<'a> { NtpHeader::V3(_) => unreachable!("NTS shouldn't work with NTPv3"), NtpHeader::V4(header) => NtpPacket { header: NtpHeader::V4(NtpHeaderV3V4::timestamp_response( - system, + &server_info, header, recv_timestamp, clock, @@ -768,7 +769,7 @@ impl<'a> NtpPacket<'a> { }, NtpHeader::V5(header) => NtpPacket { header: NtpHeader::V5(v5::NtpHeaderV5::timestamp_response( - system, + &server_info, header, recv_timestamp, clock, @@ -807,7 +808,8 @@ impl<'a> NtpPacket<'a> { .filter_map(|ef| match ef { uid @ ExtensionField::UniqueIdentifier(_) => Some(uid), ExtensionField::ReferenceIdRequest(req) => { - let response = req.to_response(&system.bloom_filter)?; + let response = + req.to_response(&server_info.ntp_snapshot.bloom_filter)?; Some(ExtensionField::ReferenceIdResponse(response).into_owned()) } _ => None, @@ -1681,7 +1683,7 @@ mod tests { let (packet, id) = NtpPacket::nts_poll_message(&cookie, 0, PollIntervalLimits::default().min); let mut response = NtpPacket::timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), packet, NtpTimestamp::from_fixed_int(0), &TestClock { @@ -1736,7 +1738,7 @@ mod tests { let (packet, _) = NtpPacket::poll_message_upgrade_request(PollInterval::default()); let response = NtpPacket::timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), packet, NtpTimestamp::from_fixed_int(0), &TestClock { @@ -1779,7 +1781,7 @@ mod tests { }) .unwrap(); let response = NtpPacket::timestamp_response( - &SystemSnapshot { + NtpServerInfo { time_snapshot: TimeSnapshot { leap_indicator: NtpLeapIndicator::Leap59, ..Default::default() @@ -1834,7 +1836,7 @@ mod tests { }) .unwrap(); let response = NtpPacket::timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), packet, NtpTimestamp::from_fixed_int(0), &TestClock { @@ -1878,7 +1880,7 @@ mod tests { }) .unwrap(); let response = NtpPacket::nts_timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), packet, NtpTimestamp::from_fixed_int(0), &TestClock { @@ -1916,7 +1918,7 @@ mod tests { &mut packet.efdata.untrusted, ); let response = NtpPacket::nts_timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), packet, NtpTimestamp::from_fixed_int(0), &TestClock { @@ -1962,7 +1964,7 @@ mod tests { let (packet, _) = NtpPacket::nts_poll_message(&cookie, 1, PollIntervalLimits::default().min); let response = NtpPacket::nts_timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), packet, NtpTimestamp::from_fixed_int(0), &TestClock { @@ -1976,7 +1978,7 @@ mod tests { let (packet, _) = NtpPacket::nts_poll_message(&cookie, 2, PollIntervalLimits::default().min); let response = NtpPacket::nts_timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), packet, NtpTimestamp::from_fixed_int(0), &TestClock { @@ -1990,7 +1992,7 @@ mod tests { let (packet, _) = NtpPacket::nts_poll_message(&cookie, 3, PollIntervalLimits::default().min); let response = NtpPacket::nts_timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), packet, NtpTimestamp::from_fixed_int(0), &TestClock { @@ -2004,7 +2006,7 @@ mod tests { let (packet, _) = NtpPacket::nts_poll_message(&cookie, 4, PollIntervalLimits::default().min); let response = NtpPacket::nts_timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), packet, NtpTimestamp::from_fixed_int(0), &TestClock { diff --git a/ntp-proto/src/packet/v5/mod.rs b/ntp-proto/src/packet/v5/mod.rs index 70ac25db4..739051a9c 100644 --- a/ntp-proto/src/packet/v5/mod.rs +++ b/ntp-proto/src/packet/v5/mod.rs @@ -1,7 +1,7 @@ #![warn(clippy::missing_const_for_fn)] use crate::{ - NtpClock, NtpDuration, NtpLeapIndicator, NtpTimestamp, PollInterval, SystemSnapshot, - io::NonBlockingWrite, + NtpClock, NtpDuration, NtpLeapIndicator, NtpTimestamp, PollInterval, io::NonBlockingWrite, + system::NtpServerInfo, }; use rand::random; @@ -188,26 +188,26 @@ impl NtpHeaderV5 { } pub(crate) fn timestamp_response( - system: &SystemSnapshot, + server_info: &NtpServerInfo, input: Self, recv_timestamp: NtpTimestamp, clock: &C, ) -> Self { Self { - leap: system.time_snapshot.leap_indicator, + leap: server_info.time_snapshot.leap_indicator, mode: NtpMode::Response, - stratum: system.stratum, + stratum: server_info.ntp_snapshot.stratum, poll: input.poll, - precision: system.time_snapshot.precision.log2(), + precision: server_info.time_snapshot.precision.log2(), timescale: NtpTimescale::Utc, era: NtpEra(0), flags: NtpFlags { - synchronized: system.stratum < 16, + synchronized: server_info.ntp_snapshot.stratum < 16, interleaved_mode: false, authnak: false, }, - root_delay: system.time_snapshot.root_delay, - root_dispersion: system.time_snapshot.root_dispersion(recv_timestamp), + root_delay: server_info.time_snapshot.root_delay, + root_dispersion: server_info.time_snapshot.root_dispersion(recv_timestamp), server_cookie: NtpServerCookie::new_random(), client_cookie: input.client_cookie, receive_timestamp: recv_timestamp, diff --git a/ntp-proto/src/server.rs b/ntp-proto/src/server.rs index c882cb438..e3557bdf8 100644 --- a/ntp-proto/src/server.rs +++ b/ntp-proto/src/server.rs @@ -3,7 +3,7 @@ use std::{ fmt::Display, io::Cursor, net::{AddrParseError, IpAddr}, - sync::Arc, + sync::{Arc, RwLock}, time::{Duration, Instant}, }; @@ -11,7 +11,7 @@ use serde::{Deserialize, Deserializer, de}; use crate::{ Cipher, KeySet, NtpClock, NtpPacket, NtpTimestamp, NtpVersion, PacketParsingError, - SystemSnapshot, ipfilter::IpFilter, + ipfilter::IpFilter, system::NtpServerInfo, }; pub enum ServerAction<'a> { @@ -88,7 +88,7 @@ pub struct Server { denyfilter: IpFilter, allowfilter: IpFilter, client_cache: TimestampedCache, - system: SystemSnapshot, + server_info: Arc>, keyset: Arc, } @@ -99,10 +99,10 @@ fn fallback_message_version(message: &[u8]) -> u8 { impl Server { /// Create a new server - pub fn new( + pub fn new_internal( config: ServerConfig, clock: C, - system: SystemSnapshot, + server_info: Arc>, keyset: Arc, ) -> Self { let denyfilter = IpFilter::new(&config.denylist.filter); @@ -114,30 +114,11 @@ impl Server { denyfilter, allowfilter, client_cache, - system, + server_info, keyset, } } - /// Update the [`ServerConfig`] of the server - pub fn update_config(&mut self, config: ServerConfig) { - if self.config.denylist.filter != config.denylist.filter { - self.denyfilter = IpFilter::new(&config.denylist.filter); - } - if self.config.allowlist.filter != config.allowlist.filter { - self.allowfilter = IpFilter::new(&config.allowlist.filter); - } - if self.config.rate_limiting_cache_size != config.rate_limiting_cache_size { - self.client_cache = TimestampedCache::new(config.rate_limiting_cache_size); - } - self.config = config; - } - - /// Provide the server with the latest [`SystemSnapshot`] - pub fn update_system(&mut self, system: SystemSnapshot) { - self.system = system; - } - /// Provide the server with a new [`KeySet`] pub fn update_keyset(&mut self, keyset: Arc) { self.keyset = keyset; @@ -303,6 +284,8 @@ impl Server { reason = ServerReason::Policy; } + let server_info = *self.server_info.read().unwrap(); + let (packet, cipher, desired_size) = match action { ServerResponse::NTSNak => (NtpPacket::nts_nak_response(packet), None, None), ServerResponse::Deny => { @@ -316,7 +299,7 @@ impl Server { if let Some(cookie) = cookie { ( NtpPacket::nts_timestamp_response( - &self.system, + server_info, packet, recv_timestamp, &self.clock, @@ -329,7 +312,7 @@ impl Server { } else { ( NtpPacket::timestamp_response( - &self.system, + server_info, packet, recv_timestamp, &self.clock, @@ -601,12 +584,8 @@ mod tests { }; let mut stats = TestStatHandler::default(); - let mut server = Server::new( - config, - clock, - SystemSnapshot::default(), - KeySetProvider::new(1).get(), - ); + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min); let serialized = serialize_packet_unencrypted(&packet); @@ -667,7 +646,11 @@ mod tests { require_nts: None, accepted_versions: vec![NtpVersion::V4], }; - server.update_config(config); + let clock = TestClock { + cur: NtpTimestamp::from_fixed_int(200), + }; + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let mut buf = [0; 48]; let response = server.handle( @@ -711,12 +694,8 @@ mod tests { }; let mut stats = TestStatHandler::default(); - let mut server = Server::new( - config, - clock, - SystemSnapshot::default(), - KeySetProvider::new(1).get(), - ); + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min); let serialized = serialize_packet_unencrypted(&packet); @@ -783,7 +762,11 @@ mod tests { require_nts: None, accepted_versions: vec![NtpVersion::V4], }; - server.update_config(config); + let clock = TestClock { + cur: NtpTimestamp::from_fixed_int(200), + }; + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let mut buf = [0; 48]; let response = server.handle( @@ -821,12 +804,8 @@ mod tests { }; let mut stats = TestStatHandler::default(); - let mut server = Server::new( - config, - clock, - SystemSnapshot::default(), - KeySetProvider::new(1).get(), - ); + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min); let serialized = serialize_packet_unencrypted(&packet); @@ -918,7 +897,11 @@ mod tests { accepted_versions: vec![NtpVersion::V4], }; - server.update_config(config); + let clock = TestClock { + cur: NtpTimestamp::from_fixed_int(200), + }; + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let mut buf = [0; 48]; let response = server.handle( @@ -998,12 +981,8 @@ mod tests { }; let mut stats = TestStatHandler::default(); - let mut server = Server::new( - config, - clock, - SystemSnapshot::default(), - KeySetProvider::new(1).get(), - ); + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let (packet, _) = NtpPacket::poll_message(PollIntervalLimits::default().min); let mut serialized = serialize_packet_unencrypted(&packet); @@ -1053,12 +1032,8 @@ mod tests { }; let mut stats = TestStatHandler::default(); - let mut server = Server::new( - config, - clock, - SystemSnapshot::default(), - KeySetProvider::new(1).get(), - ); + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let (packet, _) = NtpPacket::poll_message(PollIntervalLimits::default().min); let mut serialized = serialize_packet_unencrypted(&packet); @@ -1112,7 +1087,11 @@ mod tests { require_nts: None, accepted_versions: vec![NtpVersion::V4], }; - server.update_config(config); + let clock = TestClock { + cur: NtpTimestamp::from_fixed_int(200), + }; + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let mut buf = [0; 48]; let response = server.handle( @@ -1142,7 +1121,11 @@ mod tests { require_nts: None, accepted_versions: vec![NtpVersion::V4], }; - server.update_config(config); + let clock = TestClock { + cur: NtpTimestamp::from_fixed_int(200), + }; + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let mut buf = [0; 48]; let response = server.handle( @@ -1172,7 +1155,11 @@ mod tests { require_nts: None, accepted_versions: vec![NtpVersion::V4], }; - server.update_config(config); + let clock = TestClock { + cur: NtpTimestamp::from_fixed_int(200), + }; + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let mut buf = [0; 48]; let response = server.handle( @@ -1202,7 +1189,11 @@ mod tests { require_nts: None, accepted_versions: vec![NtpVersion::V4], }; - server.update_config(config); + let clock = TestClock { + cur: NtpTimestamp::from_fixed_int(200), + }; + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let mut buf = [0; 48]; let response = server.handle( @@ -1241,7 +1232,7 @@ mod tests { let mut stats = TestStatHandler::default(); let keyset = KeySetProvider::new(1).get(); - let mut server = Server::new(config, clock, SystemSnapshot::default(), keyset.clone()); + let mut server = Server::new_internal(config, clock, Arc::default(), keyset.clone()); let decodedcookie = DecodedServerCookie { algorithm: AeadAlgorithm::AeadAesSivCmac256, @@ -1331,10 +1322,10 @@ mod tests { }; let mut stats = TestStatHandler::default(); - let mut server = Server::new( + let mut server = Server::new_internal( config.clone(), clock, - SystemSnapshot::default(), + Arc::default(), KeySetProvider::new(1).get(), ); @@ -1385,7 +1376,11 @@ mod tests { assert!(packet.is_kiss_ntsn()); config.require_nts = Some(FilterAction::Deny); - server.update_config(config.clone()); + let clock = TestClock { + cur: NtpTimestamp::from_fixed_int(200), + }; + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let (packet, id) = NtpPacket::poll_message(PollIntervalLimits::default().min); let serialized = serialize_packet_unencrypted(&packet); @@ -1430,12 +1425,8 @@ mod tests { }; let mut stats = TestStatHandler::default(); - let mut server = Server::new( - config, - clock, - SystemSnapshot::default(), - KeySetProvider::new(1).get(), - ); + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let (packet, id) = NtpPacket::poll_message_v5(PollIntervalLimits::default().min); let serialized = serialize_packet_unencrypted(&packet); @@ -1510,12 +1501,8 @@ mod tests { }; let mut stats = TestStatHandler::default(); - let mut server = Server::new( - config, - clock, - SystemSnapshot::default(), - KeySetProvider::new(1).get(), - ); + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let (packet, _) = NtpPacket::poll_message_v5(PollIntervalLimits::default().min); let serialized = serialize_packet_unencrypted(&packet); @@ -1535,7 +1522,7 @@ mod tests { ); assert!(matches!(response, ServerAction::Ignore)); - server.update_config(ServerConfig { + let config = ServerConfig { denylist: FilterList { filter: vec![], action: FilterAction::Deny, @@ -1548,7 +1535,13 @@ mod tests { rate_limiting_cache_size: 0, require_nts: None, accepted_versions: vec![NtpVersion::V5], - }); + }; + + let clock = TestClock { + cur: NtpTimestamp::from_fixed_int(200), + }; + let mut server = + Server::new_internal(config, clock, Arc::default(), KeySetProvider::new(1).get()); let (packet, _) = NtpPacket::poll_message(PollIntervalLimits::default().min); let serialized = serialize_packet_unencrypted(&packet); diff --git a/ntp-proto/src/source.rs b/ntp-proto/src/source.rs index ae2273186..6c9b79e46 100644 --- a/ntp-proto/src/source.rs +++ b/ntp-proto/src/source.rs @@ -5,6 +5,8 @@ use crate::{ ExtensionField, NtpHeader, v5::server_reference_id::{BloomFilter, RemoteBloomFilter}, }, + system::NtpSourceInfo, + v5::ServerId, }; use crate::{ algorithm::{ObservableSourceTimedata, SourceController}, @@ -12,15 +14,16 @@ use crate::{ cookiestash::CookieStash, identifiers::ReferenceId, packet::{Cipher, NtpAssociationMode, NtpPacket, RequestIdentifier}, - system::SystemSnapshot, time_types::{NtpTimestamp, PollInterval}, }; use rand::{Rng, thread_rng}; use serde::{Deserialize, Serialize}; use std::{ + collections::HashMap, fmt::Debug, io::Cursor, net::{IpAddr, SocketAddr}, + sync::{Arc, Mutex, RwLock}, time::Duration, }; use tracing::{debug, trace, warn}; @@ -97,6 +100,10 @@ pub struct NtpSource { bloom_filter: RemoteBloomFilter, id: ClockId, + + source_info: Arc>, + + source_snapshots: Arc>>, } pub struct OneWaySource { @@ -104,7 +111,8 @@ pub struct OneWaySource { } impl OneWaySource { - pub(crate) fn new(controller: Controller) -> OneWaySource { + pub fn new(mut controller: Controller) -> OneWaySource { + controller.set_usable(true); OneWaySource { controller } } @@ -176,22 +184,11 @@ impl Reach { } } -#[derive(Debug, Clone)] -pub struct OneWaySourceUpdate { - pub snapshot: OneWaySourceSnapshot, -} - #[derive(Debug, Clone, Copy)] #[expect(clippy::large_enum_variant)] pub enum SourceSnapshot { Ntp(NtpSourceSnapshot), - OneWay(OneWaySourceSnapshot), -} - -#[derive(Debug, Clone, Copy)] -pub struct OneWaySourceSnapshot { - pub source_id: ReferenceId, - pub stratum: u8, + External { stratum: u8, source_id: ReferenceId }, } #[derive(Debug, Clone, Copy)] @@ -216,7 +213,7 @@ impl NtpSourceSnapshot { &self, local_stratum: u8, local_ips: &[IpAddr], - system: &SystemSnapshot, + server_id: ServerId, ) -> Result<(), AcceptSynchronizationError> { use AcceptSynchronizationError::*; @@ -244,7 +241,7 @@ impl NtpSourceSnapshot { } match self.bloom_filter { - Some(filter) if filter.contains_id(&system.server_id) => { + Some(filter) if filter.contains_id(&server_id) => { debug!("Source rejected because of detected synchronization loop (bloom filter)"); return Err(Loop); } @@ -333,25 +330,10 @@ impl ProtocolVersion { } } -#[derive(Clone, Debug)] -pub struct NtpSourceUpdate { - pub(crate) snapshot: NtpSourceSnapshot, -} - -#[cfg(feature = "__internal-test")] -impl NtpSourceUpdate { - pub fn snapshot(snapshot: NtpSourceSnapshot) -> Self { - NtpSourceUpdate { snapshot } - } -} - #[derive(Debug, Clone)] -#[expect(clippy::large_enum_variant)] pub enum NtpSourceAction { /// Send a message over the network. When this is issued, the network port maybe changed. Send(Vec), - /// Send an update to [`System`](crate::system::System) - UpdateSystem(NtpSourceUpdate), /// Call [`NtpSource::handle_timer`] after given duration SetTimer(Duration), /// A complete reset of the connection is necessary, including a potential new NTSKE client session and/or DNS lookup. @@ -402,6 +384,10 @@ pub struct ObservableSourceState { } impl NtpSource { + #[expect( + clippy::too_many_arguments, + reason = "FIXME: See if we can combine some of these once the design is a bit more final" + )] pub(crate) fn new( source_addr: SocketAddr, source_config: SourceConfig, @@ -409,6 +395,8 @@ impl NtpSource { controller: Controller, nts: Option>, id: ClockId, + source_info: Arc>, + source_snapshots: Arc>>, ) -> (Self, NtpSourceActionIterator) { ( Self { @@ -438,6 +426,10 @@ impl NtpSource { bloom_filter: RemoteBloomFilter::new(16).expect("16 is a valid chunk size"), id, + + source_info, + + source_snapshots, }, actions!(NtpSourceAction::SetTimer(Duration::from_secs(0))), ) @@ -548,9 +540,24 @@ impl NtpSource { let used = cursor.position(); let result = &cursor.into_inner()[..used as usize]; + let usable = { + let source_info = self.source_info.read().unwrap(); + snapshot + .accept_synchronization( + source_info.local_stratum, + &source_info.ip_list, + source_info.server_id, + ) + .is_ok() + }; + self.source_snapshots + .lock() + .unwrap() + .insert(self.id, snapshot); + self.controller.set_usable(usable); + actions!( NtpSourceAction::Send(result.into()), - NtpSourceAction::UpdateSystem(NtpSourceUpdate { snapshot }), // randomize the poll interval a little to make it harder to predict poll requests NtpSourceAction::SetTimer( poll_interval @@ -729,6 +736,23 @@ impl NtpSource { } } + let snapshot = NtpSourceSnapshot::from_source(self); + let usable = { + let source_info = self.source_info.read().unwrap(); + snapshot + .accept_synchronization( + source_info.local_stratum, + &source_info.ip_list, + source_info.server_id, + ) + .is_ok() + }; + self.source_snapshots + .lock() + .unwrap() + .insert(self.id, snapshot); + self.controller.set_usable(usable); + let (measurement_outgoing, measurement_incoming) = measurements_from_packet(&message, self.id, send_time, recv_time); self.controller.handle_measurement(measurement_outgoing); @@ -741,9 +765,7 @@ impl NtpSource { } } - actions!(NtpSourceAction::UpdateSystem(NtpSourceUpdate { - snapshot: NtpSourceSnapshot::from_source(self), - })) + actions!() } #[cfg(test)] @@ -778,6 +800,10 @@ impl NtpSource { bloom_filter: RemoteBloomFilter::new(16).unwrap(), id: ClockId(1), + + source_info: Arc::default(), + + source_snapshots: Arc::default(), } } } @@ -794,7 +820,6 @@ fn measurements_from_packet( receiver_id: id, sender_ts: send_time, receiver_ts: message.receive_timestamp(), - stratum: message.stratum(), root_delay: message.root_delay(), root_dispersion: message.root_dispersion(), leap: message.leap(), @@ -805,7 +830,6 @@ fn measurements_from_packet( receiver_id: ClockId::SYSTEM, sender_ts: message.transmit_timestamp(), receiver_ts: recv_time, - stratum: message.stratum(), root_delay: message.root_delay(), root_dispersion: message.root_dispersion(), leap: message.leap(), @@ -821,8 +845,9 @@ fn measurements_from_packet( )] mod test { use crate::{ - NtpClock, NtpDuration, NtpLeapIndicator, + NtpClock, NtpDuration, NtpLeapIndicator, NtpSnapshot, packet::{AesSivCmac256, NoCipher}, + system::NtpServerInfo, time_types::PollIntervalLimits, }; @@ -880,6 +905,10 @@ mod test { // do nothing } + fn set_usable(&mut self, _: bool) { + // do nothing + } + fn desired_poll_interval(&self) -> PollInterval { PollInterval::default() } @@ -925,12 +954,14 @@ mod test { let mut source = NtpSource::test_ntp_source(NoopController); - let system = SystemSnapshot::default(); - macro_rules! accept { () => {{ let snapshot = NtpSourceSnapshot::from_source(&source); - snapshot.accept_synchronization(16, &["127.0.0.1".parse().unwrap()], &system) + snapshot.accept_synchronization( + 16, + &["127.0.0.1".parse().unwrap()], + ServerId::default(), + ) }}; } @@ -956,6 +987,10 @@ mod test { // no action } + fn set_usable(&mut self, _: bool) { + // do nothing + } + fn desired_poll_interval(&self) -> PollInterval { self.0 } @@ -1358,7 +1393,7 @@ mod test { assert!(poll.is_upgrade()); let response = NtpPacket::timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), poll, NtpTimestamp::default(), &clock, @@ -1426,7 +1461,7 @@ mod test { assert!(poll.is_upgrade()); let response = NtpPacket::timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), poll, NtpTimestamp::default(), &clock, @@ -1466,7 +1501,7 @@ mod test { assert_eq!(poll.version(), NtpVersion::V5); let response = NtpPacket::timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), poll, NtpTimestamp::default(), &clock, @@ -1517,7 +1552,7 @@ mod test { assert!(poll.is_upgrade()); let response = NtpPacket::timestamp_response( - &SystemSnapshot::default(), + NtpServerInfo::default(), poll, NtpTimestamp::default(), &clock, @@ -1585,8 +1620,11 @@ mod test { let clock = TestClock::default(); - let server_system = SystemSnapshot { - bloom_filter: server_filter, + let server_info = NtpServerInfo { + ntp_snapshot: NtpSnapshot { + bloom_filter: server_filter, + ..Default::default() + }, ..Default::default() }; @@ -1608,7 +1646,7 @@ mod test { let (req, _) = NtpPacket::deserialize(&req, &NoCipher).unwrap(); let response = - NtpPacket::timestamp_response(&server_system, req, NtpTimestamp::default(), &clock); + NtpPacket::timestamp_response(server_info, req, NtpTimestamp::default(), &clock); let resp_bytes = response.serialize_without_encryption_vec(None).unwrap(); let actions = client.handle_incoming( diff --git a/ntp-proto/src/system.rs b/ntp-proto/src/system.rs index ac743623d..070344ad5 100644 --- a/ntp-proto/src/system.rs +++ b/ntp-proto/src/system.rs @@ -2,14 +2,14 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt::Debug; use std::net::{IpAddr, SocketAddr}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use crate::packet::v5::server_reference_id::{BloomFilter, ServerId}; -use crate::source::{NtpSourceUpdate, SourceSnapshot}; -use crate::{ClockId, NtpTimestamp, OneWaySource, OneWaySourceUpdate}; +use crate::source::SourceSnapshot; +use crate::{ + ClockId, KeySet, NtpSourceSnapshot, NtpTimestamp, Server, ServerConfig, SourceController, +}; use crate::{ - algorithm::TimeSyncController, - clock::NtpClock, config::{SourceConfig, SynchronizationConfig}, identifiers::ReferenceId, packet::NtpLeapIndicator, @@ -37,6 +37,8 @@ pub struct TimeSnapshot { pub leap_indicator: NtpLeapIndicator, /// Total amount that the clock has stepped pub accumulated_steps: NtpDuration, + /// Crossing this amount of stepping will cause a Panic + pub accumulated_steps_threshold: Option, } impl TimeSnapshot { @@ -65,269 +67,210 @@ impl Default for TimeSnapshot { root_variance_cubic: 0.0, leap_indicator: NtpLeapIndicator::Unknown, accumulated_steps: NtpDuration::ZERO, + accumulated_steps_threshold: None, } } } -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] pub struct SystemSnapshot { + /// Timekeeping data + #[serde(flatten)] + pub time_snapshot: TimeSnapshot, + /// NTP specific data + #[serde(flatten)] + pub ntp_snapshot: NtpSnapshot, +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct NtpSnapshot { /// Log of the precision of the local clock pub stratum: u8, /// Reference ID of current primary time source pub reference_id: ReferenceId, - /// Crossing this amount of stepping will cause a Panic - pub accumulated_steps_threshold: Option, - /// Timekeeping data - #[serde(flatten)] - pub time_snapshot: TimeSnapshot, /// Bloom filter that contains all currently used time sources #[serde(skip)] pub bloom_filter: BloomFilter, - /// NTPv5 reference ID for this instance - #[serde(skip)] - pub server_id: ServerId, } -impl SystemSnapshot { - pub fn update_timedata(&mut self, timedata: TimeSnapshot, config: &SynchronizationConfig) { - self.time_snapshot = timedata; - self.accumulated_steps_threshold = config.accumulated_step_panic_threshold; - } +impl NtpSnapshot { + pub fn from_used_sources( + local_stratum: u8, + server_id: ServerId, + used_sources: impl Iterator, + ) -> Self { + let mut stratum = local_stratum; + let mut reference_id = ReferenceId::NONE; - pub fn update_used_sources(&mut self, used_sources: impl Iterator) { let mut used_sources = used_sources.peekable(); if let Some(system_source_snapshot) = used_sources.peek() { - let (stratum, source_id) = match system_source_snapshot { + let (source_stratum, source_id) = match system_source_snapshot { SourceSnapshot::Ntp(snapshot) => (snapshot.stratum, snapshot.source_id), - SourceSnapshot::OneWay(snapshot) => (snapshot.stratum, snapshot.source_id), + SourceSnapshot::External { stratum, source_id } => (*stratum, *source_id), }; - self.stratum = stratum.saturating_add(1); - self.reference_id = source_id; + stratum = source_stratum.saturating_add(1); + reference_id = source_id; } - self.bloom_filter = BloomFilter::new(); + let mut bloom_filter = BloomFilter::new(); for source in used_sources { if let SourceSnapshot::Ntp(source) = source { if let Some(bf) = &source.bloom_filter { - self.bloom_filter.add(bf); + bloom_filter.add(bf); } else if let ProtocolVersion::V5 = source.protocol_version { tracing::warn!("Using NTPv5 source without a bloom filter!"); } } } - self.bloom_filter.add_id(&self.server_id); + bloom_filter.add_id(&server_id); + + Self { + stratum, + reference_id, + bloom_filter, + } } } -impl Default for SystemSnapshot { +impl Default for NtpSnapshot { fn default() -> Self { Self { stratum: 16, reference_id: ReferenceId::NONE, - accumulated_steps_threshold: None, - time_snapshot: TimeSnapshot::default(), bloom_filter: BloomFilter::new(), - server_id: ServerId::default(), } } } -pub struct System { - synchronization_config: SynchronizationConfig, - system: Mutex, - ip_list: Mutex>, +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum SourceType { + Pps, + Sock, + Ntp, +} - sources: Mutex>>, +#[derive(Default, Copy, Clone)] +pub struct NtpServerInfo { + pub time_snapshot: TimeSnapshot, + pub ntp_snapshot: NtpSnapshot, +} - controller: Controller, - controller_took_control: Mutex, +#[derive(Debug, Default, Clone)] +pub(crate) struct NtpSourceInfo { + pub(crate) ip_list: Arc<[IpAddr]>, + pub(crate) server_id: ServerId, + pub(crate) local_stratum: u8, } -impl System { - pub fn new( - clock: Controller::Clock, - synchronization_config: SynchronizationConfig, - algorithm_config: Controller::AlgorithmConfig, - ip_list: Arc<[IpAddr]>, - ) -> Result::Error> { - // Setup system snapshot - let mut system = SystemSnapshot { - stratum: synchronization_config.local_stratum, - ..Default::default() - }; +pub struct NtpManager { + synchronization_config: SynchronizationConfig, + server_id: ServerId, + source_snapshots: Arc>>, + server_info: Arc>, + source_info: Arc>, +} + +impl NtpManager { + pub fn new(synchronization_config: SynchronizationConfig, ip_list: Arc<[IpAddr]>) -> Self { + let server_id = ServerId::default(); + let source_info = NtpSourceInfo { + ip_list, + server_id, + local_stratum: synchronization_config.local_stratum, + }; + let mut server_info = NtpServerInfo { + time_snapshot: TimeSnapshot::default(), + ntp_snapshot: NtpSnapshot::default(), + }; if synchronization_config.local_stratum == 1 { // We are a stratum 1 server so mark our selves synchronized. - system.time_snapshot.leap_indicator = NtpLeapIndicator::NoWarning; + server_info.time_snapshot.leap_indicator = NtpLeapIndicator::NoWarning; // Set the reference id for the system - system.reference_id = synchronization_config.reference_id.to_reference_id(); + server_info.ntp_snapshot.reference_id = + synchronization_config.reference_id.to_reference_id(); } - - Ok(System { + Self { synchronization_config, - system: Mutex::new(system), - ip_list: Mutex::new(ip_list), - sources: Mutex::new(HashMap::new()), - controller: Controller::new(clock, synchronization_config, algorithm_config)?, - controller_took_control: Mutex::new(false), - }) - } + server_id, + source_snapshots: Arc::new(Mutex::new(HashMap::new())), - pub fn system_snapshot(&self) -> SystemSnapshot { - *self.system.lock().unwrap() - } - - pub fn check_clock_access(&self) -> Result<(), ::Error> { - self.ensure_controller_control() - } - - fn ensure_controller_control(&self) -> Result<(), ::Error> { - // FIXME: the take control pattern needs to go. Until that time this is not ideal but will do. - let mut controller_took_control = self.controller_took_control.lock().unwrap(); - if !*controller_took_control { - self.controller.take_control()?; - *controller_took_control = true; + server_info: Arc::new(RwLock::new(server_info)), + source_info: Arc::new(RwLock::new(source_info)), } - Ok(()) - } - - pub fn create_sock_source( - &self, - id: ClockId, - source_config: SourceConfig, - measurement_noise_estimate: f64, - ) -> Result< - OneWaySource, - ::Error, - > { - self.ensure_controller_control()?; - let controller = - self.controller - .add_one_way_source(id, source_config, measurement_noise_estimate, None); - self.sources.lock().unwrap().insert(id, None); - Ok(OneWaySource::new(controller)) } - pub fn create_pps_source( - &self, - id: ClockId, - source_config: SourceConfig, - measurement_noise_estimate: f64, - period: f64, - ) -> Result< - OneWaySource, - ::Error, - > { - self.ensure_controller_control()?; - let controller = self.controller.add_one_way_source( - id, - source_config, - measurement_noise_estimate, - Some(period), - ); - self.sources.lock().unwrap().insert(id, None); - Ok(OneWaySource::new(controller)) + pub fn new_server(&self, config: ServerConfig, clock: C, keyset: Arc) -> Server { + Server::new_internal(config, clock, self.server_info.clone(), keyset) } - #[expect(clippy::type_complexity)] - pub fn create_ntp_source( + pub fn new_source( &self, - id: ClockId, - source_config: SourceConfig, source_addr: SocketAddr, + source_config: SourceConfig, protocol_version: ProtocolVersion, + controller: Controller, nts: Option>, - ) -> Result< - ( - NtpSource, - NtpSourceActionIterator, - ), - ::Error, - > { - self.ensure_controller_control()?; - let controller = self.controller.add_source(id, source_config); - self.sources.lock().unwrap().insert(id, None); - Ok(NtpSource::new( + id: ClockId, + ) -> (NtpSource, NtpSourceActionIterator) { + NtpSource::new( source_addr, source_config, protocol_version, controller, nts, id, - )) + self.source_info.clone(), + self.source_snapshots.clone(), + ) } - pub fn handle_source_remove( - &self, - id: ClockId, - ) -> Result<(), ::Error> { - self.controller.remove_source(id); - self.sources.lock().unwrap().remove(&id); - Ok(()) + pub fn update_ip_list(&self, ip_list: Arc<[IpAddr]>) { + self.source_info.write().unwrap().ip_list = ip_list; } - pub fn handle_source_update( + pub fn update_used_sources( &self, - id: ClockId, - update: NtpSourceUpdate, - ) -> Result<(), ::Error> { - let system = self.system_snapshot(); - let ip_list = self.ip_list.lock().unwrap().clone(); - let usable = update - .snapshot - .accept_synchronization( + sources: impl Iterator, + ) -> NtpSnapshot { + let source_snapshots = self.source_snapshots.lock().unwrap(); + let sources: Option> = sources + .map(|(id, sourcetype)| match sourcetype { + SourceType::Pps => Some(SourceSnapshot::External { + stratum: 0, + source_id: ReferenceId::PPS, + }), + SourceType::Sock => Some(SourceSnapshot::External { + stratum: 0, + source_id: ReferenceId::SOCK, + }), + SourceType::Ntp => source_snapshots.get(&id).copied().map(SourceSnapshot::Ntp), + }) + .collect(); + drop(source_snapshots); + + if let Some(sources) = sources { + let snapshot = NtpSnapshot::from_used_sources( self.synchronization_config.local_stratum, - ip_list.as_ref(), - &system, - ) - .is_ok(); - self.controller.source_update(id, usable); - *self.sources.lock().unwrap().get_mut(&id).unwrap() = - Some(SourceSnapshot::Ntp(update.snapshot)); - Ok(()) - } + self.server_id, + sources.into_iter(), + ); - pub fn handle_one_way_source_update( - &self, - id: ClockId, - update: OneWaySourceUpdate, - ) -> Result<(), ::Error> { - self.controller.source_update(id, true); - *self.sources.lock().unwrap().get_mut(&id).unwrap() = - Some(SourceSnapshot::OneWay(update.snapshot)); - Ok(()) - } + self.server_info.write().unwrap().ntp_snapshot = snapshot; - pub fn update_ip_list(&self, ip_list: Arc<[IpAddr]>) { - *self.ip_list.lock().unwrap() = ip_list; + snapshot + } else { + self.server_info.read().unwrap().ntp_snapshot + } } - pub fn run(self: Arc) -> impl Future + 'static { - let this = self.clone(); - let update_pusher = async move { - loop { - // Scope here is needed to keep this future sync and send. - { - let (time_snaphsot, used_sources) = this.controller.synchronization_state(); - let sources = this.sources.lock().unwrap(); - let mut system = this.system.lock().unwrap(); - system.update_used_sources(used_sources.iter().map(|v| { - sources.get(v).and_then(|snapshot| *snapshot).expect( - "Critical error: Source used for synchronization that is not known to system", - ) - })); - system.update_timedata(time_snaphsot, &this.synchronization_config); - } - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - }; - - let controller_run = async move { self.controller.run().await }; + pub fn observe(&self) -> NtpSnapshot { + self.server_info.read().unwrap().ntp_snapshot + } - async move { - tokio::join!(update_pusher, controller_run); - } + pub fn update_time_snapshot(&self, time_snapshot: TimeSnapshot) { + self.server_info.write().unwrap().time_snapshot = time_snapshot; } } @@ -341,20 +284,18 @@ mod tests { #[test] fn test_empty_source_update() { - let mut system = SystemSnapshot::default(); - // Should do nothing - system.update_used_sources(std::iter::empty()); + let ntps = NtpSnapshot::from_used_sources(16, ServerId::default(), std::iter::empty()); - assert_eq!(system.stratum, 16); - assert_eq!(system.reference_id, ReferenceId::NONE); + assert_eq!(ntps.stratum, 16); + assert_eq!(ntps.reference_id, ReferenceId::NONE); } #[test] fn test_source_update() { - let mut system = SystemSnapshot::default(); - - system.update_used_sources( + let ntps = NtpSnapshot::from_used_sources( + 16, + ServerId::default(), vec![ SourceSnapshot::Ntp(NtpSourceSnapshot { source_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), @@ -380,34 +321,7 @@ mod tests { .into_iter(), ); - assert_eq!(system.stratum, 3); - assert_eq!(system.reference_id, ReferenceId::KISS_DENY); - } - - #[test] - fn test_timedata_update() { - let mut system = SystemSnapshot::default(); - - let new_root_delay = NtpDuration::from_seconds(1.0); - let new_accumulated_threshold = NtpDuration::from_seconds(2.0); - - let snapshot = TimeSnapshot { - root_delay: new_root_delay, - ..Default::default() - }; - system.update_timedata( - snapshot, - &SynchronizationConfig { - accumulated_step_panic_threshold: Some(new_accumulated_threshold), - ..Default::default() - }, - ); - - assert_eq!(system.time_snapshot, snapshot); - - assert_eq!( - system.accumulated_steps_threshold, - Some(new_accumulated_threshold), - ); + assert_eq!(ntps.stratum, 3); + assert_eq!(ntps.reference_id, ReferenceId::KISS_DENY); } } diff --git a/ntpd/src/ctl.rs b/ntpd/src/ctl.rs index 26905e330..34614df01 100644 --- a/ntpd/src/ctl.rs +++ b/ntpd/src/ctl.rs @@ -250,7 +250,7 @@ async fn print_state(print: Format, observe_socket: PathBuf) -> Result { - self.channels - .msg_for_system_sender - .send(MsgForSystem::SourceUpdate(self.index, update)) - .await - .ok(); - } ntp_proto::NtpSourceAction::SetTimer(timeout) => { if let Some(deadline) = Instant::now().checked_add(timeout) { // If it overflows, it is so far in the future we may as well not set the timer. @@ -320,9 +308,6 @@ where ntp_proto::NtpSourceAction::Send(_) => { unreachable!("Should not be sending messages from startup") } - ntp_proto::NtpSourceAction::UpdateSystem(_) => { - unreachable!("Should not be updating system from startup") - } ntp_proto::NtpSourceAction::SetTimer(timeout) => { poll_wait.as_mut().reset(Instant::now() + timeout); } @@ -423,9 +408,10 @@ mod tests { }; use ntp_proto::{ - AlgorithmConfig, KalmanClockController, NoCipher, NtpDuration, NtpLeapIndicator, NtpPacket, - ProtocolVersion, SourceConfig, SynchronizationConfig, SystemSnapshot, TimeSnapshot, - TimeSyncControllerWrapper, TwoWayKalmanSourceController, TwoWaySourceControllerWrapper, + AlgorithmConfig, KalmanClockController, NoCipher, NtpDuration, NtpLeapIndicator, + NtpManager, NtpPacket, NtpServerInfo, ProtocolVersion, SourceConfig, SynchronizationConfig, + TimeSnapshot, TimeSyncController, TimeSyncControllerWrapper, TwoWayKalmanSourceController, + TwoWaySourceControllerWrapper, }; use timestamped_socket::socket::{GeneralTimestampMode, Open, open_ip}; use tokio::sync::mpsc; @@ -566,24 +552,22 @@ mod tests { let (msg_for_system_sender, msg_for_system_receiver) = mpsc::channel(1); let index = ClockId::new(); - let system: ntp_proto::System>> = - ntp_proto::System::new( - TestClock {}, - SynchronizationConfig::default(), - AlgorithmConfig::default(), - Arc::new([]), - ) - .unwrap(); + let controller = TimeSyncControllerWrapper::>::new( + TestClock {}, + SynchronizationConfig::default(), + AlgorithmConfig::default(), + ) + .unwrap(); + let ntp_manager = NtpManager::new(SynchronizationConfig::default(), Arc::new([])); - let Ok((source, _)) = system.create_ntp_source( - index, - SourceConfig::default(), + let (source, _) = ntp_manager.new_source( SocketAddr::from((Ipv4Addr::LOCALHOST, port_base)), + SourceConfig::default(), ProtocolVersion::V4, + controller.add_source(index, SourceConfig::default()), None, - ) else { - panic!("Could not create test source"); - }; + index, + ); let process = SourceTask { _wait: PhantomData, @@ -641,7 +625,7 @@ mod tests { // Note: Ports must be unique among tests to deal with parallelism let (mut process, mut socket, mut msg_recv) = test_startup().await; - let system = SystemSnapshot { + let server_info = NtpServerInfo { time_snapshot: TimeSnapshot { leap_indicator: NtpLeapIndicator::NoWarning, ..Default::default() @@ -670,7 +654,7 @@ mod tests { let rec_packet = NtpPacket::deserialize(&buf, &NoCipher).unwrap().0; let send_packet = NtpPacket::timestamp_response( - &system, + server_info, rec_packet, convert_net_timestamp(timestamp), &clock, @@ -679,8 +663,7 @@ mod tests { let serialized = serialize_packet_unencrypted(&send_packet); socket.send_to(&serialized, remote_addr).await.unwrap(); - let msg = msg_recv.recv().await.unwrap(); - assert!(matches!(msg, MsgForSystem::SourceUpdate(_, _))); + assert!(msg_recv.try_recv().is_err()); handle.abort(); } diff --git a/ntpd/src/daemon/observer.rs b/ntpd/src/daemon/observer.rs index 702d93c6f..6669d41cf 100644 --- a/ntpd/src/daemon/observer.rs +++ b/ntpd/src/daemon/observer.rs @@ -200,10 +200,10 @@ async fn handle_connection( mod tests { use std::{borrow::BorrowMut, time::Duration}; - use ntp_proto::v5::{BloomFilter, ServerId}; + use ntp_proto::v5::BloomFilter; use ntp_proto::{ - NtpDuration, NtpLeapIndicator, NtpTimestamp, ObservableSourceTimedata, PollIntervalLimits, - Reach, ReferenceId, TimeSnapshot, + NtpDuration, NtpLeapIndicator, NtpSnapshot, NtpTimestamp, ObservableSourceTimedata, + PollIntervalLimits, Reach, ReferenceId, TimeSnapshot, }; use tokio::{io::AsyncReadExt, net::UnixStream}; @@ -281,9 +281,11 @@ mod tests { let (_, servers_reader) = tokio::sync::watch::channel(vec![]); let (_, system_reader) = tokio::sync::watch::channel(SystemSnapshot { - stratum: 1, - reference_id: ReferenceId::NONE, - accumulated_steps_threshold: None, + ntp_snapshot: NtpSnapshot { + stratum: 1, + reference_id: ReferenceId::NONE, + bloom_filter: BloomFilter::new(), + }, time_snapshot: TimeSnapshot { precision: NtpDuration::from_seconds(1e-3), root_delay: NtpDuration::ZERO, @@ -294,9 +296,8 @@ mod tests { root_variance_cubic: 0.0, leap_indicator: NtpLeapIndicator::Leap59, accumulated_steps: NtpDuration::ZERO, + accumulated_steps_threshold: None, }, - bloom_filter: BloomFilter::new(), - server_id: ServerId::default(), }); let handle = tokio::spawn(async move { @@ -357,9 +358,11 @@ mod tests { let (mut server_writer, servers_reader) = tokio::sync::watch::channel(vec![]); let (mut system_writer, system_reader) = tokio::sync::watch::channel(SystemSnapshot { - stratum: 1, - reference_id: ReferenceId::NONE, - accumulated_steps_threshold: None, + ntp_snapshot: NtpSnapshot { + stratum: 1, + reference_id: ReferenceId::NONE, + bloom_filter: BloomFilter::new(), + }, time_snapshot: TimeSnapshot { precision: NtpDuration::from_seconds(1e-3), root_delay: NtpDuration::ZERO, @@ -370,9 +373,8 @@ mod tests { root_variance_cubic: 0.0, leap_indicator: NtpLeapIndicator::Leap59, accumulated_steps: NtpDuration::ZERO, + accumulated_steps_threshold: None, }, - bloom_filter: BloomFilter::new(), - server_id: ServerId::default(), }); let handle = tokio::spawn(async move { diff --git a/ntpd/src/daemon/pps_source.rs b/ntpd/src/daemon/pps_source.rs index 9799e7ab9..0ddf3b855 100644 --- a/ntpd/src/daemon/pps_source.rs +++ b/ntpd/src/daemon/pps_source.rs @@ -1,14 +1,13 @@ use std::path::PathBuf; use ntp_proto::{ - ClockId, Measurement, NtpDuration, NtpLeapIndicator, OneWaySource, OneWaySourceSnapshot, - OneWaySourceUpdate, ReferenceId, SourceController, + ClockId, Measurement, NtpDuration, NtpLeapIndicator, OneWaySource, SourceController, }; use pps_time::PpsDevice; use tokio::sync::mpsc; use tracing::{Instrument, Span, debug, error, instrument, warn}; -use crate::daemon::{ntp_source::MsgForSystem, util::convert_unix_timestamp}; +use crate::daemon::util::convert_unix_timestamp; use super::ntp_source::SourceChannels; @@ -63,7 +62,6 @@ impl PpsSourceTask { data.info.assert_tu.nsec as _, ), - stratum: 0, root_delay: NtpDuration::ZERO, root_dispersion: NtpDuration::ZERO, leap: NtpLeapIndicator::NoWarning, @@ -72,19 +70,6 @@ impl PpsSourceTask { self.source.handle_measurement(measurement); - let update = OneWaySourceUpdate { - snapshot: OneWaySourceSnapshot { - source_id: ReferenceId::PPS, - stratum: 0, - }, - }; - - self.channels - .msg_for_system_sender - .send(MsgForSystem::OneWaySourceUpdate(self.index, update)) - .await - .ok(); - self.channels .source_snapshots .write() diff --git a/ntpd/src/daemon/server.rs b/ntpd/src/daemon/server.rs index daab87cd0..2311cbfe4 100644 --- a/ntpd/src/daemon/server.rs +++ b/ntpd/src/daemon/server.rs @@ -6,9 +6,7 @@ use std::{ time::Duration, }; -use ntp_proto::{ - KeySet, NtpClock, Server, ServerReason, ServerResponse, ServerStatHandler, SystemSnapshot, -}; +use ntp_proto::{KeySet, NtpClock, Server, ServerReason, ServerResponse, ServerStatHandler}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use timestamped_socket::socket::{RecvResult, open_ip}; use tokio::task::JoinHandle; @@ -103,7 +101,6 @@ impl<'de> Deserialize<'de> for Counter { pub struct ServerTask { config: ServerConfig, network_wait_period: std::time::Duration, - system_receiver: tokio::sync::watch::Receiver, keyset: tokio::sync::watch::Receiver>, server: Server, stats: ServerStats, @@ -112,26 +109,17 @@ pub struct ServerTask { impl ServerTask { #[instrument(level = tracing::Level::ERROR, name = "Ntp Server", skip_all, fields(address = debug(config.listen)))] pub fn spawn( + server: Server, config: ServerConfig, stats: ServerStats, - mut system_receiver: tokio::sync::watch::Receiver, - mut keyset: tokio::sync::watch::Receiver>, - clock: C, + keyset: tokio::sync::watch::Receiver>, network_wait_period: Duration, ) -> JoinHandle<()> { tokio::spawn( (async move { - let server = Server::new( - config.clone().into(), - clock, - *system_receiver.borrow_and_update(), - keyset.borrow_and_update().clone(), - ); - let mut process = ServerTask { config, network_wait_period, - system_receiver, keyset, server, stats, @@ -166,8 +154,6 @@ impl ServerTask { }; // system and keyset may now be wildly out of date, ensure they are always updated. - self.server - .update_system(*self.system_receiver.borrow_and_update()); self.server .update_keyset(self.keyset.borrow_and_update().clone()); @@ -216,9 +202,6 @@ impl ServerTask { } } }, - _ = self.system_receiver.changed(), if self.system_receiver.has_changed().is_ok() => { - self.server.update_system(*self.system_receiver.borrow_and_update()); - } _ = self.keyset.changed(), if self.keyset.has_changed().is_ok() => { self.server.update_keyset(self.keyset.borrow_and_update().clone()); } @@ -300,15 +283,22 @@ mod tests { let clock = TestClock { time: NtpTimestamp::from_seconds_nanos_since_ntp_era(0, 1000), }; - let (_, system_snapshots) = tokio::sync::watch::channel(SystemSnapshot::default()); + + let server_info = Arc::default(); let (_, keyset) = tokio::sync::watch::channel(KeySetProvider::new(1).get()); + let server = Server::new_internal( + config.clone().into(), + clock, + server_info, + keyset.borrow().clone(), + ); + let join = ServerTask::spawn( + server, config, ServerStats::default(), - system_snapshots, keyset, - clock, Duration::from_secs(0), ); diff --git a/ntpd/src/daemon/sock_source.rs b/ntpd/src/daemon/sock_source.rs index e7ac91fb9..3fc9fcc6f 100644 --- a/ntpd/src/daemon/sock_source.rs +++ b/ntpd/src/daemon/sock_source.rs @@ -2,15 +2,14 @@ use std::path::PathBuf; use std::{fmt::Display, path::Path}; use ntp_proto::{ - ClockId, Measurement, NtpClock, NtpDuration, NtpLeapIndicator, OneWaySource, - OneWaySourceSnapshot, OneWaySourceUpdate, ReferenceId, SourceController, + ClockId, Measurement, NtpClock, NtpDuration, NtpLeapIndicator, OneWaySource, SourceController, }; use tracing::debug; use tracing::{Instrument, Span, error, instrument}; use tokio::net::UnixDatagram; -use crate::daemon::{exitcode, ntp_source::MsgForSystem}; +use crate::daemon::exitcode; use super::ntp_source::SourceChannels; @@ -142,7 +141,6 @@ where sender_ts: time - NtpDuration::from_seconds(sample.offset), receiver_ts: time, - stratum: 0, root_delay: NtpDuration::ZERO, root_dispersion: NtpDuration::ZERO, leap, @@ -151,18 +149,6 @@ where self.source.handle_measurement(measurement); - let update = OneWaySourceUpdate { - snapshot: OneWaySourceSnapshot { - source_id: ReferenceId::SOCK, - stratum: 0, - }, - }; - self.channels - .msg_for_system_sender - .send(MsgForSystem::OneWaySourceUpdate(self.index, update)) - .await - .ok(); - self.channels .source_snapshots .write() @@ -222,13 +208,14 @@ mod tests { use ntp_proto::{ AlgorithmConfig, ClockId, KalmanClockController, NtpClock, NtpDuration, NtpLeapIndicator, - NtpTimestamp, ReferenceId, SourceConfig, SynchronizationConfig, TimeSyncControllerWrapper, + NtpTimestamp, OneWaySource, SourceConfig, SynchronizationConfig, TimeSyncController, + TimeSyncControllerWrapper, }; use tokio::sync::mpsc; use crate::{ daemon::{ - ntp_source::{MsgForSystem, SourceChannels}, + ntp_source::SourceChannels, sock_source::{SOCK_MAGIC, SampleError, SockSourceTask, create_socket}, util::EPOCH_OFFSET, }, @@ -287,18 +274,16 @@ mod tests { #[tokio::test] async fn test_read_sock() { - let (msg_for_system_sender, mut msg_for_system_receiver) = mpsc::channel(1); + let (msg_for_system_sender, _) = mpsc::channel(1); let index = ClockId::new(); let clock = TestClock {}; - let system: ntp_proto::System>> = - ntp_proto::System::new( - clock.clone(), - SynchronizationConfig::default(), - AlgorithmConfig::default(), - Arc::new([]), - ) - .unwrap(); + let controller = TimeSyncControllerWrapper::>::new( + clock.clone(), + SynchronizationConfig::default(), + AlgorithmConfig::default(), + ) + .unwrap(); let socket_path = std::env::temp_dir().join(format!("ntp-test-stream-{}", alloc_port())); let _socket = create_socket(&socket_path).unwrap(); // should be overwritten by SockSource's own socket @@ -311,9 +296,12 @@ mod tests { msg_for_system_sender, source_snapshots: Arc::new(RwLock::new(HashMap::new())), }, - system - .create_sock_source(index, SourceConfig::default(), 0.001) - .unwrap(), + OneWaySource::new(controller.add_one_way_source( + index, + SourceConfig::default(), + 0.001, + None, + )), ); // Send example data to socket @@ -325,19 +313,6 @@ mod tests { ]; sock.send(&buf).unwrap(); - // Receive system update - let msg = msg_for_system_receiver.recv().await.unwrap(); - let update = match msg { - MsgForSystem::OneWaySourceUpdate(source_id, sock_source_update) => { - assert_eq!(source_id, index); - sock_source_update - } - _ => panic!("wrong message type"), - }; - - assert_eq!(update.snapshot.source_id, ReferenceId::SOCK); - assert_eq!(update.snapshot.stratum, 0); - handle.abort(); } diff --git a/ntpd/src/daemon/system.rs b/ntpd/src/daemon/system.rs index c2a32a9b6..cd3954e24 100644 --- a/ntpd/src/daemon/system.rs +++ b/ntpd/src/daemon/system.rs @@ -23,12 +23,12 @@ use super::spawn::pps::PpsSpawner; use std::{ collections::HashMap, net::IpAddr, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, }; use ntp_proto::{ - ClockId, KeySet, NtpClock, ObservableSourceState, SourceConfig, SynchronizationConfig, System, - SystemSnapshot, TimeSyncController, + ClockId, KeySet, NtpClock, NtpManager, ObservableSourceState, OneWaySource, SourceConfig, + SourceType, SynchronizationConfig, SystemSnapshot, TimeSyncController, }; use timestamped_socket::interface::InterfaceName; use tokio::{sync::mpsc, task::JoinHandle}; @@ -128,7 +128,8 @@ struct SystemSpawnerData { } struct SystemTask> { - system: Arc>, + controller: Arc, + ntp_manager: Arc, system_snapshot_sender: tokio::sync::watch::Sender, source_snapshots: Arc>>, @@ -141,7 +142,7 @@ struct SystemTask> { spawn_tx: mpsc::Sender, spawn_rx: mpsc::Receiver, - sources: HashMap, + sources: Arc>>, servers: Vec, spawners: Vec, @@ -167,24 +168,27 @@ impl> SystemTask>, have_sources: bool, ) -> (Self, DaemonChannels) { - let Ok(system) = System::new( - clock.clone(), - synchronization_config, - algorithm_config, - ip_list.borrow().clone(), - ) else { - tracing::error!("Could not start system"); + let Ok(controller) = + Controller::new(clock.clone(), synchronization_config, algorithm_config) + else { + tracing::error!("Could not create clock controller"); std::process::exit(70); }; + let ntp_manager = NtpManager::new(synchronization_config, ip_list.borrow().clone()); - if have_sources && let Err(e) = system.check_clock_access() { + if have_sources && let Err(e) = controller.take_control() { tracing::error!("Could not control clock: {}", e); std::process::exit(70); } + let system_snapshot = SystemSnapshot { + time_snapshot: controller.synchronization_state().0, + ntp_snapshot: ntp_manager.observe(), + }; + // Create communication channels let (system_snapshot_sender, system_snapshot_receiver) = - tokio::sync::watch::channel(system.system_snapshot()); + tokio::sync::watch::channel(system_snapshot); let source_snapshots = Arc::new(RwLock::new(HashMap::new())); let (server_data_sender, server_data_receiver) = tokio::sync::watch::channel(vec![]); let (msg_for_system_sender, msg_for_system_receiver) = @@ -194,7 +198,8 @@ impl> SystemTask> SystemTask> SystemTask std::io::Result<()> { - let proto_system = self.system.clone(); - let proto_system_run = proto_system.run(); + let controller = self.controller.clone(); + let controller_run = controller.run(); let sender = self.system_snapshot_sender.clone(); - let proto_system = self.system.clone(); + let controller = self.controller.clone(); + let ntp_manager = self.ntp_manager.clone(); + let sources = self.sources.clone(); let timer_loop = async move { loop { - sender.send(proto_system.system_snapshot()).ok(); + // Scope is needed to keep the future send. + { + let (time_snapshot, used_sources) = controller.synchronization_state(); + let sources = sources.lock().unwrap(); + ntp_manager.update_time_snapshot(time_snapshot); + + if let Some(used_sources) = used_sources + .into_iter() + .map(|id| sources.get(&id).map(|state| (id, state.stype))) + .collect::>>() + { + let ntp_snapshot = + ntp_manager.update_used_sources(used_sources.into_iter()); + sender + .send(SystemSnapshot { + time_snapshot, + ntp_snapshot, + }) + .ok(); + } else { + sender.send_modify(|v| v.time_snapshot = time_snapshot); + } + } + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; } }; + let ntp_manager = self.ntp_manager.clone(); let event_loop = async move { loop { tokio::select! { @@ -276,7 +307,7 @@ impl> SystemTask { - self.system.update_ip_list(self.ip_list.borrow_and_update().clone()); + ntp_manager.update_ip_list(self.ip_list.borrow_and_update().clone()); } } } @@ -285,7 +316,7 @@ impl> SystemTask std::io::Result<()> { @@ -297,16 +328,6 @@ impl> SystemTask { - self.system - .handle_source_update(index, update) - .expect("Could not process source measurement"); - } - MsgForSystem::OneWaySourceUpdate(index, update) => { - self.system - .handle_one_way_source_update(index, update) - .expect("Could not process source measurement"); - } MsgForSystem::NetworkIssue(index) => { self.handle_source_network_issue(index).await?; } @@ -319,12 +340,8 @@ impl> SystemTask std::io::Result<()> { - self.system - .handle_source_remove(index) - .map_err(std::io::Error::other)?; - // Restart the source reusing its configuration. - let state = self.sources.remove(&index).unwrap(); + let state = self.sources.lock().unwrap().remove(&index).unwrap(); let spawner_id = state.spawner_id; let source_id = state.source_id; let opt_spawner = self.spawners.iter().find(|s| s.id == spawner_id); @@ -343,12 +360,8 @@ impl> SystemTask std::io::Result<()> { - self.system - .handle_source_remove(index) - .map_err(std::io::Error::other)?; - // Restart the source reusing its configuration. - let state = self.sources.remove(&index).unwrap(); + let state = self.sources.lock().unwrap().remove(&index).unwrap(); let spawner_id = state.spawner_id; let source_id = state.source_id; let opt_spawner = self.spawners.iter().find(|s| s.id == spawner_id); @@ -367,10 +380,8 @@ impl> SystemTask Result<(), C::Error> { - self.system.handle_source_remove(index)?; - // Restart the source reusing its configuration. - let state = self.sources.remove(&index).unwrap(); + let state = self.sources.lock().unwrap().remove(&index).unwrap(); let spawner_id = state.spawner_id; let source_id = state.source_id; let opt_spawner = self.spawners.iter().find(|s| s.id == spawner_id); @@ -394,23 +405,30 @@ impl> SystemTask Result { let source_id = params.get_id(); info!(source_id=?source_id, addr=?params.get_addr(), spawner=?spawner_id, "new source"); - self.sources.insert( + self.sources.lock().unwrap().insert( source_id, SourceState { source_id, spawner_id, + stype: match ¶ms { + SourceCreateParameters::Ntp(_) => SourceType::Ntp, + SourceCreateParameters::Sock(_) => SourceType::Sock, + SourceCreateParameters::Pps(_) => SourceType::Pps, + }, }, ); match params { SourceCreateParameters::Ntp(ref mut params) => { - let (source, initial_actions) = self.system.create_ntp_source( - source_id, - params.config, + let source_controller = self.controller.add_source(source_id, params.config); + let (source, initial_actions) = self.ntp_manager.new_source( params.addr, + params.config, params.protocol_version, + source_controller, params.nts.take(), - )?; + source_id, + ); SourceTask::spawn( source_id, @@ -428,11 +446,13 @@ impl> SystemTask { - let source = self.system.create_sock_source( + let source_controller = self.controller.add_one_way_source( source_id, params.config, params.noise_estimate, - )?; + None, + ); + let source = OneWaySource::new(source_controller); SockSourceTask::spawn( source_id, params.path.clone(), @@ -446,12 +466,13 @@ impl> SystemTask { - let source = self.system.create_pps_source( + let source_controller = self.controller.add_one_way_source( source_id, params.config, params.noise_estimate, - params.period, - )?; + Some(params.period), + ); + let source = OneWaySource::new(source_controller); PpsSourceTask::spawn( source_id, params.path.clone(), @@ -492,12 +513,16 @@ impl> SystemTask> SystemTask st Measurement::simple( state .system + .time_snapshot .accumulated_steps_threshold .map_or(-1.0, NtpDuration::to_seconds), ), @@ -239,7 +240,7 @@ pub fn format_state(w: &mut impl std::fmt::Write, state: &ObservableState) -> st "Stratum of our clock", MetricType::Gauge, None, - Measurement::simple(state.system.stratum), + Measurement::simple(state.system.ntp_snapshot.stratum), )?; format_metric(