Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ethexe/node-loader/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Available flags:
- `--max-top-up-value <u128>` in WVARA smallest units
- `--total-msg-value-budget <u128>` in wei
- `--total-top-up-budget <u128>` in WVARA smallest units
- `--program-creation-ratio <0..=100>` controls new program creation after bootstrapping
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The documentation for --program-creation-ratio specifies the range as <0..100>, which can be ambiguous. The implementation in args.rs uses an inclusive range 0..=100. To improve clarity and prevent misinterpretation, please consider updating the documentation to explicitly show an inclusive range, for example <0-100> or <0..=100>.

Suggested change
- `--program-creation-ratio <0..100>` controls new program creation after bootstrapping
- --program-creation-ratio <0-100> controls new program creation after bootstrapping


The examples below only show value-control settings; they still use the loader's existing
connection and address defaults unless you also pass network-specific endpoints or addresses.
Expand Down Expand Up @@ -72,6 +73,9 @@ Budget accounting is based on planned spend at scheduling time. The batch that c
budget is still submitted, then the loader stops scheduling new work and exits with
`status: budget exhausted`.

Use `--program-creation-ratio 0` for sustained message/reply/claim traffic against existing
programs without scheduling new code or program uploads after the initial bootstrap program exists.

## What it does

Runs a continuous load test against an `ethexe` dev node, generating randomized batches that:
Expand Down
4 changes: 2 additions & 2 deletions ethexe/node-loader/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ pub struct LoadParams {
pub batch_size: usize,
/// Percentage of load batches that create new programs after bootstrapping.
///
/// The default preserves the historical mix: `upload_program` and
/// `create_program` were two out of six uniformly selected batch families.
/// Standalone `upload_code` batches are excluded from steady-state traffic,
/// so `0` stops scheduling new code/program growth once a program exists.
#[arg(long, value_parser = clap::value_parser!(u8).range(0..=100))]
pub program_creation_ratio: Option<u8>,
/// Whether to batch regular `send_message` calls through the multicall contract.
Expand Down
98 changes: 89 additions & 9 deletions ethexe/node-loader/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,20 @@ type WorkerBatchFuture =

const MAX_MULTICALL_CALLDATA_BYTES: usize = 120 * 1024;

/// Per-batch watchdog: drop and reschedule a batch if a hung RPC call parks
/// the worker. Generous: code validation alone takes ~14 s for 5 codes.
const BATCH_TIMEOUT: Duration = Duration::from_secs(180);
/// Minimum per-batch watchdog: drop and reschedule a batch if a hung RPC call
/// parks the worker.
const MIN_BATCH_TIMEOUT: Duration = Duration::from_secs(180);

/// Conservative wall-clock allowance for each block scanned by
/// [`process_events`]. This keeps the watchdog above the expected wait window
/// when the local chain runs with a slower block time.
const BATCH_TIMEOUT_PER_EVENT_BLOCK: Duration = Duration::from_secs(15);

/// Code validation is slower than plain transaction submission, so batches that
/// upload code get extra time before the worker watchdog fires.
const BATCH_TIMEOUT_PER_CODE_VALIDATION: Duration = Duration::from_secs(20);

const BATCH_TIMEOUT_PER_TX: Duration = Duration::from_secs(10);

/// Cadence of pool-progress heartbeats so a stalled pool is visible in `docker logs`.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -523,9 +534,10 @@ async fn run_batch(
mid_map: MidMap,
) -> (EthexeRpcPool, Result<BatchRunReport>) {
let PreparedBatchWithSeed { seed, batch, .. } = batch;
let batch_timeout = batch_watchdog_timeout(&batch, use_send_message_multicall);

let result = match tokio::time::timeout(
BATCH_TIMEOUT,
batch_timeout,
run_batch_impl(
api,
&mut rpc_pool,
Expand All @@ -547,12 +559,12 @@ async fn run_batch(
Err(_) => {
tracing::warn!(
seed,
timeout_secs = BATCH_TIMEOUT.as_secs(),
timeout_secs = batch_timeout.as_secs(),
"Batch timed out, dropping it and rescheduling worker"
);
Err(anyhow::anyhow!(
"batch {seed} exceeded {:?} watchdog timeout",
BATCH_TIMEOUT
batch_timeout
))
}
};
Expand Down Expand Up @@ -1247,6 +1259,49 @@ fn send_message_wait_window(action_count: usize, use_send_message_multicall: boo
}
}

/// Multiplies `duration` by `factor`, saturating instead of overflowing.
///
/// `Duration::saturating_mul` takes a `u32` multiplier, so an oversized
/// `factor` is clamped to `u32::MAX` first — the product saturates to
/// `Duration::MAX` in that case anyway.
fn scale_duration(duration: Duration, factor: usize) -> Duration {
    let multiplier = u32::try_from(factor).unwrap_or(u32::MAX);
    duration.saturating_mul(multiplier)
}

/// Converts an expected number of scanned event blocks into a wall-clock
/// allowance, at [`BATCH_TIMEOUT_PER_EVENT_BLOCK`] per block (saturating).
fn event_window_timeout(blocks: usize) -> Duration {
    scale_duration(BATCH_TIMEOUT_PER_EVENT_BLOCK, blocks)
}

/// Computes the per-batch watchdog timeout for `batch`.
///
/// Each batch family contributes up to three per-item allowances: an event
/// wait window (blocks scanned by `process_events`), transaction submissions,
/// and code validations. The result is clamped from below by
/// [`MIN_BATCH_TIMEOUT`] so the watchdog never fires early on small batches.
fn batch_watchdog_timeout(batch: &PreparedBatch, use_send_message_multicall: bool) -> Duration {
    // (event-window blocks, tx submissions, code validations) per family.
    let (event_blocks, tx_count, validations) = match batch {
        PreparedBatch::UploadProgram(args) => {
            // Uploads both validate code and wait for program init events.
            (blocks_window(args.len(), 2, 6), args.len(), args.len())
        }
        PreparedBatch::UploadCode(args) => (0, 0, args.len()),
        PreparedBatch::SendMessage(args) => (
            send_message_wait_window(args.len(), use_send_message_multicall),
            args.len(),
            0,
        ),
        PreparedBatch::CreateProgram(args) => (blocks_window(args.len(), 1, 6), args.len(), 0),
        PreparedBatch::SendReply(args) => (blocks_window(args.len(), 1, 6), args.len(), 0),
        PreparedBatch::ClaimValue(args) => (0, args.len(), 0),
    };

    let dynamic_timeout = event_window_timeout(event_blocks)
        .saturating_add(scale_duration(BATCH_TIMEOUT_PER_TX, tx_count))
        .saturating_add(scale_duration(
            BATCH_TIMEOUT_PER_CODE_VALIDATION,
            validations,
        ));

    dynamic_timeout.max(MIN_BATCH_TIMEOUT)
}

/// Parses `Router.commitBatch` transactions for the given block and extracts
/// mailbox, exit, and reply outcome information relevant to the tracked batch.
#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -1603,10 +1658,14 @@ async fn process_events(
#[cfg(test)]
mod tests {
use super::{
Event, TransitionedMessage, apply_mirror_event_update, apply_router_transition_update,
schedule_initial_workers, send_message_wait_window,
Event, MIN_BATCH_TIMEOUT, TransitionedMessage, apply_mirror_event_update,
apply_router_transition_update, batch_watchdog_timeout, schedule_initial_workers,
send_message_wait_window,
};
use crate::batch::{
context::ContextUpdate,
value::{PreparedBatch, PreparedSendMessage},
};
use crate::batch::context::ContextUpdate;
use ethexe_common::events::{
MirrorEvent,
mirror::{
Expand All @@ -1615,6 +1674,7 @@ mod tests {
ValueClaimedEvent, ValueClaimingRequestedEvent,
},
};
use gear_call_gen::SendMessageArgs;
use gear_core::{ids::prelude::MessageIdExt, message::ReplyCode};
use gprimitives::{ActorId, H256, MessageId};

Expand Down Expand Up @@ -1783,6 +1843,26 @@ mod tests {
assert_eq!(send_message_wait_window(3, false), 18);
}

/// Builds a `SendMessage` batch of `len` identical prepared messages, used to
/// exercise the watchdog-timeout scaling.
fn send_message_batch(len: usize) -> PreparedBatch {
    let prototype = PreparedSendMessage {
        arg: SendMessageArgs((actor(1), vec![1, 2, 3], 1_000, 0)),
        use_injected: false,
    };
    PreparedBatch::SendMessage(vec![prototype; len])
}

#[test]
fn batch_watchdog_timeout_scales_with_event_window() {
    // Direct sends wait over a larger event window than multicall sends, so
    // their watchdog must be strictly longer; both respect the floor.
    let batch = send_message_batch(6);
    let with_multicall = batch_watchdog_timeout(&batch, true);
    let without_multicall = batch_watchdog_timeout(&batch, false);

    assert!(with_multicall >= MIN_BATCH_TIMEOUT);
    assert!(without_multicall > with_multicall);
}

#[test]
fn first_exhausting_batch_stops_initial_scheduling_loop() {
let mut scheduled_workers = Vec::new();
Expand Down
108 changes: 90 additions & 18 deletions ethexe/node-loader/src/batch/generator/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ use gear_wasm_gen::{StandardGearWasmConfigsBundle, SyscallKind};
use std::iter;
use tracing::instrument;

// Batch-family identifiers used by the generator's selection logic.
// `generate_batch` dispatches on these values, so they must stay unique.
const UPLOAD_PROGRAM_BATCH_ID: u8 = 0;
const UPLOAD_CODE_BATCH_ID: u8 = 1;
const SEND_MESSAGE_BATCH_ID: u8 = 2;
const CREATE_PROGRAM_BATCH_ID: u8 = 3;
const SEND_REPLY_BATCH_ID: u8 = 4;
const CLAIM_VALUE_BATCH_ID: u8 = 5;

/// Runtime values that need to stay in sync with the target `ethexe` network.
#[derive(Clone, Copy)]
pub struct RuntimeSettings {
Expand Down Expand Up @@ -148,16 +155,16 @@ impl<Rng: CallGenRng> BatchGenerator<Rng> {

/// Picks a program-creation batch family: uploads when no code id is known
/// yet (nothing to instantiate), otherwise a 50/50 split between uploading a
/// fresh program and creating one from an existing code id.
fn select_program_creation_batch_id(&mut self, context: &Context) -> u8 {
    let no_known_codes = context.all_code_ids().is_empty();
    // Short-circuit keeps the RNG draw count identical when no codes exist.
    if no_known_codes || self.batch_gen_rng.gen_bool(0.5) {
        UPLOAD_PROGRAM_BATCH_ID
    } else {
        CREATE_PROGRAM_BATCH_ID
    }
}

fn select_non_creation_batch_id(&mut self, context: &Context) -> u8 {
let mut viable = vec![1, 2];
let mut viable = vec![SEND_MESSAGE_BATCH_ID];
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In select_non_creation_batch_id, UPLOAD_CODE_BATCH_ID was removed, which seems correct as it's not part of steady-state traffic. However, it was not added to select_program_creation_batch_id. This makes UPLOAD_CODE_BATCH_ID unreachable from the main batch generation logic in select_batch_id, effectively disabling standalone upload_code batches.

If this is not intended, select_program_creation_batch_id should be updated to include UPLOAD_CODE_BATCH_ID as a possible outcome. For example:

fn select_program_creation_batch_id(&mut self, context: &Context) -> u8 {
    if context.all_code_ids().is_empty() {
        // Must upload a program to get a code ID.
        return UPLOAD_PROGRAM_BATCH_ID;
    }

    // A possible distribution, please adjust to desired workload.
    match self.batch_gen_rng.gen_range(0..3) {
        0 => UPLOAD_PROGRAM_BATCH_ID,
        1 => CREATE_PROGRAM_BATCH_ID,
        _ => UPLOAD_CODE_BATCH_ID,
    }
}

if !context.all_mailbox_message_ids().is_empty() {
viable.extend([4, 5]);
viable.extend([SEND_REPLY_BATCH_ID, CLAIM_VALUE_BATCH_ID]);
}

let idx = self.batch_gen_rng.gen_range(0..viable.len());
Expand Down Expand Up @@ -188,43 +195,45 @@ impl<Rng: CallGenRng> BatchGenerator<Rng> {
rt_settings: RuntimeSettings,
) -> Batch {
match batch_id {
0 => self.generate_upload_program_batch(context, seed, rt_settings.gas_limit),
1 => self.generate_upload_code_batch(context, seed),
2 => match NonEmpty::from_vec(context.active_program_ids()) {
UPLOAD_PROGRAM_BATCH_ID => {
self.generate_upload_program_batch(context, seed, rt_settings.gas_limit)
}
UPLOAD_CODE_BATCH_ID => self.generate_upload_code_batch(context, seed),
SEND_MESSAGE_BATCH_ID => match NonEmpty::from_vec(context.active_program_ids()) {
Some(existing_programs) => Self::gen_batch::<SendMessageArgs, _, _>(
self.batch_size,
seed,
|rng| (existing_programs.clone(), rng.next_u64()),
|| (rt_settings.gas_limit,),
),
None => self.generate_batch(0, context, seed, rt_settings),
None => self.generate_batch(UPLOAD_PROGRAM_BATCH_ID, context, seed, rt_settings),
},
3 => match NonEmpty::from_vec(context.all_code_ids()) {
CREATE_PROGRAM_BATCH_ID => match NonEmpty::from_vec(context.all_code_ids()) {
Some(existing_codes) => Self::gen_batch::<CreateProgramArgs, _, _>(
self.batch_size,
seed,
|rng| (existing_codes.clone(), rng.next_u64()),
|| (rt_settings.gas_limit,),
),
None => self.generate_batch(0, context, seed, rt_settings),
None => self.generate_batch(UPLOAD_PROGRAM_BATCH_ID, context, seed, rt_settings),
},
4 => match NonEmpty::from_vec(context.all_mailbox_message_ids()) {
SEND_REPLY_BATCH_ID => match NonEmpty::from_vec(context.all_mailbox_message_ids()) {
Some(mailbox_messages) => Self::gen_batch::<SendReplyArgs, _, _>(
self.batch_size,
seed,
|rng| (mailbox_messages.clone(), rng.next_u64()),
|| (rt_settings.gas_limit,),
),
None => self.generate_batch(0, context, seed, rt_settings),
None => self.generate_batch(UPLOAD_PROGRAM_BATCH_ID, context, seed, rt_settings),
},
5 => match NonEmpty::from_vec(context.all_mailbox_message_ids()) {
CLAIM_VALUE_BATCH_ID => match NonEmpty::from_vec(context.all_mailbox_message_ids()) {
Some(mailbox_messages) => Self::gen_batch::<ClaimValueArgs, _, _>(
self.batch_size,
seed,
|rng| (mailbox_messages.clone(), rng.next_u64()),
|| (),
),
None => self.generate_batch(0, context, seed, rt_settings),
None => self.generate_batch(UPLOAD_PROGRAM_BATCH_ID, context, seed, rt_settings),
},
_ => unreachable!(),
}
Expand Down Expand Up @@ -350,7 +359,10 @@ impl<Rng: CallGenRng> BatchGenerator<Rng> {

#[cfg(test)]
mod tests {
use super::{Batch, BatchGenerator, RuntimeSettings};
use super::{
Batch, BatchGenerator, CLAIM_VALUE_BATCH_ID, CREATE_PROGRAM_BATCH_ID, RuntimeSettings,
SEND_MESSAGE_BATCH_ID, SEND_REPLY_BATCH_ID, UPLOAD_CODE_BATCH_ID, UPLOAD_PROGRAM_BATCH_ID,
};
use crate::{
args::SeedVariant,
batch::{
Expand Down Expand Up @@ -416,7 +428,7 @@ mod tests {
);

match generator.generate_batch(
0,
UPLOAD_PROGRAM_BATCH_ID,
context_with_programs(),
12,
RuntimeSettings { gas_limit: 123 },
Expand All @@ -429,7 +441,7 @@ mod tests {
}

match generator.generate_batch(
1,
UPLOAD_CODE_BATCH_ID,
context_with_programs(),
13,
RuntimeSettings { gas_limit: 123 },
Expand All @@ -453,7 +465,7 @@ mod tests {
);

match generator.generate_batch(
4,
SEND_REPLY_BATCH_ID,
context_with_programs(),
23,
RuntimeSettings { gas_limit: 321 },
Expand All @@ -462,4 +474,64 @@ mod tests {
other => panic!("unexpected batch: {other:?}"),
}
}

#[test]
fn zero_program_creation_ratio_selects_only_traffic_batches_after_bootstrap() {
    // With the ratio pinned to 0, steady-state selection must never pick a
    // program-creation family once a program already exists in the context.
    let policy = WorkloadPolicy {
        program_creation_ratio: 0,
    };
    let mut generator = BatchGenerator::<SmallRng>::new(
        31,
        1,
        Some(SeedVariant::Constant(32)),
        RuntimeSettings { gas_limit: 321 },
        policy,
    );
    let context = context_with_programs();

    for _ in 0..128 {
        let selected = generator.select_batch_id(&context);
        match selected {
            SEND_MESSAGE_BATCH_ID | SEND_REPLY_BATCH_ID | CLAIM_VALUE_BATCH_ID => {}
            other => panic!("unexpected creation batch id selected: {other}"),
        }
    }
}

#[test]
fn full_program_creation_ratio_selects_only_program_creation_batches() {
    // With the ratio pinned to 100, every selection must be a creation
    // family (upload or create), never plain traffic.
    let policy = WorkloadPolicy {
        program_creation_ratio: 100,
    };
    let mut generator = BatchGenerator::<SmallRng>::new(
        41,
        1,
        Some(SeedVariant::Constant(42)),
        RuntimeSettings { gas_limit: 321 },
        policy,
    );
    let context = context_with_programs();

    for _ in 0..128 {
        let selected = generator.select_batch_id(&context);
        match selected {
            UPLOAD_PROGRAM_BATCH_ID | CREATE_PROGRAM_BATCH_ID => {}
            other => panic!("unexpected non-creation batch id selected: {other}"),
        }
    }
}

#[test]
fn zero_program_creation_ratio_still_bootstraps_empty_context() {
    // Even at ratio 0, an empty context has nothing to message, so the
    // generator must bootstrap by uploading a program first.
    let policy = WorkloadPolicy {
        program_creation_ratio: 0,
    };
    let mut generator = BatchGenerator::<SmallRng>::new(
        51,
        1,
        Some(SeedVariant::Constant(52)),
        RuntimeSettings { gas_limit: 321 },
        policy,
    );

    let first_choice = generator.select_batch_id(&Context::new());
    assert_eq!(first_choice, UPLOAD_PROGRAM_BATCH_ID);
}
}
Loading
Loading