diff --git a/crates/full-node/full-node-configs/src/runner.rs b/crates/full-node/full-node-configs/src/runner.rs index aa6f8cc8c5..70224da59b 100644 --- a/crates/full-node/full-node-configs/src/runner.rs +++ b/crates/full-node/full-node-configs/src/runner.rs @@ -133,6 +133,16 @@ pub struct ProofManagerConfig
{ /// When false, submission is deferred until the batch is full, then submitted concurrently. #[serde(default = "default_eager_proof_submission")] pub eager_proof_submission: bool, + /// Override for the number of prover threads. When `None`, defaults to + /// `2 * aggregated_proof_block_jump + 1`, which covers inner proofs for + /// the current and next batch plus one outer-aggregation worker. + #[serde(default)] + pub prover_thread_count_override: Option>, + /// Maximum number of completed aggregation windows that can be buffered in + /// memory between the intake task and the aggregator task. When the + /// aggregator falls behind by this many windows, intake stalls and + /// back-pressure propagates upstream. + pub max_number_of_aggregated_proofs_in_memory: NonZero, } fn default_eager_proof_submission() -> bool { @@ -140,11 +150,11 @@ fn default_eager_proof_submission() -> bool { } impl
ProofManagerConfig
{ - /// Number of prover threads required to fully pipeline aggregation: - /// `2 * aggregated_proof_block_jump + 1` covers inner proofs for the - /// current and next batch plus one outer-aggregation worker. + /// Number of prover threads. pub fn prover_thread_count(&self) -> usize { - 2 * self.aggregated_proof_block_jump.get() + 1 + self.prover_thread_count_override + .map(|n| n.get()) + .unwrap_or_else(|| 2 * self.aggregated_proof_block_jump.get() + 1) } } @@ -225,8 +235,9 @@ mod tests { [proof_manager] aggregated_proof_block_jump = 22 prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf" - max_number_of_transitions_in_db = 1025 - max_number_of_transitions_in_memory = 768 + max_number_of_transitions_in_db = 1000 + max_number_of_transitions_in_memory = 100 + max_number_of_aggregated_proofs_in_memory = 5 [sequencer] blob_processing_timeout_secs = 60 max_batch_size_bytes = 1048576 @@ -268,8 +279,9 @@ mod tests { [proof_manager] aggregated_proof_block_jump = 22 prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf" - max_number_of_transitions_in_db = 1025 - max_number_of_transitions_in_memory = 768 + max_number_of_transitions_in_db = 1000 + max_number_of_transitions_in_memory = 100 + max_number_of_aggregated_proofs_in_memory = 5 [sequencer] blob_processing_timeout_secs = 60 max_batch_size_bytes = 1048576 @@ -321,8 +333,9 @@ mod tests { [proof_manager] aggregated_proof_block_jump = 22 prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf" - max_number_of_transitions_in_db = 1025 - max_number_of_transitions_in_memory = 768 + max_number_of_transitions_in_db = 1000 + max_number_of_transitions_in_memory = 100 + max_number_of_aggregated_proofs_in_memory = 5 [sequencer] blob_processing_timeout_secs = 60 max_batch_size_bytes = 1048576 diff --git a/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config.snap b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config.snap index 3d889a2b81..ba43cf609f 100644 --- a/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config.snap +++ b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config.snap @@ -50,9 +50,11 @@ expression: config "proof_manager": { "aggregated_proof_block_jump": 22, "prover_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf", - "max_number_of_transitions_in_db": 1025, - "max_number_of_transitions_in_memory": 768, - "eager_proof_submission": true + "max_number_of_transitions_in_db": 1000, + "max_number_of_transitions_in_memory": 100, + "eager_proof_submission": true, + "prover_thread_count_override": null, + "max_number_of_aggregated_proofs_in_memory": 5 }, "sequencer": { "automatic_batch_production": true, diff --git a/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_postgres.snap b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_postgres.snap index 2db485d625..ccacd5c3a1 100644 --- a/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_postgres.snap +++ b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_postgres.snap @@ -50,9 +50,11 @@ expression: config "proof_manager": { "aggregated_proof_block_jump": 22, "prover_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf", - "max_number_of_transitions_in_db": 1025, - "max_number_of_transitions_in_memory": 768, - "eager_proof_submission": true + "max_number_of_transitions_in_db": 1000, + "max_number_of_transitions_in_memory": 100, + "eager_proof_submission": true, + "prover_thread_count_override": null, + "max_number_of_aggregated_proofs_in_memory": 5 }, "sequencer": { "automatic_batch_production": true, diff --git a/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_rate_limiter.snap b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_rate_limiter.snap index 7297192d4d..bb2ff9015d 100644 --- a/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_rate_limiter.snap +++ b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_rate_limiter.snap @@ -50,9 +50,11 @@ expression: config "proof_manager": { "aggregated_proof_block_jump": 22, "prover_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf", - "max_number_of_transitions_in_db": 1025, - "max_number_of_transitions_in_memory": 768, - "eager_proof_submission": true + "max_number_of_transitions_in_db": 1000, + "max_number_of_transitions_in_memory": 100, + "eager_proof_submission": true, + "prover_thread_count_override": null, + "max_number_of_aggregated_proofs_in_memory": 5 }, "sequencer": { "automatic_batch_production": true, diff --git a/crates/full-node/sov-stf-runner/src/processes/metrics.rs b/crates/full-node/sov-stf-runner/src/processes/metrics.rs index 252e8e3c3f..1134717f0f 100644 --- a/crates/full-node/sov-stf-runner/src/processes/metrics.rs +++ b/crates/full-node/sov-stf-runner/src/processes/metrics.rs @@ -39,6 +39,9 @@ pub(crate) struct ZkProofManagerMetrics { pub proofs_to_create: usize, /// The slot number of the most recently received state transition. pub slot_number: u64, + /// Number of pending aggregated-proof metadata items currently buffered in the + /// intake-to-aggregator channel (capacity `max_number_of_aggregated_proofs_in_memory`). + pub pending_agg_metadata: usize, } impl Metric for ZkProofManagerMetrics { @@ -49,11 +52,12 @@ impl Metric for ZkProofManagerMetrics { fn serialize_for_telegraf(&self, buffer: &mut Vec) -> std::io::Result<()> { write!( buffer, - "{} proving_lag={}i,proofs_to_create={}i,slot_number={}i", + "{} proving_lag={}i,proofs_to_create={}i,slot_number={}i,pending_agg_metadata={}i", self.measurement_name(), self.proving_lag, self.proofs_to_create, self.slot_number, + self.pending_agg_metadata, ) } } diff --git a/crates/full-node/sov-stf-runner/src/processes/mod.rs b/crates/full-node/sov-stf-runner/src/processes/mod.rs index 06a29a1aad..1ad79a39f2 100644 --- a/crates/full-node/sov-stf-runner/src/processes/mod.rs +++ b/crates/full-node/sov-stf-runner/src/processes/mod.rs @@ -17,10 +17,12 @@ use tokio::task::JoinHandle; pub use zk_manager::*; /// Starts a process that generates aggregated proofs in the background. +#[allow(clippy::too_many_arguments)] pub async fn start_zk_workflow_in_background( prover_service: Ps, aggregated_proof_block_jump: NonZero, eager_proof_submission: bool, + max_number_of_aggregated_proofs_in_memory: NonZero, proof_sender: Box, genesis_state_root: Ps::StateRoot, stf_info_receiver: Receiver::Spec>, @@ -34,6 +36,7 @@ where prover_service, aggregated_proof_block_jump, eager_proof_submission, + max_number_of_aggregated_proofs_in_memory, proof_sender, genesis_state_root, stf_info_receiver, diff --git a/crates/full-node/sov-stf-runner/src/processes/zk_manager/mod.rs b/crates/full-node/sov-stf-runner/src/processes/zk_manager/mod.rs index 1baf677e43..4ed99ae692 100644 --- a/crates/full-node/sov-stf-runner/src/processes/zk_manager/mod.rs +++ b/crates/full-node/sov-stf-runner/src/processes/zk_manager/mod.rs @@ -29,6 +29,7 @@ pub struct ZkProofManager { proofs_to_create: UnAggregatedProofList, aggregated_proof_block_jump: NonZero, eager_proof_submission: bool, + max_number_of_aggregated_proofs_in_memory: NonZero, proof_sender: Box, backoff_policy: ExponentialBuilder, genesis_state_root: Ps::StateRoot, @@ -46,6 +47,7 @@ where prover_service: Ps, aggregated_proof_block_jump: NonZero, eager_proof_submission: bool, + max_number_of_aggregated_proofs_in_memory: NonZero, proof_sender: Box, genesis_state_root: Ps::StateRoot, stf_info_receiver: Receiver::Spec>, @@ -56,6 +58,7 @@ where proofs_to_create: UnAggregatedProofList::new(), aggregated_proof_block_jump, eager_proof_submission, + max_number_of_aggregated_proofs_in_memory, proof_sender, backoff_policy: ExponentialBuilder::default() .with_min_delay(Duration::from_secs(BACKOFF_POLICY_MIN_DELAY)) @@ -75,7 +78,9 @@ where tokio::spawn(async move { tracing::info!("Spawning an aggregated proof posting background task"); - let (metadata_tx, metadata_rx) = mpsc::channel::<(AggregateProofMetadata, u64)>(1); + let (metadata_tx, metadata_rx) = mpsc::channel::<(AggregateProofMetadata, u64)>( + self.max_number_of_aggregated_proofs_in_memory.get(), + ); // Cursor handle goes to the aggregator so the cursor only // advances after publish succeeds. See module-level docs. @@ -211,11 +216,11 @@ where let metadata = self.proofs_to_create.take_oldest(); let window_size = num_proofs_to_create as u64; - // 1-deep channel: blocks here only when the aggregator already - // has one window in flight AND a second buffered. While blocked, - // intake stops draining the STF-info mpsc, which propagates - // back-pressure upstream to the runner exactly as in the - // pre-pipelining design. + // Capacity is `max_number_of_aggregated_proofs_in_memory`: blocks + // here only once the aggregator has one window in flight AND that + // many windows buffered. While blocked, intake stops draining the + // STF-info mpsc, which propagates back-pressure upstream to the + // runner exactly as in the pre-pipelining design. self.metadata_tx .send((metadata, window_size)) .await @@ -224,11 +229,13 @@ where })?; } + let pending_agg_metadata = self.metadata_tx.max_capacity() - self.metadata_tx.capacity(); sov_metrics::track_metrics(|tracker| { tracker.submit(super::metrics::ZkProofManagerMetrics { proving_lag: received_slot_number.get() - first_height_unproven.get(), proofs_to_create: self.proofs_to_create.current_proof_jump(), slot_number: received_slot_number.get(), + pending_agg_metadata, }); }); @@ -236,9 +243,6 @@ where } } -/// Aggregator side of [`ZkProofManager`]. Pulls completed windows from the -/// 1-deep mpsc, runs the (necessarily serial) recursive aggregation, and -/// publishes the resulting blob to DA. struct AggregatorTask { prover_service: Arc, proof_sender: Box, diff --git a/crates/full-node/sov-stf-runner/src/runner.rs b/crates/full-node/sov-stf-runner/src/runner.rs index 4c4d21a83a..59da4a4739 100644 --- a/crates/full-node/sov-stf-runner/src/runner.rs +++ b/crates/full-node/sov-stf-runner/src/runner.rs @@ -41,8 +41,16 @@ fn validate_proof_manager_config
( ) -> anyhow::Result<()> { let aggregated_proof_block_jump = u64::try_from(config.aggregated_proof_block_jump.get()) .context("aggregated_proof_block_jump does not fit in u64")?; + let buffered_windows = u64::try_from(config.max_number_of_aggregated_proofs_in_memory.get()) + .context("max_number_of_aggregated_proofs_in_memory does not fit in u64")?; + // Windows resident across the pipeline: the one intake is filling, the one + // the aggregator is working on, plus `buffered_windows` queued in the + // intake→aggregator channel. + let pipelined_windows = buffered_windows + .checked_add(2) + .context("aggregated proof window count overflowed")?; let pipelined_backlog = aggregated_proof_block_jump - .checked_mul(3) + .checked_mul(pipelined_windows) .context("aggregated proof backlog overflowed")?; let required_transitions_in_db = config .max_number_of_transitions_in_memory @@ -52,10 +60,11 @@ fn validate_proof_manager_config
( anyhow::ensure!( config.max_number_of_transitions_in_db.get() >= required_transitions_in_db, - "Invalid proof manager config: `max_number_of_transitions_in_db` must be at least `max_number_of_transitions_in_memory + 3 * aggregated_proof_block_jump` for pipelined aggregated proof posting (got db={}, memory={}, jump={}, required={})", + "Invalid proof manager config: `max_number_of_transitions_in_db` must be at least `max_number_of_transitions_in_memory + (max_number_of_aggregated_proofs_in_memory + 2) * aggregated_proof_block_jump` for pipelined aggregated proof posting (got db={}, memory={}, jump={}, buffered={}, required={})", config.max_number_of_transitions_in_db, config.max_number_of_transitions_in_memory, config.aggregated_proof_block_jump, + config.max_number_of_aggregated_proofs_in_memory, required_transitions_in_db, ); diff --git a/crates/full-node/sov-stf-runner/tests/integration/helpers/runner_init.rs b/crates/full-node/sov-stf-runner/tests/integration/helpers/runner_init.rs index bf1c3b152b..7b5492551f 100644 --- a/crates/full-node/sov-stf-runner/tests/integration/helpers/runner_init.rs +++ b/crates/full-node/sov-stf-runner/tests/integration/helpers/runner_init.rs @@ -288,6 +288,9 @@ pub async fn initialize_runner( prover_service, rollup_config.proof_manager.aggregated_proof_block_jump, rollup_config.proof_manager.eager_proof_submission, + rollup_config + .proof_manager + .max_number_of_aggregated_proofs_in_memory, Box::new(MockProofSender { da: da_service.clone(), }), @@ -414,9 +417,11 @@ pub fn rollup_config_with_da>( proof_manager: ProofManagerConfig { aggregated_proof_block_jump: NonZero::new(aggregated_proof_block_jump).unwrap(), prover_address: MockAddress::new([0u8; 32]), - max_number_of_transitions_in_db: NonZero::new(30).unwrap(), - max_number_of_transitions_in_memory: NonZero::new(20).unwrap(), + max_number_of_transitions_in_db: NonZero::new(1000).unwrap(), + max_number_of_transitions_in_memory: NonZero::new(100).unwrap(), eager_proof_submission: true, + prover_thread_count_override: None, + max_number_of_aggregated_proofs_in_memory: NonZero::new(5).unwrap(), }, sequencer: SequencerConfig { automatic_batch_production: true, diff --git a/crates/module-system/module-schemas/rollup-config.json b/crates/module-system/module-schemas/rollup-config.json index 0de09c3a36..9ad2c16763 100644 --- a/crates/module-system/module-schemas/rollup-config.json +++ b/crates/module-system/module-schemas/rollup-config.json @@ -644,6 +644,7 @@ "type": "object", "required": [ "aggregated_proof_block_jump", + "max_number_of_aggregated_proofs_in_memory", "max_number_of_transitions_in_db", "max_number_of_transitions_in_memory", "prover_address" @@ -660,6 +661,12 @@ "default": true, "type": "boolean" }, + "max_number_of_aggregated_proofs_in_memory": { + "description": "Maximum number of completed aggregation windows that can be buffered in memory between the intake task and the aggregator task. When the aggregator falls behind by this many windows, intake stalls and back-pressure propagates upstream.", + "type": "integer", + "format": "uint", + "minimum": 1.0 + }, "max_number_of_transitions_in_db": { "description": "A number of state transition info entries are allowed to be stored in the database. When the number is exceeded, older entries are removed.", "type": "integer", @@ -679,6 +686,16 @@ "$ref": "#/definitions/Address" } ] + }, + "prover_thread_count_override": { + "description": "Override for the number of prover threads. When `None`, defaults to `2 * aggregated_proof_block_jump + 1`, which covers inner proofs for the current and next batch plus one outer-aggregation worker.", + "default": null, + "type": [ + "integer", + "null" + ], + "format": "uint", + "minimum": 1.0 } } }, diff --git a/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs b/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs index de690cc909..efb757a1ce 100644 --- a/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs +++ b/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs @@ -579,6 +579,9 @@ pub trait FullNodeBlueprint: RollupBlueprint { prover_service, rollup_config.proof_manager.aggregated_proof_block_jump, rollup_config.proof_manager.eager_proof_submission, + rollup_config + .proof_manager + .max_number_of_aggregated_proofs_in_memory, proof_sender, genesis_state_root, stf_info_receiver, diff --git a/crates/utils/sov-test-utils/src/test_rollup.rs b/crates/utils/sov-test-utils/src/test_rollup.rs index 353d7e6402..223a9e07e9 100644 --- a/crates/utils/sov-test-utils/src/test_rollup.rs +++ b/crates/utils/sov-test-utils/src/test_rollup.rs @@ -370,6 +370,8 @@ impl + Default + 'static> RollupBuilder { max_number_of_transitions_in_memory: NonZero::new(self.config.max_channel_size) .unwrap(), eager_proof_submission: true, + prover_thread_count_override: None, + max_number_of_aggregated_proofs_in_memory: NonZero::new(5).unwrap(), }, sequencer: SequencerConfig { automatic_batch_production: self.config.automatic_batch_production, diff --git a/examples/demo-rollup/configs/celestia_rollup_config.toml b/examples/demo-rollup/configs/celestia_rollup_config.toml index 75080788ac..ca81861b9c 100644 --- a/examples/demo-rollup/configs/celestia_rollup_config.toml +++ b/examples/demo-rollup/configs/celestia_rollup_config.toml @@ -131,8 +131,9 @@ telegraf_address = "udp://127.0.0.1:8094" [proof_manager] aggregated_proof_block_jump = 10 prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf" -max_number_of_transitions_in_db = 100 -max_number_of_transitions_in_memory = 30 +max_number_of_transitions_in_db = 1000 +max_number_of_transitions_in_memory = 100 +max_number_of_aggregated_proofs_in_memory = 5 [sequencer] # This value is counted between sequencer receiving receipt from DA about inclusion till node has processed this blob. diff --git a/examples/demo-rollup/configs/external_mock_rollup_config.toml b/examples/demo-rollup/configs/external_mock_rollup_config.toml index 6c32206203..12fc2bc849 100644 --- a/examples/demo-rollup/configs/external_mock_rollup_config.toml +++ b/examples/demo-rollup/configs/external_mock_rollup_config.toml @@ -53,8 +53,9 @@ telegraf_address = "udp://127.0.0.1:8094" [proof_manager] aggregated_proof_block_jump = 1 prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf" -max_number_of_transitions_in_db = 100 -max_number_of_transitions_in_memory = 30 +max_number_of_transitions_in_db = 1000 +max_number_of_transitions_in_memory = 100 +max_number_of_aggregated_proofs_in_memory = 5 [sequencer] blob_processing_timeout_secs = 3000 diff --git a/examples/demo-rollup/configs/mock_rollup_config.toml b/examples/demo-rollup/configs/mock_rollup_config.toml index 3aaba626dd..3e9173fef5 100644 --- a/examples/demo-rollup/configs/mock_rollup_config.toml +++ b/examples/demo-rollup/configs/mock_rollup_config.toml @@ -80,8 +80,9 @@ tokio_runtime_metrics_interval_millis = 500 [proof_manager] aggregated_proof_block_jump = 1 prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf" -max_number_of_transitions_in_db = 100 -max_number_of_transitions_in_memory = 30 +max_number_of_transitions_in_db = 1000 +max_number_of_transitions_in_memory = 100 +max_number_of_aggregated_proofs_in_memory = 5 [sequencer] blob_processing_timeout_secs = 3000 diff --git a/examples/demo-rollup/configs/replica_external_mock_rollup_config.toml b/examples/demo-rollup/configs/replica_external_mock_rollup_config.toml index c3756484cc..b4b17eae70 100644 --- a/examples/demo-rollup/configs/replica_external_mock_rollup_config.toml +++ b/examples/demo-rollup/configs/replica_external_mock_rollup_config.toml @@ -53,8 +53,9 @@ telegraf_address = "udp://127.0.0.1:8095" [proof_manager] aggregated_proof_block_jump = 1 prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf" -max_number_of_transitions_in_db = 100 -max_number_of_transitions_in_memory = 30 +max_number_of_transitions_in_db = 1000 +max_number_of_transitions_in_memory = 100 +max_number_of_aggregated_proofs_in_memory = 5 [sequencer] blob_processing_timeout_secs = 3000 diff --git a/examples/demo-rollup/hive/mock_rollup_config.toml b/examples/demo-rollup/hive/mock_rollup_config.toml index 183e72b392..37a4e0a8b8 100644 --- a/examples/demo-rollup/hive/mock_rollup_config.toml +++ b/examples/demo-rollup/hive/mock_rollup_config.toml @@ -32,8 +32,9 @@ tokio_runtime_metrics_interval_millis = 500 [proof_manager] aggregated_proof_block_jump = 1 prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf" -max_number_of_transitions_in_db = 100 -max_number_of_transitions_in_memory = 30 +max_number_of_transitions_in_db = 1000 +max_number_of_transitions_in_memory = 100 +max_number_of_aggregated_proofs_in_memory = 5 [sequencer] blob_processing_timeout_secs = 3000 diff --git a/typescript/examples/soak-testing/templates/rollup.toml b/typescript/examples/soak-testing/templates/rollup.toml index f2d9399001..090dc54e74 100644 --- a/typescript/examples/soak-testing/templates/rollup.toml +++ b/typescript/examples/soak-testing/templates/rollup.toml @@ -22,8 +22,9 @@ telegraf_address = "127.0.0.1:8094" [proof_manager] aggregated_proof_block_jump = 1 prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf" -max_number_of_transitions_in_db = 100 -max_number_of_transitions_in_memory = 20 +max_number_of_transitions_in_db = 1000 +max_number_of_transitions_in_memory = 100 +max_number_of_aggregated_proofs_in_memory = 5 [sequencer]