Skip to content
13 changes: 4 additions & 9 deletions crates/cli/src/commands/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,13 @@ pub struct TestConfigArgs {
}

/// Lists available test case names for a given test category.
/// TODO: Fill with enums TestCases of each category
fn list_test_cases(category: TestCategory) -> Vec<String> {
// Returns available test case names for each category.
match category {
TestCategory::Validator => {
// From validator::supported_validator_test_cases()
vec![
"Ping".to_string(),
"PingMeasure".to_string(),
"PingLoad".to_string(),
]
}
TestCategory::Validator => validator::ValidatorTestCase::all()
.iter()
.map(|tc| tc.name().to_string())
.collect(),
TestCategory::Beacon => {
// TODO: Extract from beacon::supported_beacon_test_cases()
vec![]
Expand Down
285 changes: 276 additions & 9 deletions crates/cli/src/commands/test/validator.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,56 @@
//! Validator client connectivity tests.

use super::{TestCategoryResult, TestConfigArgs};
use crate::error::Result;
use clap::Args;
use std::{io::Write, time::Duration};

use clap::Args;
use rand::Rng;
use tokio::{
net::TcpStream,
sync::mpsc,
time::{Instant, timeout},
};

use super::{
AllCategoriesResult, TestCategory, TestCategoryResult, TestConfigArgs, TestResult, TestVerdict,
calculate_score, evaluate_highest_rtt, evaluate_rtt, publish_result_to_obol_api,
write_result_to_file, write_result_to_writer,
};
use crate::{duration::Duration as CliDuration, error::Result};

// Thresholds (from Go implementation)
const THRESHOLD_MEASURE_AVG: Duration = Duration::from_millis(50);
const THRESHOLD_MEASURE_POOR: Duration = Duration::from_millis(240);
const THRESHOLD_LOAD_AVG: Duration = Duration::from_millis(50);
const THRESHOLD_LOAD_POOR: Duration = Duration::from_millis(240);

/// Validator test cases.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ValidatorTestCase {
Ping,
PingMeasure,
PingLoad,
}

impl ValidatorTestCase {
/// Returns all validator test cases.
pub fn all() -> &'static [ValidatorTestCase] {
&[
ValidatorTestCase::Ping,
ValidatorTestCase::PingMeasure,
ValidatorTestCase::PingLoad,
]
}

/// Returns the test name as a string.
pub fn name(&self) -> &'static str {
match self {
ValidatorTestCase::Ping => "Ping",
ValidatorTestCase::PingMeasure => "PingMeasure",
ValidatorTestCase::PingLoad => "PingLoad",
}
}
Comment on lines +45 to +51
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to use impl ToString for ValidatorTestCase. It reduces the noise of .name().to_string() while leveraging standard traits (ToString)

}

