Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions crates/full-node/full-node-configs/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,28 @@ pub struct ProofManagerConfig<Address> {
/// When false, submission is deferred until the batch is full, then submitted concurrently.
#[serde(default = "default_eager_proof_submission")]
pub eager_proof_submission: bool,
/// Override for the number of prover threads. When `None`, defaults to
/// `2 * aggregated_proof_block_jump + 1`, which covers inner proofs for
/// the current and next batch plus one outer-aggregation worker.
#[serde(default)]
pub prover_thread_count_override: Option<NonZero<usize>>,
Comment thread
bkolad marked this conversation as resolved.
/// Maximum number of completed aggregation windows that can be buffered in
/// memory between the intake task and the aggregator task. When the
/// aggregator falls behind by this many windows, intake stalls and
/// back-pressure propagates upstream.
pub max_number_of_aggregated_proofs_in_memory: NonZero<usize>,
Comment thread
bkolad marked this conversation as resolved.
}

fn default_eager_proof_submission() -> bool {
true
}

impl<Address> ProofManagerConfig<Address> {
/// Number of prover threads required to fully pipeline aggregation:
/// `2 * aggregated_proof_block_jump + 1` covers inner proofs for the
/// current and next batch plus one outer-aggregation worker.
/// Number of prover threads.
pub fn prover_thread_count(&self) -> usize {
2 * self.aggregated_proof_block_jump.get() + 1
self.prover_thread_count_override
.map(|n| n.get())
.unwrap_or_else(|| 2 * self.aggregated_proof_block_jump.get() + 1)
}
}