/// Arguments for the validator test command.
#[derive(Args, Clone, Debug)]
pub struct TestValidatorArgs {
Expand All @@ -30,10 +76,231 @@ pub struct TestValidatorArgs {
}

/// Runs the validator client tests.
pub async fn run(_args: TestValidatorArgs, _writer: &mut dyn Write) -> Result<TestCategoryResult> {
// TODO: Implement validator tests
// - Ping
// - PingMeasure
// - PingLoad
unimplemented!("validator test not yet implemented")
pub async fn run(args: TestValidatorArgs, writer: &mut dyn Write) -> Result<TestCategoryResult> {
tracing::info!("Starting validator client test");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to initialize tracing like pluto_tracing::init(...).expect("Failed to initialize tracing");, otherwise this is never logged.

We should eventually refactor this into a single top level tracing initialization if possible though.


let start_time = Instant::now();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The start time should be immediately before run_tests_with_timeout, and elapsed should be computed immediately after.


// Get and filter test cases
let queued_tests: Vec<ValidatorTestCase> = if let Some(ref filter) = args.test_config.test_cases
{
ValidatorTestCase::all()
.iter()
.filter(|tc| filter.contains(&tc.name().to_string()))
.copied()
.collect()
} else {
ValidatorTestCase::all().to_vec()
};

if queued_tests.is_empty() {
return Err(crate::error::CliError::Other(
"test case not supported".into(),
));
}

// Run tests with timeout
let test_results = run_tests_with_timeout(&args, &queued_tests).await;

let score = calculate_score(&test_results);

let mut res = TestCategoryResult::new(TestCategory::Validator);
res.targets.insert(args.api_address.clone(), test_results);
res.execution_time = Some(CliDuration::new(start_time.elapsed()));
res.score = Some(score);

if !args.test_config.quiet {
write_result_to_writer(&res, writer)?;
}

if !args.test_config.output_json.is_empty() {
write_result_to_file(&res, args.test_config.output_json.as_ref()).await?;
}

if args.test_config.publish {
let all = AllCategoriesResult {
validator: Some(res.clone()),
..Default::default()
};
publish_result_to_obol_api(
all,
&args.test_config.publish_addr,
&args.test_config.publish_private_key_file,
)
.await?;
}

Ok(res)
}

/// Timeout error message
const ERR_TIMEOUT_INTERRUPTED: &str = "timeout";
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeout/interrupted


/// Runs tests with timeout, keeping completed tests on timeout.
async fn run_tests_with_timeout(
args: &TestValidatorArgs,
tests: &[ValidatorTestCase],
) -> Vec<TestResult> {
let (tx, mut rx) = mpsc::channel::<TestResult>(100);
let mut test_iter = tests.iter().peekable();

let timeout_result = tokio::time::timeout(args.test_config.timeout, async {
for &test_case in test_iter.by_ref() {
let result = run_single_test(args, test_case).await;
let _ = tx.send(result).await;
}
})
.await;

// Collect all completed results
drop(tx);
let mut results = Vec::new();
while let Ok(result) = rx.try_recv() {
results.push(result);
}

if timeout_result.is_err()
&& let Some(&interrupted_test) = test_iter.peek()
{
results.push(
TestResult::new(interrupted_test.name())
.fail(std::io::Error::other(ERR_TIMEOUT_INTERRUPTED)),
);
}

results
}

/// Runs a single test case.
async fn run_single_test(args: &TestValidatorArgs, test_case: ValidatorTestCase) -> TestResult {
match test_case {
ValidatorTestCase::Ping => ping_test(args).await,
ValidatorTestCase::PingMeasure => ping_measure_test(args).await,
ValidatorTestCase::PingLoad => ping_load_test(args).await,
}
}

async fn ping_test(args: &TestValidatorArgs) -> TestResult {
let mut result = TestResult::new(ValidatorTestCase::Ping.name());

match timeout(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer to qualify functions rather that importing them directly (makes it easier to read)

Duration::from_secs(1),
TcpStream::connect(&args.api_address),
)
.await
{
Ok(Ok(_conn)) => {
result.verdict = TestVerdict::Ok;
}
Ok(Err(e)) => {
return result.fail(e);
}
Err(_) => {
return result.fail(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"connection timeout",
));
}
}

result
}

async fn ping_measure_test(args: &TestValidatorArgs) -> TestResult {
let mut result = TestResult::new(ValidatorTestCase::PingMeasure.name());
let before = Instant::now();

match timeout(
Duration::from_secs(1),
TcpStream::connect(&args.api_address),
)
.await
{
Ok(Ok(_conn)) => {
let rtt = before.elapsed();
result = evaluate_rtt(rtt, result, THRESHOLD_MEASURE_AVG, THRESHOLD_MEASURE_POOR);
}
Ok(Err(e)) => {
return result.fail(e);
}
Err(_) => {
return result.fail(std::io::Error::new(
std::io::ErrorKind::TimedOut,
"connection timeout",
));
}
}

result
}

async fn ping_load_test(args: &TestValidatorArgs) -> TestResult {
tracing::info!(
duration = ?args.load_test_duration,
target = %args.api_address,
"Running ping load tests..."
);

let mut result = TestResult::new(ValidatorTestCase::PingLoad.name());

let (tx, mut rx) = mpsc::channel::<Duration>(100);
let address = args.api_address.clone();
let duration = args.load_test_duration;

let handle = tokio::spawn(async move {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're spawning a task and then immediately await for it. If you want to scope this code then use a block, but I don't see a reason to introduce a task.

let start = Instant::now();
let mut interval = tokio::time::interval(Duration::from_secs(1));

interval.tick().await;
while start.elapsed() < duration {
interval.tick().await;

let tx = tx.clone();
let addr = address.clone();
let remaining = duration.saturating_sub(start.elapsed());

tokio::spawn(async move {
ping_continuously(addr, tx, remaining).await;
});
}
});

let _ = handle.await;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only waits for the scheduler task, not the spawned ping worker tasks themselves. So when calling rx.try_recv(), some ping attempts may still be in flight and their RTTs are not included in the final score.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outdated comment?


let mut rtts = Vec::new();
while let Ok(rtt) = rx.try_recv() {
rtts.push(rtt);
}

tracing::info!(target = %args.api_address, "Ping load tests finished");

result = evaluate_highest_rtt(rtts, result, THRESHOLD_LOAD_AVG, THRESHOLD_LOAD_POOR);

result
}

async fn ping_continuously(address: String, tx: mpsc::Sender<Duration>, max_duration: Duration) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async fn ping_continuously(address: String, tx: mpsc::Sender<Duration>, max_duration: Duration) {
async fn ping_continuously(address: impl AsRef<str>, tx: mpsc::Sender<Duration>, max_duration: Duration) {

let start = Instant::now();

while start.elapsed() < max_duration {
let before = Instant::now();

match timeout(Duration::from_secs(1), TcpStream::connect(&address)).await {
Ok(Ok(conn)) => {
let rtt = before.elapsed();
if tx.send(rtt).await.is_err() {
drop(conn);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary drop here (conn will be dropped on return)

return;
}
}
Ok(Err(e)) => {
tracing::warn!(target = %address, error = ?e, "Ping connection attempt failed during load test");
}
Err(e) => {
tracing::warn!(target = %address, error = ?e, "Ping connection attempt timed out during load test");
}
}
let sleep_ms = rand::thread_rng().gen_range(0..100);
tokio::time::sleep(Duration::from_millis(sleep_ms)).await;
}
}
Loading