Expand Down Expand Up @@ -225,8 +235,9 @@ mod tests {
[proof_manager]
aggregated_proof_block_jump = 22
prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf"
max_number_of_transitions_in_db = 1025
max_number_of_transitions_in_memory = 768
max_number_of_transitions_in_db = 1000
max_number_of_transitions_in_memory = 100
max_number_of_aggregated_proofs_in_memory = 5
[sequencer]
blob_processing_timeout_secs = 60
max_batch_size_bytes = 1048576
Expand Down Expand Up @@ -268,8 +279,9 @@ mod tests {
[proof_manager]
aggregated_proof_block_jump = 22
prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf"
max_number_of_transitions_in_db = 1025
max_number_of_transitions_in_memory = 768
max_number_of_transitions_in_db = 1000
max_number_of_transitions_in_memory = 100
max_number_of_aggregated_proofs_in_memory = 5
[sequencer]
blob_processing_timeout_secs = 60
max_batch_size_bytes = 1048576
Expand Down Expand Up @@ -321,8 +333,9 @@ mod tests {
[proof_manager]
aggregated_proof_block_jump = 22
prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf"
max_number_of_transitions_in_db = 1025
max_number_of_transitions_in_memory = 768
max_number_of_transitions_in_db = 1000
max_number_of_transitions_in_memory = 100
max_number_of_aggregated_proofs_in_memory = 5
[sequencer]
blob_processing_timeout_secs = 60
max_batch_size_bytes = 1048576
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ expression: config
"proof_manager": {
"aggregated_proof_block_jump": 22,
"prover_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf",
"max_number_of_transitions_in_db": 1025,
"max_number_of_transitions_in_memory": 768,
"eager_proof_submission": true
"max_number_of_transitions_in_db": 1000,
"max_number_of_transitions_in_memory": 100,
"eager_proof_submission": true,
"prover_thread_count_override": null,
"max_number_of_aggregated_proofs_in_memory": 5
},
"sequencer": {
"automatic_batch_production": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ expression: config
"proof_manager": {
"aggregated_proof_block_jump": 22,
"prover_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf",
"max_number_of_transitions_in_db": 1025,
"max_number_of_transitions_in_memory": 768,
"eager_proof_submission": true
"max_number_of_transitions_in_db": 1000,
"max_number_of_transitions_in_memory": 100,
"eager_proof_submission": true,
"prover_thread_count_override": null,
"max_number_of_aggregated_proofs_in_memory": 5
},
"sequencer": {
"automatic_batch_production": true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ expression: config
"proof_manager": {
"aggregated_proof_block_jump": 22,
"prover_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf",
"max_number_of_transitions_in_db": 1025,
"max_number_of_transitions_in_memory": 768,
"eager_proof_submission": true
"max_number_of_transitions_in_db": 1000,
"max_number_of_transitions_in_memory": 100,
"eager_proof_submission": true,
"prover_thread_count_override": null,
"max_number_of_aggregated_proofs_in_memory": 5
},
"sequencer": {
"automatic_batch_production": true,
Expand Down
6 changes: 5 additions & 1 deletion crates/full-node/sov-stf-runner/src/processes/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub(crate) struct ZkProofManagerMetrics {
pub proofs_to_create: usize,
/// The slot number of the most recently received state transition.
pub slot_number: u64,
/// Number of pending aggregated-proof metadata items currently buffered in the
/// intake-to-aggregator channel (capacity `max_number_of_aggregated_proofs_in_memory`).
pub pending_agg_metadata: usize,
}

impl Metric for ZkProofManagerMetrics {
Expand All @@ -49,11 +52,12 @@ impl Metric for ZkProofManagerMetrics {
fn serialize_for_telegraf(&self, buffer: &mut Vec<u8>) -> std::io::Result<()> {
write!(
buffer,
"{} proving_lag={}i,proofs_to_create={}i,slot_number={}i",
"{} proving_lag={}i,proofs_to_create={}i,slot_number={}i,pending_agg_metadata={}i",
self.measurement_name(),
self.proving_lag,
self.proofs_to_create,
self.slot_number,
self.pending_agg_metadata,
)
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/full-node/sov-stf-runner/src/processes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ use tokio::task::JoinHandle;
pub use zk_manager::*;

/// Starts a process that generates aggregated proofs in the background.
#[allow(clippy::too_many_arguments)]
pub async fn start_zk_workflow_in_background<Ps>(
prover_service: Ps,
aggregated_proof_block_jump: NonZero<usize>,
eager_proof_submission: bool,
max_number_of_aggregated_proofs_in_memory: NonZero<usize>,
proof_sender: Box<dyn ProofSender>,
genesis_state_root: Ps::StateRoot,
stf_info_receiver: Receiver<Ps::StateRoot, Ps::Witness, <Ps::DaService as DaService>::Spec>,
Expand All @@ -34,6 +36,7 @@ where
prover_service,
aggregated_proof_block_jump,
eager_proof_submission,
max_number_of_aggregated_proofs_in_memory,
proof_sender,
genesis_state_root,
stf_info_receiver,
Expand Down
22 changes: 13 additions & 9 deletions crates/full-node/sov-stf-runner/src/processes/zk_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct ZkProofManager<Ps: ProverService> {
proofs_to_create: UnAggregatedProofList<Ps>,
aggregated_proof_block_jump: NonZero<usize>,
eager_proof_submission: bool,
max_number_of_aggregated_proofs_in_memory: NonZero<usize>,
proof_sender: Box<dyn ProofSender>,
backoff_policy: ExponentialBuilder,
genesis_state_root: Ps::StateRoot,
Expand All @@ -46,6 +47,7 @@ where
prover_service: Ps,
aggregated_proof_block_jump: NonZero<usize>,
eager_proof_submission: bool,
max_number_of_aggregated_proofs_in_memory: NonZero<usize>,
proof_sender: Box<dyn ProofSender>,
genesis_state_root: Ps::StateRoot,
stf_info_receiver: Receiver<Ps::StateRoot, Ps::Witness, <Ps::DaService as DaService>::Spec>,
Expand All @@ -56,6 +58,7 @@ where
proofs_to_create: UnAggregatedProofList::new(),
aggregated_proof_block_jump,
eager_proof_submission,
max_number_of_aggregated_proofs_in_memory,
proof_sender,
backoff_policy: ExponentialBuilder::default()
.with_min_delay(Duration::from_secs(BACKOFF_POLICY_MIN_DELAY))
Expand All @@ -75,7 +78,9 @@ where
tokio::spawn(async move {
tracing::info!("Spawning an aggregated proof posting background task");

let (metadata_tx, metadata_rx) = mpsc::channel::<(AggregateProofMetadata<Ps>, u64)>(1);
let (metadata_tx, metadata_rx) = mpsc::channel::<(AggregateProofMetadata<Ps>, u64)>(
self.max_number_of_aggregated_proofs_in_memory.get(),
);

// Cursor handle goes to the aggregator so the cursor only
// advances after publish succeeds. See module-level docs.
Expand Down Expand Up @@ -211,11 +216,11 @@ where
let metadata = self.proofs_to_create.take_oldest();
let window_size = num_proofs_to_create as u64;

// 1-deep channel: blocks here only when the aggregator already
// has one window in flight AND a second buffered. While blocked,
// intake stops draining the STF-info mpsc, which propagates
// back-pressure upstream to the runner exactly as in the
// pre-pipelining design.
// Capacity is `max_number_of_aggregated_proofs_in_memory`: blocks
// here only once the aggregator has one window in flight AND that
// many windows buffered. While blocked, intake stops draining the
// STF-info mpsc, which propagates back-pressure upstream to the
// runner exactly as in the pre-pipelining design.
self.metadata_tx
.send((metadata, window_size))
.await
Expand All @@ -224,21 +229,20 @@ where
})?;
}

let pending_agg_metadata = self.metadata_tx.max_capacity() - self.metadata_tx.capacity();
sov_metrics::track_metrics(|tracker| {
tracker.submit(super::metrics::ZkProofManagerMetrics {
proving_lag: received_slot_number.get() - first_height_unproven.get(),
proofs_to_create: self.proofs_to_create.current_proof_jump(),
slot_number: received_slot_number.get(),
pending_agg_metadata,
});
});

Ok(())
}
}

/// Aggregator side of [`ZkProofManager`]. Pulls completed windows from the
/// 1-deep mpsc, runs the (necessarily serial) recursive aggregation, and
/// publishes the resulting blob to DA.
struct AggregatorTask<Ps: ProverService> {
prover_service: Arc<Ps>,
proof_sender: Box<dyn ProofSender>,
Expand Down
13 changes: 11 additions & 2 deletions crates/full-node/sov-stf-runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,16 @@ fn validate_proof_manager_config<Address>(
) -> anyhow::Result<()> {
let aggregated_proof_block_jump = u64::try_from(config.aggregated_proof_block_jump.get())
.context("aggregated_proof_block_jump does not fit in u64")?;
let buffered_windows = u64::try_from(config.max_number_of_aggregated_proofs_in_memory.get())
.context("max_number_of_aggregated_proofs_in_memory does not fit in u64")?;
// Windows resident across the pipeline: the one intake is filling, the one
// the aggregator is working on, plus `buffered_windows` queued in the
// intake→aggregator channel.
let pipelined_windows = buffered_windows
.checked_add(2)
.context("aggregated proof window count overflowed")?;
let pipelined_backlog = aggregated_proof_block_jump
.checked_mul(3)
.checked_mul(pipelined_windows)
.context("aggregated proof backlog overflowed")?;
let required_transitions_in_db = config
.max_number_of_transitions_in_memory
Expand All @@ -52,10 +60,11 @@ fn validate_proof_manager_config<Address>(

anyhow::ensure!(
config.max_number_of_transitions_in_db.get() >= required_transitions_in_db,
"Invalid proof manager config: `max_number_of_transitions_in_db` must be at least `max_number_of_transitions_in_memory + 3 * aggregated_proof_block_jump` for pipelined aggregated proof posting (got db={}, memory={}, jump={}, required={})",
"Invalid proof manager config: `max_number_of_transitions_in_db` must be at least `max_number_of_transitions_in_memory + (max_number_of_aggregated_proofs_in_memory + 2) * aggregated_proof_block_jump` for pipelined aggregated proof posting (got db={}, memory={}, jump={}, buffered={}, required={})",
config.max_number_of_transitions_in_db,
config.max_number_of_transitions_in_memory,
config.aggregated_proof_block_jump,
config.max_number_of_aggregated_proofs_in_memory,
required_transitions_in_db,
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ pub async fn initialize_runner(
prover_service,
rollup_config.proof_manager.aggregated_proof_block_jump,
rollup_config.proof_manager.eager_proof_submission,
rollup_config
.proof_manager
.max_number_of_aggregated_proofs_in_memory,
Box::new(MockProofSender {
da: da_service.clone(),
}),
Expand Down Expand Up @@ -414,9 +417,11 @@ pub fn rollup_config_with_da<Da: DaService<Config = MockDaConfig>>(
proof_manager: ProofManagerConfig {
aggregated_proof_block_jump: NonZero::new(aggregated_proof_block_jump).unwrap(),
prover_address: MockAddress::new([0u8; 32]),
max_number_of_transitions_in_db: NonZero::new(30).unwrap(),
max_number_of_transitions_in_memory: NonZero::new(20).unwrap(),
max_number_of_transitions_in_db: NonZero::new(1000).unwrap(),
max_number_of_transitions_in_memory: NonZero::new(100).unwrap(),
eager_proof_submission: true,
prover_thread_count_override: None,
max_number_of_aggregated_proofs_in_memory: NonZero::new(5).unwrap(),
},
sequencer: SequencerConfig {
automatic_batch_production: true,
Expand Down
17 changes: 17 additions & 0 deletions crates/module-system/module-schemas/rollup-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@
"type": "object",
"required": [
"aggregated_proof_block_jump",
"max_number_of_aggregated_proofs_in_memory",
"max_number_of_transitions_in_db",
"max_number_of_transitions_in_memory",
"prover_address"
Expand All @@ -660,6 +661,12 @@
"default": true,
"type": "boolean"
},
"max_number_of_aggregated_proofs_in_memory": {
"description": "Maximum number of completed aggregation windows that can be buffered in memory between the intake task and the aggregator task. When the aggregator falls behind by this many windows, intake stalls and back-pressure propagates upstream.",
"type": "integer",
"format": "uint",
"minimum": 1.0
},
"max_number_of_transitions_in_db": {
"description": "A number of state transition info entries are allowed to be stored in the database. When the number is exceeded, older entries are removed.",
"type": "integer",
Expand All @@ -679,6 +686,16 @@
"$ref": "#/definitions/Address"
}
]
},
"prover_thread_count_override": {
"description": "Override for the number of prover threads. When `None`, defaults to `2 * aggregated_proof_block_jump + 1`, which covers inner proofs for the current and next batch plus one outer-aggregation worker.",
"default": null,
"type": [
"integer",
"null"
],
"format": "uint",
"minimum": 1.0
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,9 @@ pub trait FullNodeBlueprint<M: ExecutionMode>: RollupBlueprint<M> {
prover_service,
rollup_config.proof_manager.aggregated_proof_block_jump,
rollup_config.proof_manager.eager_proof_submission,
rollup_config
.proof_manager
.max_number_of_aggregated_proofs_in_memory,
proof_sender,
genesis_state_root,
stf_info_receiver,
Expand Down
2 changes: 2 additions & 0 deletions crates/utils/sov-test-utils/src/test_rollup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ impl<R: FullNodeBlueprint<Native> + Default + 'static> RollupBuilder<R> {
max_number_of_transitions_in_memory: NonZero::new(self.config.max_channel_size)
.unwrap(),
eager_proof_submission: true,
prover_thread_count_override: None,
max_number_of_aggregated_proofs_in_memory: NonZero::new(5).unwrap(),
},
sequencer: SequencerConfig {
automatic_batch_production: self.config.automatic_batch_production,
Expand Down
5 changes: 3 additions & 2 deletions examples/demo-rollup/configs/celestia_rollup_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ telegraf_address = "udp://127.0.0.1:8094"
[proof_manager]
aggregated_proof_block_jump = 10
prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf"
max_number_of_transitions_in_db = 100
max_number_of_transitions_in_memory = 30
max_number_of_transitions_in_db = 1000
max_number_of_transitions_in_memory = 100
max_number_of_aggregated_proofs_in_memory = 5

[sequencer]
# This value is counted between sequencer receiving receipt from DA about inclusion till node has processed this blob.
Expand Down
5 changes: 3 additions & 2 deletions examples/demo-rollup/configs/external_mock_rollup_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ telegraf_address = "udp://127.0.0.1:8094"
[proof_manager]
aggregated_proof_block_jump = 1
prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf"
max_number_of_transitions_in_db = 100
max_number_of_transitions_in_memory = 30
max_number_of_transitions_in_db = 1000
max_number_of_transitions_in_memory = 100
max_number_of_aggregated_proofs_in_memory = 5

[sequencer]
blob_processing_timeout_secs = 3000
Expand Down
5 changes: 3 additions & 2 deletions examples/demo-rollup/configs/mock_rollup_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ tokio_runtime_metrics_interval_millis = 500
[proof_manager]
aggregated_proof_block_jump = 1
prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf"
max_number_of_transitions_in_db = 100
max_number_of_transitions_in_memory = 30
max_number_of_transitions_in_db = 1000
max_number_of_transitions_in_memory = 100
max_number_of_aggregated_proofs_in_memory = 5

[sequencer]
blob_processing_timeout_secs = 3000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ telegraf_address = "udp://127.0.0.1:8095"
[proof_manager]
aggregated_proof_block_jump = 1
prover_address = "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf"
max_number_of_transitions_in_db = 100
max_number_of_transitions_in_memory = 30
max_number_of_transitions_in_db = 1000
max_number_of_transitions_in_memory = 100
max_number_of_aggregated_proofs_in_memory = 5

[sequencer]
blob_processing_timeout_secs = 3000
Expand Down
Loading
Loading