diff --git a/crates/adapters/mock-zkvm/src/host.rs b/crates/adapters/mock-zkvm/src/host.rs index 7594e7679a..2fd1cbace5 100644 --- a/crates/adapters/mock-zkvm/src/host.rs +++ b/crates/adapters/mock-zkvm/src/host.rs @@ -1,9 +1,14 @@ +use std::fmt::Debug; +use std::sync::{Arc, Mutex}; + use crate::notifier::NotificationManager; use crate::{MockCodeCommitment, MockProof, MockZkGuest}; +use serde::de::DeserializeOwned; use serde::Serialize; +use sov_rollup_interface::common::SlotNumber; use sov_rollup_interface::da::DaSpec; use sov_rollup_interface::zk::aggregated_proof::{ - BlockProof, OuterZkvmHost, SerializedAggregatedProof, + AggregatedProofPublicData, BlockProof, OuterZkvmHost, SerializedAggregatedProof, }; use sov_rollup_interface::zk::SerializedZkProof; @@ -12,6 +17,48 @@ use sov_rollup_interface::zk::SerializedZkProof; pub struct MockZkvmHost { notification_manager: NotificationManager, wait_for_proof: bool, + /// Anchor extracted from the most recently produced aggregated proof. + /// Shared across clones so that continuity assertions in + /// [`OuterZkvmHost::run_proof_aggregation`] hold across the whole prover + /// service. + previous_anchor: Arc>>, +} + +/// Continuity anchor extracted from a previously produced aggregated proof. +#[derive(Clone, Debug)] +struct PreviousAggregatedProofAnchor { + final_slot_number: SlotNumber, + genesis_state_root: Vec, + final_state_root: Vec, +} + +impl PreviousAggregatedProofAnchor { + fn from_public_data( + public_data: &AggregatedProofPublicData, + ) -> Self + where + Address: Serialize, + Da: DaSpec, + Root: Serialize, + { + Self { + final_slot_number: public_data.final_slot_number, + genesis_state_root: bincode::serialize(&public_data.genesis_state_root) + .expect("genesis_state_root must be bincode-serializable"), + final_state_root: bincode::serialize(&public_data.final_state_root) + .expect("final_state_root must be bincode-serializable"), + } + } + + fn deserialize_genesis_state_root(&self) -> Root { + bincode::deserialize(&self.genesis_state_root) + .expect("genesis_state_root must be bincode-deserializable") + } + + fn deserialize_final_state_root(&self) -> Root { + bincode::deserialize(&self.final_state_root) + .expect("final_state_root must be bincode-deserializable") + } } impl MockZkvmHost { @@ -20,6 +67,7 @@ impl MockZkvmHost { Self { wait_for_proof: true, notification_manager: Default::default(), + previous_anchor: Arc::new(Mutex::new(None)), } } @@ -28,6 +76,28 @@ impl MockZkvmHost { Self { wait_for_proof: false, notification_manager: Default::default(), + previous_anchor: Arc::new(Mutex::new(None)), + } + } + + /// Like [`Self::new_non_blocking`], but seeded with the public data of the + /// latest verified aggregated proof previously persisted in the ledger DB + /// so that continuity assertions in + /// [`OuterZkvmHost::run_proof_aggregation`] survive a node restart. + pub fn new_non_blocking_with_previous_anchor( + previous_public_data: Option<&AggregatedProofPublicData>, + ) -> Self + where + Address: Serialize, + Da: DaSpec, + Root: Serialize, + { + let previous_anchor = + previous_public_data.map(PreviousAggregatedProofAnchor::from_public_data); + Self { + wait_for_proof: false, + notification_manager: Default::default(), + previous_anchor: Arc::new(Mutex::new(previous_anchor)), } } @@ -87,13 +157,15 @@ impl sov_rollup_interface::zk::ZkvmHost for MockZkvmHost { } impl OuterZkvmHost for MockZkvmHost { - fn run_proof_aggregation( + fn run_proof_aggregation< + Address: Serialize + Clone, + Da: DaSpec, + Root: Serialize + DeserializeOwned + Clone + PartialEq + Debug, + >( &self, genesis_state_root: Root, headers_with_block_proofs: Vec<(Da::BlockHeader, BlockProof)>, ) -> anyhow::Result { - use sov_rollup_interface::zk::aggregated_proof::AggregatedProofPublicData; - let block_proofs_data = headers_with_block_proofs .iter() .map(|(_, bp)| bp) @@ -104,9 +176,44 @@ impl OuterZkvmHost for MockZkvmHost { genesis_state_root, ); - self.add_hint_and_run_inner(&public_data) + let mut previous = self + .previous_anchor + .lock() + .expect("previous_anchor mutex was poisoned"); + + if let Some(prev) = previous.as_ref() { + assert_eq!( + public_data.initial_slot_number, + prev.final_slot_number.next(), + "Aggregated proof continuity violated: new aggregation starts at slot {} but previous aggregation ended at slot {}", + public_data.initial_slot_number, + prev.final_slot_number, + ); + + let prev_genesis_state_root: Root = prev.deserialize_genesis_state_root(); + let prev_final_state_root: Root = prev.deserialize_final_state_root(); + + assert_eq!( + public_data.genesis_state_root, prev_genesis_state_root, + "Aggregated proof continuity violated: genesis_state_root differs from previous aggregation", + ); + assert_eq!( + public_data.initial_state_root, prev_final_state_root, + "Aggregated proof continuity violated: new initial_state_root does not match previous final_state_root", + ); + } else { + assert_eq!(public_data.initial_slot_number, SlotNumber::ONE); + } + + let serialized = self + .add_hint_and_run_inner(&public_data) .map(|raw_aggregated_proof| SerializedAggregatedProof { raw_aggregated_proof, - }) + })?; + + *previous = Some(PreviousAggregatedProofAnchor::from_public_data( + &public_data, + )); + Ok(serialized) } } diff --git a/crates/adapters/risc0/src/host.rs b/crates/adapters/risc0/src/host.rs index ff8e310167..ad0c6c14cc 100644 --- a/crates/adapters/risc0/src/host.rs +++ b/crates/adapters/risc0/src/host.rs @@ -116,7 +116,11 @@ impl ZkvmHost for Risc0Host<'static> { } impl OuterZkvmHost for Risc0Host<'static> { - fn run_proof_aggregation( + fn run_proof_aggregation< + Address: Serialize + Clone, + Da: DaSpec, + Root: Serialize + serde::de::DeserializeOwned + Clone + PartialEq + core::fmt::Debug, + >( &self, _genesis_state_root: Root, _headers_with_block_proofs: Vec<(Da::BlockHeader, BlockProof)>, diff --git a/crates/adapters/sp1/src/host.rs b/crates/adapters/sp1/src/host.rs index 92eefb5e18..439ca90839 100644 --- a/crates/adapters/sp1/src/host.rs +++ b/crates/adapters/sp1/src/host.rs @@ -43,15 +43,30 @@ impl SP1AggregationHost { /// Creates a new aggregation host from the aggregation guest `elf` binary /// and the verifying key (`inner_method_id`) of the inner proof program. pub fn new(elf: &'static [u8], inner_vk: sp1_sdk::SP1VerifyingKey) -> anyhow::Result { + Self::new_with_previous_proof(elf, inner_vk, None) + } + + /// Like [`Self::new`], but seeded with the latest aggregated proof + /// previously persisted in the ledger DB so that recursive verification of + /// the previous outer proof survives a node restart. + pub fn new_with_previous_proof( + elf: &'static [u8], + inner_vk: sp1_sdk::SP1VerifyingKey, + previous_aggregated_proof: Option, + ) -> anyhow::Result { let prover = SP1Prover::new(elf)?; let outer_vk = prover.verifying_key().clone(); + let prev_agg_proof = previous_aggregated_proof + .map(|proof| crate::decode_sp1_proof(&proof.to_serialized_zk_proof())) + .transpose()?; + Ok(Self { inner: Arc::new(Inner { prover, outer_vk, inner_vk, - prev_agg_proof: Mutex::new(None), + prev_agg_proof: Mutex::new(prev_agg_proof), }), }) } @@ -295,7 +310,11 @@ impl ZkvmHost for SP1Host { } impl OuterZkvmHost for SP1AggregationHost { - fn run_proof_aggregation( + fn run_proof_aggregation< + Address: Serialize + Clone, + Da: DaSpec, + Root: Serialize + serde::de::DeserializeOwned + Clone + PartialEq + core::fmt::Debug, + >( &self, _genesis_state_root: Root, headers_with_block_proofs: Vec<(Da::BlockHeader, BlockProof)>, diff --git a/crates/full-node/sov-db/src/ledger_db/mod.rs b/crates/full-node/sov-db/src/ledger_db/mod.rs index 02688fad5d..2db39e0ded 100644 --- a/crates/full-node/sov-db/src/ledger_db/mod.rs +++ b/crates/full-node/sov-db/src/ledger_db/mod.rs @@ -260,8 +260,6 @@ pub struct LedgerDb { // Db key for the latest height of the written STF info. const WRITE_ROLLUP_HEIGHT_ID: StfInfoUniqueId = StfInfoUniqueId(0); -// DB key for the latest height of the retrieved STF info. -const NEXT_SLOT_NUMBER_TO_RECEIVE_ID: StfInfoUniqueId = StfInfoUniqueId(1); // Db key for the oldest saved STF info. const LAST_SLOT_NUMBER_ID: StfInfoUniqueId = StfInfoUniqueId(2); @@ -656,25 +654,6 @@ impl LedgerDb { .await } - /// Materializes the latest height of the retrieved STF info. - pub fn materialize_stf_info_next_slot_number_to_receive( - &self, - read_slot_number: SlotNumber, - ) -> anyhow::Result { - let mut schema_batch = SchemaBatch::new(); - schema_batch.put::(&NEXT_SLOT_NUMBER_TO_RECEIVE_ID, &read_slot_number)?; - Ok(schema_batch) - } - - /// Gets the latest height of the submitted STF info. - pub async fn get_stf_info_next_slot_number_to_receive( - &self, - ) -> anyhow::Result> { - let db = self.db.read().expect(DB_LOCK_POISONED).clone(); - db.get_async::(&NEXT_SLOT_NUMBER_TO_RECEIVE_ID) - .await - } - /// Materializes the oldest height of the retrieved STF info. pub fn materialize_stf_info_oldest_slot_number( &self, diff --git a/crates/full-node/sov-db/tests/integration/ledger_db.rs b/crates/full-node/sov-db/tests/integration/ledger_db.rs index e5282a2743..c80b870a20 100644 --- a/crates/full-node/sov-db/tests/integration/ledger_db.rs +++ b/crates/full-node/sov-db/tests/integration/ledger_db.rs @@ -147,20 +147,6 @@ async fn test_stf_info() { assert_eq!(original_stored_inf_info, stored_stf_info); } -#[tokio::test(flavor = "multi_thread")] -async fn next_slot_number_to_receive_is_none_at_startup() { - let temp_dir = tempfile::tempdir().unwrap(); - let mut storage_manager = SimpleLedgerStorageManager::new(temp_dir.path()); - let ledger_storage = storage_manager.create_ledger_storage(); - - let ledger_db = LedgerDb::with_reader(ledger_storage).unwrap(); - assert!(ledger_db - .get_stf_info_next_slot_number_to_receive() - .await - .unwrap() - .is_none()); -} - #[tokio::test(flavor = "multi_thread")] async fn test_rollback() { let temp_dir = tempfile::tempdir().unwrap(); diff --git a/crates/full-node/sov-stf-runner/src/processes/prover_service/parallel/mod.rs b/crates/full-node/sov-stf-runner/src/processes/prover_service/parallel/mod.rs index 1aae36c41e..835dcb0cec 100644 --- a/crates/full-node/sov-stf-runner/src/processes/prover_service/parallel/mod.rs +++ b/crates/full-node/sov-stf-runner/src/processes/prover_service/parallel/mod.rs @@ -18,7 +18,7 @@ use crate::processes::{ProofAggregationStatus, ProofProcessingStatus, StateTrans pub struct ParallelProverService where Address: Serialize + DeserializeOwned, - StateRoot: Serialize + DeserializeOwned + Clone + AsRef<[u8]>, + StateRoot: Serialize + DeserializeOwned + Clone + AsRef<[u8]> + PartialEq + core::fmt::Debug, Witness: Serialize + DeserializeOwned, Da: DaService, InnerVm: Zkvm, @@ -37,7 +37,15 @@ impl where Address: BorshSerialize + AsRef<[u8]> + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, - StateRoot: Serialize + DeserializeOwned + Clone + AsRef<[u8]> + Send + Sync + 'static, + StateRoot: Serialize + + DeserializeOwned + + Clone + + AsRef<[u8]> + + PartialEq + + core::fmt::Debug + + Send + + Sync + + 'static, Witness: Serialize + DeserializeOwned + Send + Sync + 'static, Da: DaService, InnerVm: Zkvm, @@ -87,8 +95,16 @@ impl ProverService where Address: BorshSerialize + AsRef<[u8]> + Serialize + DeserializeOwned + Clone + Send + Sync + 'static, - StateRoot: - BorshSerialize + Serialize + DeserializeOwned + Clone + AsRef<[u8]> + Send + Sync + 'static, + StateRoot: BorshSerialize + + Serialize + + DeserializeOwned + + Clone + + AsRef<[u8]> + + PartialEq + + core::fmt::Debug + + Send + + Sync + + 'static, Witness: Serialize + DeserializeOwned + Send + Sync + 'static, Da: DaService, InnerVm: Zkvm + 'static, diff --git a/crates/full-node/sov-stf-runner/src/processes/prover_service/parallel/prover.rs b/crates/full-node/sov-stf-runner/src/processes/prover_service/parallel/prover.rs index 10df74c9cd..f756bdd315 100644 --- a/crates/full-node/sov-stf-runner/src/processes/prover_service/parallel/prover.rs +++ b/crates/full-node/sov-stf-runner/src/processes/prover_service/parallel/prover.rs @@ -42,7 +42,15 @@ where Da: DaService, Address: BorshSerialize + Serialize + DeserializeOwned + AsRef<[u8]> + Clone + Send + Sync + 'static, - StateRoot: Serialize + DeserializeOwned + Clone + AsRef<[u8]> + Send + Sync + 'static, + StateRoot: Serialize + + DeserializeOwned + + Clone + + AsRef<[u8]> + + PartialEq + + core::fmt::Debug + + Send + + Sync + + 'static, Witness: Serialize + DeserializeOwned + Send + Sync + 'static, { pub(crate) fn new(prover_address: Address, num_threads: usize) -> Self { diff --git a/crates/full-node/sov-stf-runner/src/processes/stf_info_manager.rs b/crates/full-node/sov-stf-runner/src/processes/stf_info_manager.rs index 827fe952f9..06cd8c1edf 100644 --- a/crates/full-node/sov-stf-runner/src/processes/stf_info_manager.rs +++ b/crates/full-node/sov-stf-runner/src/processes/stf_info_manager.rs @@ -59,8 +59,7 @@ impl StateTransitionInfo /// Materializes STF infos and sends notifications to the associated [`Receiver`]. pub struct Sender { /// Height of the next `StateTransitionInfo` that should be received by the [`Receiver`]. - /// This value is synchronized with the receiver end of the channel. On the sender end - /// it is only persisted in the database after a slot completion. + /// This value is synchronized with the receiver end of the channel. next_height_to_receive: Arc, /// The next height to send to the [`Receiver`]. This value is not persisted in the database and @@ -112,27 +111,18 @@ impl< match maybe_write_rollup_height { Some(write_rollup_height) => { - let ledger_next_height_to_receive = ledger_db - .get_stf_info_next_slot_number_to_receive() - .await? - .unwrap_or(SlotNumber::ONE); - - assert_eq!( - ledger_next_height_to_receive.get(), - next_rollup_height_to_receive, - "The next height to receive should be the same as the one stored in the db" - ); - - // Sanity check for `write_rollup_height & next_rollup_height_to_receive` assert!( - write_rollup_height.get() >= next_rollup_height_to_receive, - "The `write_rollup_height` should always be greater than the `next_rollup_height_to_receive`" + write_rollup_height.get() + 1 >= next_rollup_height_to_receive, + "The `write_rollup_height` ({write_rollup_height}) is more than one slot behind `next_rollup_height_to_receive` ({next_rollup_height_to_receive})" ); + let outstanding = write_rollup_height + .get() + .saturating_sub(next_rollup_height_to_receive); assert!( - (write_rollup_height.get() - next_rollup_height_to_receive) <= self.max_nb_of_infos_in_db.get(), + outstanding <= self.max_nb_of_infos_in_db.get(), "Too many STF infos in the db: {}, vs max allowed {} last_submitted={} write={}", - write_rollup_height.get() - next_rollup_height_to_receive, + outstanding, self.max_nb_of_infos_in_db, next_rollup_height_to_receive, write_rollup_height, @@ -140,10 +130,6 @@ impl< } // Db is empty None => { - assert!(ledger_db - .get_stf_info_next_slot_number_to_receive() - .await? - .is_none()); assert!(ledger_db.get_stf_info_oldest_slot_number().await?.is_none()); } } @@ -189,6 +175,7 @@ pub async fn new_stf_info_channel( ledger_db: LedgerDb, max_channel_size: NonZero, max_nb_of_infos_in_db: NonZero, + latest_proof_final_slot: Option, ) -> anyhow::Result<( Sender, Receiver, @@ -207,9 +194,13 @@ pub async fn new_stf_info_channel( let (notifier, receiver) = tokio::sync::mpsc::channel::(max_channel_size.get().try_into()?); - let next_height_to_receive = ledger_db - .get_stf_info_next_slot_number_to_receive() - .await? + // Resume STF-info processing from the last aggregated proof that was + // verified on-chain and persisted in the DB: that proof's `final_slot` is + // the last slot we know is committed, so the prover picks up at + // `final_slot + 1`. If no such proof exists yet (fresh node), start from + // genesis. + let next_height_to_receive = latest_proof_final_slot + .map(|final_slot| final_slot.saturating_add(1)) .unwrap_or(SlotNumber::ONE); let next_height_to_receive_ref = Arc::new(AtomicU64::new(next_height_to_receive.get())); @@ -338,12 +329,7 @@ where // Update the write rollup height. schema.merge(ledger_db.materialize_stf_info_write_slot_number(write_rollup_height)?); - // Send the new changes to the subscribers let next_rollup_height_to_receive = self.next_height_to_receive(); - schema.merge( - ledger_db - .materialize_stf_info_next_slot_number_to_receive(next_rollup_height_to_receive)?, - ); // Prune the oldest entries if needed schema.merge( @@ -409,7 +395,7 @@ where } /// Gets [`StateTransitionInfo`] for the corresponding slot number - pub fn get( + fn get( &self, slot_number: SlotNumber, ) -> anyhow::Result>> { @@ -467,6 +453,20 @@ mod tests { SimpleLedgerStorageManager, Sender, Receiver, + )> { + setup_with_resume(path, max_channel_size, max_nb_of_infos_in_db, None).await + } + + async fn setup_with_resume( + path: &Path, + max_channel_size: u64, + max_nb_of_infos_in_db: u64, + latest_proof_final_slot: Option, + ) -> anyhow::Result<( + LedgerDb, + SimpleLedgerStorageManager, + Sender, + Receiver, )> { let mut storage_manager = SimpleLedgerStorageManager::new(path); let ledger_db = LedgerDb::with_reader(storage_manager.create_ledger_storage())?; @@ -475,6 +475,7 @@ mod tests { ledger_db.clone(), NonZero::new(max_channel_size).unwrap(), NonZero::new(max_nb_of_infos_in_db).unwrap(), + latest_proof_final_slot, ) .await?; @@ -550,8 +551,16 @@ mod tests { // Now the reads are visible. { - let (ledger_db, _, sender, mut receiver) = - setup(temp_dir.path(), channel_size, max_nb_of_infos_in_db).await?; + // The previous block advanced `next_height_to_receive` to 3 in + // memory and persisted writes through slot 12. Resume the channel + // at slot 3 by anchoring to a "previous proof" with `final_slot=2`. + let (ledger_db, _, sender, mut receiver) = setup_with_resume( + temp_dir.path(), + channel_size, + max_nb_of_infos_in_db, + Some(SlotNumber::new(2)), + ) + .await?; let stf_info = receiver.read_next().await?.unwrap(); assert_eq!(stf_info.slot_number.get(), 3); diff --git a/crates/full-node/sov-stf-runner/src/runner.rs b/crates/full-node/sov-stf-runner/src/runner.rs index aaac9ac166..ac6a5426d2 100644 --- a/crates/full-node/sov-stf-runner/src/runner.rs +++ b/crates/full-node/sov-stf-runner/src/runner.rs @@ -139,6 +139,7 @@ where sync_state: Arc, da_service_with_cached_finalized_headers: DaServiceWithCachedFinalizedHeaders, genesis_da_height: u64, + latest_proof_final_slot: Option, ) -> anyhow::Result { error_if_tokio_runtime_is_not_multi_threaded()?; tracing::info!(config = ?runner_config, "Initializing StateTransitionRunner"); @@ -172,6 +173,7 @@ where ledger_db.clone(), config.max_number_of_transitions_in_memory, config.max_number_of_transitions_in_db, + latest_proof_final_slot, ) .await?; diff --git a/crates/full-node/sov-stf-runner/src/state_manager/tests.rs b/crates/full-node/sov-stf-runner/src/state_manager/tests.rs index 851dfef1a8..420d8e0819 100644 --- a/crates/full-node/sov-stf-runner/src/state_manager/tests.rs +++ b/crates/full-node/sov-stf-runner/src/state_manager/tests.rs @@ -172,6 +172,7 @@ async fn test_instant_finality() -> anyhow::Result<()> { state_manager.ledger_db.clone(), NonZero::new(40).unwrap(), NonZero::new(40).unwrap(), + None, ) .await?; state_manager.stf_info_sender = Some(sender); @@ -228,6 +229,7 @@ async fn rejected_aggregated_proofs_are_not_published_as_latest() -> anyhow::Res state_manager.ledger_db.clone(), NonZero::new(40).unwrap(), NonZero::new(40).unwrap(), + None, ) .await?; state_manager.stf_info_sender = Some(sender); diff --git a/crates/full-node/sov-stf-runner/tests/integration/helpers/runner_init.rs b/crates/full-node/sov-stf-runner/tests/integration/helpers/runner_init.rs index 342c66fe1c..bf1c3b152b 100644 --- a/crates/full-node/sov-stf-runner/tests/integration/helpers/runner_init.rs +++ b/crates/full-node/sov-stf-runner/tests/integration/helpers/runner_init.rs @@ -270,6 +270,7 @@ pub async fn initialize_runner( da_sync_state, da_service_with_cache, 0, + None, ) .await .unwrap(); diff --git a/crates/full-node/sov-stf-runner/tests/integration/prover_service/mod.rs b/crates/full-node/sov-stf-runner/tests/integration/prover_service/mod.rs index 8edcb6827b..6af6efaab2 100644 --- a/crates/full-node/sov-stf-runner/tests/integration/prover_service/mod.rs +++ b/crates/full-node/sov-stf-runner/tests/integration/prover_service/mod.rs @@ -25,7 +25,7 @@ fn make_header(header_hash: MockHash, height: u64) -> MockBlockHeader { fn make_transition_info( da_block_header: MockBlockHeader, ) -> StateTransitionInfo, MockDaSpec> { - let height = da_block_header.height; + let height = da_block_header.height + 1; StateTransitionInfo::new( StateTransitionWitness { diff --git a/crates/full-node/sov-stf-runner/tests/integration/prover_service/network.rs b/crates/full-node/sov-stf-runner/tests/integration/prover_service/network.rs index c4ae45fe6a..cf0b517e51 100644 --- a/crates/full-node/sov-stf-runner/tests/integration/prover_service/network.rs +++ b/crates/full-node/sov-stf-runner/tests/integration/prover_service/network.rs @@ -50,7 +50,7 @@ async fn test_network_prove_and_aggregate() { .. } = make_network_prover(false, true, Duration::from_secs(60)); - let header = make_header(MockHash::from([1; 32]), 1); + let header = make_header(MockHash::from([1; 32]), 0); let genesis = genesis_state_root(); // Submit one block — should return ProvingInProgress. @@ -158,8 +158,8 @@ async fn test_network_aggregated_proof_multiple_blocks() { AggregatedProofPublicData, >(&serialized_proof, &MockCodeCommitment::default()) .unwrap(); - assert_eq!(public_data.initial_slot_number.get(), 0); - assert_eq!(public_data.final_slot_number.get(), 4); + assert_eq!(public_data.initial_slot_number.get(), 1); + assert_eq!(public_data.final_slot_number.get(), 5); } ProofAggregationStatus::ProofGenerationInProgress => { panic!("Expected Success after completing all inner proofs") @@ -300,8 +300,8 @@ async fn test_network_aggregation_preserves_proved_entries_across_calls() { } = make_network_prover(false, true, Duration::from_secs(60)); let genesis = genesis_state_root(); - let header_a = make_header(MockHash::from([8; 32]), 1); - let header_b = make_header(MockHash::from([9; 32]), 2); + let header_a = make_header(MockHash::from([8; 32]), 0); + let header_b = make_header(MockHash::from([9; 32]), 1); // Submit two blocks. prover_service diff --git a/crates/full-node/sov-stf-runner/tests/integration/prover_service/parallel.rs b/crates/full-node/sov-stf-runner/tests/integration/prover_service/parallel.rs index 664ffd75ca..2f859fc66a 100644 --- a/crates/full-node/sov-stf-runner/tests/integration/prover_service/parallel.rs +++ b/crates/full-node/sov-stf-runner/tests/integration/prover_service/parallel.rs @@ -44,7 +44,7 @@ async fn test_successful_prover_execution() -> Result<(), ProverServiceError> { .. } = make_new_prover(); - let header = make_header(MockHash::from([0; 32]), 1); + let header = make_header(MockHash::from([0; 32]), 0); prover_service .prove(make_transition_info(header.clone())) .await?; @@ -86,8 +86,8 @@ async fn test_prover_status_busy() -> anyhow::Result<()> { let genesis_state_root = genesis_state_root(); - let headers: Vec<_> = (1..num_worker_threads + 1) - .map(|height| make_header(MockHash::from([height as u8; 32]), height as u64)) + let headers: Vec<_> = (0..num_worker_threads) + .map(|height| make_header(MockHash::from([(height + 1) as u8; 32]), height as u64)) .collect(); // Saturate the prover. @@ -242,8 +242,8 @@ async fn test_aggregated_proof() -> Result<(), ProverServiceError> { AggregatedProofPublicData, >(&serialized_proof, &MockCodeCommitment::default()) .unwrap(); - assert_eq!(public_data.initial_slot_number.get(), 0); - assert_eq!(public_data.final_slot_number.get(), (jump - 1) as u64); + assert_eq!(public_data.initial_slot_number.get(), 1); + assert_eq!(public_data.final_slot_number.get(), jump as u64); } ProofAggregationStatus::ProofGenerationInProgress => panic!("Prover should succeed"), } @@ -272,10 +272,10 @@ async fn test_aggregated_proof() -> Result<(), ProverServiceError> { AggregatedProofPublicData, >(&serialized_proof, &MockCodeCommitment::default()) .unwrap(); - assert_eq!(public_data.initial_slot_number.get() as usize, jump); + assert_eq!(public_data.initial_slot_number.get() as usize, jump + 1); assert_eq!( public_data.final_slot_number.get() as usize, - total_nb_of_blocks - 1 + total_nb_of_blocks ); } ProofAggregationStatus::ProofGenerationInProgress => panic!("Proves should succeed"), diff --git a/crates/full-node/sov-stf-runner/tests/integration/runner_reorg_tests.rs b/crates/full-node/sov-stf-runner/tests/integration/runner_reorg_tests.rs index 4a6c5f9a84..25e005d328 100644 --- a/crates/full-node/sov-stf-runner/tests/integration/runner_reorg_tests.rs +++ b/crates/full-node/sov-stf-runner/tests/integration/runner_reorg_tests.rs @@ -118,6 +118,7 @@ async fn test_runner_with_background_da_service( da_sync_state, da_service_with_cache, genesis_da_height, + None, ) .await?; diff --git a/crates/module-system/module-implementations/sov-prover-incentives/src/capabilities.rs b/crates/module-system/module-implementations/sov-prover-incentives/src/capabilities.rs index dd2719e67a..2e6b36ef32 100644 --- a/crates/module-system/module-implementations/sov-prover-incentives/src/capabilities.rs +++ b/crates/module-system/module-implementations/sov-prover-incentives/src/capabilities.rs @@ -3,9 +3,9 @@ use std::cmp::max; use sov_bank::{config_gas_token_id, Amount, Coins, IntoPayable}; use sov_modules_api::registration_lib::StakeRegistration; use sov_modules_api::{ - AggregatedProofPublicData, Gas, GasSpec, GetGasPrice, InvalidProofError, - SerializedAggregatedProof, Spec, StateReader, Storage, TxState, VersionReader, ZkVerifier, - Zkvm, + AggregatedProofPublicData, CryptoSpec, Gas, GasSpec, GetGasPrice, InvalidProofError, + MeteredHasher, SerializedAggregatedProof, Spec, StateReader, Storage, TxState, VersionReader, + ZkVerifier, Zkvm, }; use sov_rollup_interface::common::SlotNumber; use sov_state::Kernel; @@ -188,6 +188,38 @@ impl ProverIncentives { ))); } + // Hash the *public outputs* (not the raw proof bytes) so we can + // recognise honest retries of an already-accepted proof. Real zkVMs + // (e.g. SP1) produce non-deterministic proof bytes — Groth16/PLONK + // commitments use fresh blinding randomness per run — so the same + // claim about the same window yields different `raw_aggregated_proof` + // bytes on each run but identical public outputs. Hashing the public + // outputs captures "same claim" semantics. A retry whose public + // outputs differ (e.g. different `rewarded_addresses`) is still a + // double-claim attempt and falls into the penalty path below. + let public_outputs_bytes = borsh::to_vec(&public_outputs) + .expect("AggregatedProofPublicData must be borsh-serializable"); + let public_outputs_hash = + MeteredHasher::<_, ::Hasher>::digest( + &public_outputs_bytes, + state, + ) + .map_err(Into::::into)?; + + if let Some(stored_hash) = self + .accepted_public_outputs_hashes + .get(&public_outputs.final_slot_number, state) + .map_err(Into::::into)? + { + if stored_hash == public_outputs_hash { + tracing::debug!( + final_slot_number = %public_outputs.final_slot_number, + "Honest retry of an already-accepted proof (public outputs match); treating as no-op" + ); + return Ok(public_outputs); + } + } + match self.calculate_reward_and_remove( public_outputs.initial_slot_number, public_outputs.final_slot_number, @@ -202,6 +234,13 @@ impl ProverIncentives { } Paycheck::Rewarded(total_reward) => { self.reward_prover(total_reward, prover_address, state)?; + self.accepted_public_outputs_hashes + .set( + &public_outputs.final_slot_number, + &public_outputs_hash, + state, + ) + .map_err(Into::::into)?; Ok(public_outputs) } } diff --git a/crates/module-system/module-implementations/sov-prover-incentives/src/lib.rs b/crates/module-system/module-implementations/sov-prover-incentives/src/lib.rs index ad58381724..adf943253f 100644 --- a/crates/module-system/module-implementations/sov-prover-incentives/src/lib.rs +++ b/crates/module-system/module-implementations/sov-prover-incentives/src/lib.rs @@ -45,6 +45,20 @@ pub struct ProverIncentives { #[state] pub last_claimed_reward: StateValue, + /// Hash of the public outputs of the most recently accepted aggregated + /// proof, keyed by its `final_slot_number`. We hash the public outputs + /// (not the raw proof bytes) because real zkVMs like SP1 produce + /// non-deterministic proof bytes for the same claim — Groth16/PLONK + /// commitments use fresh blinding randomness per run — so two honest + /// proofs over the same window have identical public outputs but + /// different `raw_aggregated_proof`. Used to recognise honest retries + /// (e.g. at-least-once DA resubmission after a crash) and treat them as + /// a no-op. A retry whose public outputs *differ* (e.g. different + /// `rewarded_addresses`) is still penalised — that is what the + /// proving-penalty exists for. + #[state] + pub accepted_public_outputs_hashes: StateMap, + /// A penalty for provers who submit a proof for transitions that were already proven /// /// This quantity is expressed in gas units. When provers are penalized proofs, they will diff --git a/crates/module-system/module-implementations/sov-prover-incentives/tests/integration/proofs.rs b/crates/module-system/module-implementations/sov-prover-incentives/tests/integration/proofs.rs index 644693bcff..7dcad514e9 100644 --- a/crates/module-system/module-implementations/sov-prover-incentives/tests/integration/proofs.rs +++ b/crates/module-system/module-implementations/sov-prover-incentives/tests/integration/proofs.rs @@ -79,8 +79,12 @@ fn test_valid_proof() { }); } +/// Honest at-least-once DA delivery: the prover crashes between publishing a +/// proof and observing its acceptance, then re-publishes the same bytes after +/// restart. The chain must treat the byte-identical retry as a no-op rather +/// than penalising the prover. #[test] -fn test_valid_proof_penalized_if_reward_already_claimed() { +fn test_byte_identical_retry_for_claimed_range_is_no_op() { let (mut runner, prover, other_user) = setup(); let prover_address = prover.user_info.address(); @@ -100,8 +104,10 @@ fn test_valid_proof_penalized_if_reward_already_claimed() { }) .unwrap(); + let proof_bytes = serialize_proof(aggregated_proof); + runner.execute_proof::(ProofTestCase { - input: ProofInput(serialize_proof(aggregated_proof)), + input: ProofInput(proof_bytes.clone()), assert: Box::new(move |result, state| { assert_matches!( result.proof_receipt.unwrap().outcome, @@ -126,6 +132,52 @@ fn test_valid_proof_penalized_if_reward_already_claimed() { }), }); + // Resubmit the *same bytes* — must be a no-op: outcome is Valid, bond + // is unchanged, last_claimed_reward is unchanged. + runner.execute_proof::(ProofTestCase { + input: ProofInput(proof_bytes), + assert: Box::new(move |result, state| { + assert_matches!( + result.proof_receipt.unwrap().outcome, + ProofOutcome::Valid { .. } + ); + let prover_incentives = TestProverIncentives::default(); + assert_eq!( + prover_incentives + .bonded_provers + .get(&prover_address, state) + .unwrap(), + Some(prover.bond), + "Byte-identical retry must not change the bond" + ); + assert_eq!( + prover_incentives + .last_claimed_reward + .get(state) + .unwrap() + .map(|v| v.get()), + Some(2), + "Byte-identical retry must not advance last_claimed_reward" + ); + }), + }); +} + +/// Adversarial / buggy case: a *different* proof is submitted for an +/// already-claimed range. This is what the proving-penalty exists for, and +/// must still be penalised after the duplicate-retry exception. +#[test] +fn test_distinct_proof_for_claimed_range_is_penalized() { + let (mut runner, prover, other_user) = setup(); + let prover_address = prover.user_info.address(); + // A second address used purely to perturb the proof bytes without + // changing any of the public inputs that `check_proof_outputs` validates. + let unrelated_address = other_user.address(); + + for _ in 0..3 { + runner.execute(consume_gas_tx_for_signer(&other_user)); + } + let aggregated_proof = runner .query_visible_state(|state| { build_proof( @@ -139,10 +191,36 @@ fn test_valid_proof_penalized_if_reward_already_claimed() { runner.execute_proof::(ProofTestCase { input: ProofInput(serialize_proof(aggregated_proof)), + assert: Box::new(move |result, _state| { + assert_matches!( + result.proof_receipt.unwrap().outcome, + ProofOutcome::Valid { .. } + ); + }), + }); + + // Build a *different* proof for the same (init, final) range by changing + // `rewarded_addresses` (which `check_proof_outputs` does not bind). The + // bytes differ, so the duplicate-hash short-circuit does not trigger and + // we fall into the genuine penalty path. + let mut different_proof = runner + .query_visible_state(|state| { + build_proof( + state, + 1.to_slot_number(), + 2.to_slot_number(), + prover_address, + ) + }) + .unwrap(); + different_proof.rewarded_addresses = vec![prover_address, unrelated_address]; + + runner.execute_proof::(ProofTestCase { + input: ProofInput(serialize_proof(different_proof)), assert: Box::new(move |result, state| { match result.proof_receipt.clone().unwrap().outcome { ProofOutcome::Invalid(InvalidProofError::ProverPenalized(_), _) => {} - _ => panic!("Expected prover to be penalized"), + _ => panic!("Expected prover to be penalized for distinct proof bytes"), } let prover_incentives = TestProverIncentives::default(); diff --git a/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs b/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs index 75a01f8129..de690cc909 100644 --- a/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs +++ b/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs @@ -119,12 +119,18 @@ pub trait FullNodeBlueprint: RollupBlueprint { ) -> Self::DaService; /// Creates an instance of [`ProverService`]. + /// + /// Returns the prover service together with the `final_slot_number` of the + /// latest aggregated proof persisted in the ledger DB (if any). The caller + /// uses this slot to anchor the STF-info stream so the next aggregation is + /// contiguous with the proof on disk. async fn create_prover_service( &self, prover_config: RollupProverConfig, rollup_config: &RollupConfig<::Address, Self::DaService>, da_service: &Self::DaService, - ) -> Self::ProverService; + ledger_db: &LedgerDb, + ) -> (Self::ProverService, Option); /// Creates an instance of [`Self::StorageManager`]. /// Panics if initialization fails. @@ -492,6 +498,18 @@ pub trait FullNodeBlueprint: RollupBlueprint { let axum_tcp = TcpListener::bind(axum_socket_addr).await?; let axum_socket_addr = axum_tcp.local_addr()?; + // The prover service validates the latest aggregated proof persisted in + // the ledger DB and returns its `final_slot_number`. We pass this slot + // into the runner so the STF-info stream resumes at `final_slot + 1`. + let (prover_service, latest_proof_final_slot) = if prover_config.is_enabled() { + let (svc, slot) = self + .create_prover_service(prover_config, &rollup_config, &da_service, &ledger_db) + .await; + (Some(svc), slot) + } else { + (None, None) + }; + let mut runner = StateTransitionRunner::new( rollup_config.runner.clone(), axum_tcp, @@ -513,6 +531,7 @@ pub trait FullNodeBlueprint: RollupBlueprint { da_sync_state.clone(), da_service_with_cache, genesis_da_height, + latest_proof_final_slot, ) .await?; @@ -532,10 +551,8 @@ pub trait FullNodeBlueprint: RollupBlueprint { .await?; if let Some(stf_info_receiver) = runner.take_stf_info_receiver() { - let prover_service = self - .create_prover_service(prover_config, &rollup_config, &da_service) - .await; - + let prover_service = prover_service + .expect("prover service must be present when stf_info_receiver is Some"); let proof_sender = Box::new(self.create_proof_sender(&rollup_config, sequencer.proof_sender.clone())?); diff --git a/crates/module-system/sov-solana-offchain-auth/tests/integration/blueprint.rs b/crates/module-system/sov-solana-offchain-auth/tests/integration/blueprint.rs index 6d9c0bda1a..df4221e58a 100644 --- a/crates/module-system/sov-solana-offchain-auth/tests/integration/blueprint.rs +++ b/crates/module-system/sov-solana-offchain-auth/tests/integration/blueprint.rs @@ -136,9 +136,13 @@ where prover_config: sov_stf_runner::processes::RollupProverConfig, rollup_config: &RollupConfig<::Address, Self::DaService>, da_service: &Self::DaService, - ) -> Self::ProverService { + ledger_db: &sov_db::ledger_db::LedgerDb, + ) -> ( + Self::ProverService, + Option, + ) { self.inner - .create_prover_service(prover_config, rollup_config, da_service) + .create_prover_service(prover_config, rollup_config, da_service, ledger_db) .await } diff --git a/crates/rollup-interface/src/state_machine/zk/aggregated_proof/mod.rs b/crates/rollup-interface/src/state_machine/zk/aggregated_proof/mod.rs index 644e31f48a..1e3b594d99 100644 --- a/crates/rollup-interface/src/state_machine/zk/aggregated_proof/mod.rs +++ b/crates/rollup-interface/src/state_machine/zk/aggregated_proof/mod.rs @@ -19,7 +19,11 @@ use crate::zk::SerializedZkProof; /// Host-side interface for the outer zkVM that produces aggregated proofs. pub trait OuterZkvmHost: Clone + Send + Sync + 'static { /// Aggregates per-block inner proofs into a single serialized aggregated proof. - fn run_proof_aggregation( + fn run_proof_aggregation< + Address: Serialize + Clone, + Da: DaSpec, + Root: Serialize + DeserializeOwned + Clone + PartialEq + core::fmt::Debug, + >( &self, genesis_state_root: Root, headers_with_block_proofs: Vec<(Da::BlockHeader, BlockProof)>, @@ -85,27 +89,51 @@ impl core::fmt::Display for CodeCommitmentHash { } /// Public data of an aggregated proof. -#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Clone)] +#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, BorshSerialize, BorshDeserialize, Clone)] pub struct AggregatedProofPublicData { /// Initial rollup height. pub initial_slot_number: SlotNumber, /// Final rollup height. pub final_slot_number: SlotNumber, /// The genesis state root of the aggregated proof. + #[borsh(bound( + serialize = "Root: borsh::ser::BorshSerialize", + deserialize = "Root: borsh::de::BorshDeserialize" + ))] pub genesis_state_root: Root, /// The initial state root of the aggregated proof. + #[borsh(bound( + serialize = "Root: borsh::ser::BorshSerialize", + deserialize = "Root: borsh::de::BorshDeserialize" + ))] pub initial_state_root: Root, /// The final state root of the aggregated proof. + #[borsh(bound( + serialize = "Root: borsh::ser::BorshSerialize", + deserialize = "Root: borsh::de::BorshDeserialize" + ))] pub final_state_root: Root, /// The initial slot hash of the aggregated proof. + #[borsh(bound( + serialize = "::SlotHash: borsh::ser::BorshSerialize", + deserialize = "::SlotHash: borsh::de::BorshDeserialize" + ))] pub initial_slot_hash: Da::SlotHash, /// The final slot hash of the aggregated proof. + #[borsh(bound( + serialize = "::SlotHash: borsh::ser::BorshSerialize", + deserialize = "::SlotHash: borsh::de::BorshDeserialize" + ))] pub final_slot_hash: Da::SlotHash, /// Inner verifying key hash of the aggregated proof circuit. pub inner_vkey_hash: CodeCommitmentHash, /// Outer verifying key hash of the aggregated proof circuit. pub outer_vk_hash: CodeCommitmentHash, /// These are the addresses of the provers who proved individual blocks. + #[borsh(bound( + serialize = "Address: borsh::ser::BorshSerialize", + deserialize = "Address: borsh::de::BorshDeserialize" + ))] pub rewarded_addresses: Vec
, } diff --git a/crates/utils/sov-test-utils/src/rt_agnostic_blueprint.rs b/crates/utils/sov-test-utils/src/rt_agnostic_blueprint.rs index ff1b33f433..27ffec8729 100644 --- a/crates/utils/sov-test-utils/src/rt_agnostic_blueprint.rs +++ b/crates/utils/sov-test-utils/src/rt_agnostic_blueprint.rs @@ -208,8 +208,12 @@ where prover_config: RollupProverConfig, rollup_config: &RollupConfig<::Address, Self::DaService>, _da_service: &Self::DaService, - ) -> Self::ProverService { - Prover::create(prover_config, rollup_config).await + _ledger_db: &LedgerDb, + ) -> ( + Self::ProverService, + Option, + ) { + (Prover::create(prover_config, rollup_config).await, None) } fn create_storage_manager( diff --git a/examples/demo-rollup/src/aggregated_proof.rs b/examples/demo-rollup/src/aggregated_proof.rs new file mode 100644 index 0000000000..6ec7e95a66 --- /dev/null +++ b/examples/demo-rollup/src/aggregated_proof.rs @@ -0,0 +1,21 @@ +//! Helper for reading the latest aggregated proof persisted in the ledger DB. + +use sov_db::ledger_db::LedgerDb; +use sov_rollup_interface::node::ledger_api::LedgerStateProvider; +use sov_rollup_interface::zk::aggregated_proof::SerializedAggregatedProof; + +/// Reads the raw bytes of the latest aggregated proof persisted in the ledger +/// DB, without verifying. +/// +/// Panics if the DB read fails. +pub async fn read_latest_aggregated_proof( + ledger_db: &LedgerDb, +) -> Option { + Some( + ledger_db + .get_latest_aggregated_proof() + .await + .expect("Failed to read latest aggregated proof from ledger DB")? + .proof, + ) +} diff --git a/examples/demo-rollup/src/celestia_rollup.rs b/examples/demo-rollup/src/celestia_rollup.rs index 5162f59216..cc1f635698 100644 --- a/examples/demo-rollup/src/celestia_rollup.rs +++ b/examples/demo-rollup/src/celestia_rollup.rs @@ -160,7 +160,11 @@ impl FullNodeBlueprint for CelestiaDemoRollup { _prover_config: RollupProverConfig, rollup_config: &RollupConfig<::Address, Self::DaService>, _da_service: &Self::DaService, - ) -> Self::ProverService { + _ledger_db: &LedgerDb, + ) -> ( + Self::ProverService, + Option, + ) { let inner_vm = Risc0Host::new(risc0::ROLLUP_ELF); let outer_vm = MockZkvmHost::new_non_blocking(); @@ -172,12 +176,14 @@ impl FullNodeBlueprint for CelestiaDemoRollup { let da_verifier = CelestiaVerifier::new(rollup_params); - ParallelProverService::new_with_default_workers( + let prover = ParallelProverService::new_with_default_workers( inner_vm, outer_vm, da_verifier, rollup_config.proof_manager.prover_address, - ) + ); + + (prover, None) } fn create_storage_manager( diff --git a/examples/demo-rollup/src/external_mock_rollup.rs b/examples/demo-rollup/src/external_mock_rollup.rs index 46e6e4d44c..29b425893b 100644 --- a/examples/demo-rollup/src/external_mock_rollup.rs +++ b/examples/demo-rollup/src/external_mock_rollup.rs @@ -9,7 +9,9 @@ use sov_db::storage_manager::NomtStorageManager; use sov_ethereum::EthRpcConfig; use sov_mock_da::storable::rpc::StorableMockDaClient; use sov_mock_da::MockDaSpec; -use sov_mock_zkvm::{MockCodeCommitment, MockZkvm, MockZkvmHost}; +use sov_mock_zkvm::{ + MockCodeCommitment, MockZkVerifier, MockZkvm, MockZkvmCryptoSpec, MockZkvmHost, +}; use sov_modules_api::configurable_spec::ConfigurableSpec; use sov_modules_api::execution_mode::{Native, WitnessGeneration}; use sov_modules_api::CryptoSpec; @@ -17,14 +19,13 @@ use sov_modules_api::{NodeEndpoints, Spec, Storage}; use sov_modules_rollup_blueprint::pluggable_traits::PluggableSpec; use sov_modules_rollup_blueprint::proof_sender::SovApiProofSender; use sov_modules_rollup_blueprint::{FullNodeBlueprint, RollupBlueprint, SequencerCreationReceipt}; -use sov_risc0_adapter::host::Risc0Host; -use sov_risc0_adapter::Risc0; -use sov_risc0_adapter::Risc0CryptoSpec; -use sov_risc0_adapter::Risc0MethodId; use sov_rollup_full_node_interface::StateUpdateReceiver; +use sov_rollup_interface::common::SlotNumber; use sov_rollup_interface::da::DaSpec; use sov_rollup_interface::node::SyncStatus; -use sov_rollup_interface::zk::ZkvmHost; +use sov_rollup_interface::zk::aggregated_proof::{ + AggregateProofVerifier, AggregatedProofPublicData, +}; use sov_sequencer::{ProofBlobSender, Sequencer}; use sov_state::nomt::prover_storage::NomtProverStorage; use sov_state::DefaultStorageSpec; @@ -32,20 +33,21 @@ use sov_stf_runner::processes::{ParallelProverService, RollupProverConfig}; use sov_stf_runner::RollupConfig; use crate::eth_dev_signer; +use crate::read_latest_aggregated_proof; use crate::solana_offchain_endpoint::solana_offchain_router; -type Hasher = ::Hasher; +type Hasher = ::Hasher; type NativeStorage = NomtProverStorage, ::SlotHash>; /// The default spec of the rollup pub type ExternalMockRollupSpec = ConfigurableSpec< MockDaSpec, - Risc0, + MockZkvm, MockZkvm, MultiAddressEvmSolana, M, - Risc0CryptoSpec, + MockZkvmCryptoSpec, NativeStorage, >; @@ -154,18 +156,35 @@ impl FullNodeBlueprint for ExternalMockDemoRollup { _prover_config: RollupProverConfig, rollup_config: &RollupConfig<::Address, Self::DaService>, _da_service: &Self::DaService, - ) -> Self::ProverService { - let inner_vm = Risc0Host::new(risc0::MOCK_DA_ELF); - - let outer_vm = MockZkvmHost::new_non_blocking(); + ledger_db: &LedgerDb, + ) -> (Self::ProverService, Option) { + let previous_public_data: Option< + AggregatedProofPublicData< + ::Address, + MockDaSpec, + <::Storage as Storage>::Root, + >, + > = read_latest_aggregated_proof(ledger_db).await.map(|proof| { + AggregateProofVerifier::::new(MockCodeCommitment::default()) + .verify(&proof) + .expect("Persisted aggregated proof failed verification") + }); + + let latest_proof_final_slot = previous_public_data.as_ref().map(|p| p.final_slot_number); + + let inner_vm = MockZkvmHost::new_non_blocking(); + let outer_vm = + MockZkvmHost::new_non_blocking_with_previous_anchor(previous_public_data.as_ref()); let da_verifier = Default::default(); - ParallelProverService::new_with_default_workers( + let prover = ParallelProverService::new_with_default_workers( inner_vm, outer_vm, da_verifier, rollup_config.proof_manager.prover_address, - ) + ); + + (prover, latest_proof_final_slot) } fn create_storage_manager( @@ -184,9 +203,9 @@ impl FullNodeBlueprint for ExternalMockDemoRollup { Ok(Self::ProofSender::new(sequence_number_provider)) } - fn compute_code_commitments() -> anyhow::Result<(Risc0MethodId, MockCodeCommitment)> { - let inner = Risc0Host::new(risc0::MOCK_DA_ELF).code_commitment()?; - let outer = MockZkvmHost::new_non_blocking().code_commitment()?; - Ok((inner, outer)) + fn compute_code_commitments() -> anyhow::Result<(MockCodeCommitment, MockCodeCommitment)> { + // MockZkvm has no ELF to derive a commitment from — the default zero + // commitment matches what `MockZkvmHost::code_commitment` returns. + Ok((MockCodeCommitment::default(), MockCodeCommitment::default())) } } diff --git a/examples/demo-rollup/src/lib.rs b/examples/demo-rollup/src/lib.rs index be094618bd..063329498a 100644 --- a/examples/demo-rollup/src/lib.rs +++ b/examples/demo-rollup/src/lib.rs @@ -9,6 +9,9 @@ use std::str::FromStr; use sov_celestia_adapter::types::Namespace; use sov_modules_api::macros::config_value; +mod aggregated_proof; +pub use aggregated_proof::read_latest_aggregated_proof; + mod mock_rollup; pub use mock_rollup::*; diff --git a/examples/demo-rollup/src/mock_rollup.rs b/examples/demo-rollup/src/mock_rollup.rs index 13b82eb8dc..24dfda30ab 100644 --- a/examples/demo-rollup/src/mock_rollup.rs +++ b/examples/demo-rollup/src/mock_rollup.rs @@ -9,7 +9,9 @@ use sov_db::storage_manager::NomtStorageManager; use sov_ethereum::EthRpcConfig; use sov_mock_da::storable::StorableMockDaService; use sov_mock_da::MockDaSpec; -use sov_mock_zkvm::{MockCodeCommitment, MockZkvm, MockZkvmCryptoSpec, MockZkvmHost}; +use sov_mock_zkvm::{ + MockCodeCommitment, MockZkVerifier, MockZkvm, MockZkvmCryptoSpec, MockZkvmHost, +}; use sov_modules_api::configurable_spec::ConfigurableSpec; use sov_modules_api::execution_mode::{Native, WitnessGeneration}; use sov_modules_api::{CryptoSpec, NodeEndpoints, Spec}; @@ -17,8 +19,12 @@ use sov_modules_rollup_blueprint::pluggable_traits::PluggableSpec; use sov_modules_rollup_blueprint::proof_sender::SovApiProofSender; use sov_modules_rollup_blueprint::{FullNodeBlueprint, RollupBlueprint, SequencerCreationReceipt}; use sov_rollup_full_node_interface::StateUpdateReceiver; +use sov_rollup_interface::common::SlotNumber; use sov_rollup_interface::da::DaSpec; use sov_rollup_interface::node::SyncStatus; +use sov_rollup_interface::zk::aggregated_proof::{ + AggregateProofVerifier, AggregatedProofPublicData, +}; use sov_sequencer::{ProofBlobSender, Sequencer}; use sov_state::nomt::prover_storage::NomtProverStorage; use sov_state::{DefaultStorageSpec, Storage}; @@ -26,6 +32,7 @@ use sov_stf_runner::processes::{ParallelProverService, RollupProverConfig}; use sov_stf_runner::RollupConfig; use crate::eth_dev_signer; +use crate::read_latest_aggregated_proof; use crate::solana_offchain_endpoint::solana_offchain_router; /// Rollup with a [`ConfigurableSpec`] with [`MockDaSpec`] as Da spec, [`MockZkvm`] inner vm and [`MockZkvm`] for outer vm @@ -147,17 +154,35 @@ impl FullNodeBlueprint for MockDemoRollup { _prover_config: RollupProverConfig, rollup_config: &RollupConfig<::Address, Self::DaService>, _da_service: &Self::DaService, - ) -> Self::ProverService { + ledger_db: &LedgerDb, + ) -> (Self::ProverService, Option) { + let previous_public_data: Option< + AggregatedProofPublicData< + ::Address, + MockDaSpec, + <::Storage as Storage>::Root, + >, + > = read_latest_aggregated_proof(ledger_db).await.map(|proof| { + AggregateProofVerifier::::new(MockCodeCommitment::default()) + .verify(&proof) + .expect("Persisted aggregated proof failed verification") + }); + + let latest_proof_final_slot = previous_public_data.as_ref().map(|p| p.final_slot_number); + let inner_vm = MockZkvmHost::new_non_blocking(); - let outer_vm = MockZkvmHost::new_non_blocking(); + let outer_vm = + MockZkvmHost::new_non_blocking_with_previous_anchor(previous_public_data.as_ref()); let da_verifier = Default::default(); - ParallelProverService::new_with_default_workers( + let prover = ParallelProverService::new_with_default_workers( inner_vm, outer_vm, da_verifier, rollup_config.proof_manager.prover_address, - ) + ); + + (prover, latest_proof_final_slot) } fn create_storage_manager( diff --git a/examples/demo-rollup/src/mock_sp1_rollup.rs b/examples/demo-rollup/src/mock_sp1_rollup.rs index 77a8b5afa6..a0ed30ef5c 100644 --- a/examples/demo-rollup/src/mock_sp1_rollup.rs +++ b/examples/demo-rollup/src/mock_sp1_rollup.rs @@ -16,17 +16,22 @@ use sov_modules_rollup_blueprint::pluggable_traits::PluggableSpec; use sov_modules_rollup_blueprint::proof_sender::SovApiProofSender; use sov_modules_rollup_blueprint::{FullNodeBlueprint, RollupBlueprint, SequencerCreationReceipt}; use sov_rollup_full_node_interface::StateUpdateReceiver; +use sov_rollup_interface::common::SlotNumber; use sov_rollup_interface::da::DaSpec; use sov_rollup_interface::node::SyncStatus; +use sov_rollup_interface::zk::aggregated_proof::{ + AggregateProofVerifier, AggregatedProofPublicData, +}; use sov_sequencer::{ProofBlobSender, Sequencer}; use sov_sp1_adapter::host::{SP1AggregationHost, SP1Host}; -use sov_sp1_adapter::{SP1CryptoSpec, SP1MethodId, SP1}; +use sov_sp1_adapter::{SP1CryptoSpec, SP1MethodId, SP1Verifier, SP1}; use sov_state::nomt::prover_storage::NomtProverStorage; use sov_state::{DefaultStorageSpec, Storage}; use sov_stf_runner::processes::{ParallelProverService, RollupProverConfig}; use sov_stf_runner::RollupConfig; use crate::eth_dev_signer; +use crate::read_latest_aggregated_proof; use crate::solana_offchain_endpoint::solana_offchain_router; /// Rollup with a [`ConfigurableSpec`] with [`MockDaSpec`] as Da spec, and [`SP1`] for both inner and outer vm @@ -141,7 +146,8 @@ impl FullNodeBlueprint for MockSp1DemoRollup { _prover_config: RollupProverConfig, rollup_config: &RollupConfig<::Address, Self::DaService>, _da_service: &Self::DaService, - ) -> Self::ProverService { + ledger_db: &LedgerDb, + ) -> (Self::ProverService, Option) { let elf: &[u8] = *sp1::SP1_GUEST_MOCK_ELF; let agg_elf: &[u8] = *sp1::SP1_GUEST_AGGREGATION_MOCK_ELF; @@ -161,21 +167,43 @@ impl FullNodeBlueprint for MockSp1DemoRollup { let inner_verifying_key = inner_vm.verifying_key().clone(); + let previous_aggregated_proof = read_latest_aggregated_proof(ledger_db).await; + + let previous_for_outer = previous_aggregated_proof.clone(); let outer_vm = tokio::task::spawn_blocking(move || { - SP1AggregationHost::new(agg_elf, inner_verifying_key) - .expect("Failed to create SP1AggregationHost from aggregation guest ELF") + SP1AggregationHost::new_with_previous_proof( + agg_elf, + inner_verifying_key, + previous_for_outer, + ) + .expect("Failed to create SP1AggregationHost from aggregation guest ELF") }) .await .expect("SP1AggregationHost setup task panicked"); + // Validate the persisted proof and extract the `final_slot_number` so + // the runner can rewind the STF-info stream to `final_slot + 1`. + let latest_proof_final_slot = previous_aggregated_proof.as_ref().map(|proof| { + let public_data: AggregatedProofPublicData< + ::Address, + MockDaSpec, + <::Storage as Storage>::Root, + > = AggregateProofVerifier::::new(outer_vm.code_commitment()) + .verify(proof) + .expect("Persisted aggregated proof failed verification"); + public_data.final_slot_number + }); + let da_verifier = Default::default(); - ParallelProverService::new_with_default_workers( + let prover = ParallelProverService::new_with_default_workers( inner_vm, outer_vm, da_verifier, rollup_config.proof_manager.prover_address, - ) + ); + + (prover, latest_proof_final_slot) } fn create_storage_manager( diff --git a/examples/demo-rollup/tests/restart/mod.rs b/examples/demo-rollup/tests/restart/mod.rs index 6312dfaf88..bd22296617 100644 --- a/examples/demo-rollup/tests/restart/mod.rs +++ b/examples/demo-rollup/tests/restart/mod.rs @@ -1,2 +1,3 @@ mod crash; mod graceful; +mod proofs; diff --git a/examples/demo-rollup/tests/restart/proofs.rs b/examples/demo-rollup/tests/restart/proofs.rs new file mode 100644 index 0000000000..9fe7b0c9cb --- /dev/null +++ b/examples/demo-rollup/tests/restart/proofs.rs @@ -0,0 +1,124 @@ +use std::net::SocketAddr; +use std::time::Duration; + +use futures::StreamExt; +use sov_demo_rollup::ExternalMockDemoRollup; +use sov_full_node_configs::sequencer::SequencerKindConfig; +use sov_mock_da::storable::rpc::{start_server, MockDaClientConfig}; +use sov_mock_da::storable::StorableMockDaService; +use sov_mock_da::{BlockProducingConfig, MockAddress, MockDaConfig}; +use sov_modules_api::execution_mode::Native; +use sov_modules_api::OperatingMode; +use sov_rollup_interface::da::BlockHeaderTrait; +use sov_rollup_interface::node::da::DaService; +use sov_sequencer::preferred::RecoveryStrategy; +use sov_stf_runner::processes::RollupProverConfig; +use sov_test_utils::test_rollup::{RollupBuilder, TestRollup}; +use sov_test_utils::TEST_DEFAULT_MOCK_DA_BLOCK_TIME_MS; +use tokio::sync::watch; + +use crate::test_helpers::test_genesis_source; + +const TEST_SEQ_DA_ADDRESS: MockAddress = MockAddress::new([0; 32]); +const AGGREGATED_PROOF_BLOCK_JUMP: usize = 3; +const PROOF_WAIT_TIMEOUT: Duration = Duration::from_secs(120); +const OFFLINE_DA_BLOCKS: u32 = 3; + +async fn create_da_service_periodic() -> (StorableMockDaService, watch::Sender<()>, SocketAddr) { + let (shutdown_sender, shutdown_receiver) = tokio::sync::watch::channel(()); + let mut da_config = MockDaConfig::instant_with_sender(TEST_SEQ_DA_ADDRESS); + da_config.block_producing = BlockProducingConfig::Periodic { + block_time_ms: TEST_DEFAULT_MOCK_DA_BLOCK_TIME_MS * 2, + }; + + let da_service = StorableMockDaService::from_config(da_config, shutdown_receiver).await; + + let addr = start_server(da_service.clone(), "127.0.0.1", 0) + .await + .unwrap(); + + (da_service, shutdown_sender, addr) +} + +async fn build_rollup_with_prover( + addr: SocketAddr, +) -> RollupBuilder> { + let genesis = test_genesis_source(OperatingMode::Zk); + let da_config = MockDaClientConfig { + url: format!("http://{addr}"), + }; + RollupBuilder::new_with_external_da(genesis, da_config, None) + .await + .enable_prover() + .set_config(|c| { + c.blob_processing_timeout_secs = 300; + c.aggregated_proof_block_jump = AGGREGATED_PROOF_BLOCK_JUMP; + c.rollup_prover_config = RollupProverConfig::Prove; + if let SequencerKindConfig::Preferred(p) = &mut c.sequencer_config { + p.disable_state_root_consistency_checks = true; + p.num_cache_warmup_workers = 0; + p.recovery_strategy = RecoveryStrategy::TryToSave; + p.ideal_lag_behind_finalized_slot = 3; + } + }) +} + +async fn wait_for_aggregated_proofs( + test_rollup: &TestRollup>, + count: usize, +) { + let api_client = test_rollup.api_client().clone(); + let mut proofs = api_client.subscribe_aggregated_proof().await.unwrap(); + for i in 0..count { + let proof = tokio::time::timeout(PROOF_WAIT_TIMEOUT, proofs.next()) + .await + .unwrap_or_else(|_| panic!("Timed out waiting for aggregated proof #{}", i + 1)) + .expect("Aggregated proof stream ended unexpectedly") + .expect("Aggregated proof message was an error"); + tracing::info!(proof_index = i + 1, ?proof, "Received aggregated proof",); + } +} + +/// Integration test: external-DA rollup keeps processing aggregated proofs after a restart. +/// +/// Scenario: +/// 1. Start the rollup against an external mock DA, wait for 3 aggregated proofs. +/// 2. Shut the rollup down for at least 3 DA blocks (the DA layer keeps producing while the rollup is offline). +/// 3. Restart the rollup on the same storage and verify aggregated proofs are still being produced. +#[tokio::test(flavor = "multi_thread")] +async fn test_aggregated_proofs_after_restart_external_da() { + //sov_test_utils::logging::initialize_or_change_logging_with_filter("info,tower=off"); + + let (da_service, da_shutdown, addr) = create_da_service_periodic().await; + // Give the DA layer some headroom so the rollup doesn't immediately starve on startup. + da_service.wait_for_height(10).await.unwrap(); + + // Phase 1: start the rollup and wait for 3 aggregated proofs. + let test_rollup = build_rollup_with_prover(addr) + .await + .start_test_rollup() + .await + .unwrap(); + test_rollup.wait_for_sequencer_ready().await.unwrap(); + + wait_for_aggregated_proofs(&test_rollup, 3).await; + + // Phase 2: shut the rollup down and wait for the DA to advance by at least 3 blocks. + let height_at_shutdown = da_service.get_head_block_header().await.unwrap().height() as u32; + + let builder = test_rollup.shutdown().await.unwrap(); + + da_service + .wait_for_height(height_at_shutdown + OFFLINE_DA_BLOCKS) + .await + .unwrap(); + + // Phase 3: restart on the same storage and confirm aggregated proofs keep flowing. + let test_rollup = builder.start_test_rollup().await.unwrap(); + test_rollup.wait_for_sequencer_ready().await.unwrap(); + + wait_for_aggregated_proofs(&test_rollup, 9).await; + + let _ = test_rollup.shutdown().await; + let _ = da_shutdown.send(()); +} diff --git a/examples/demo-rollup/tests/resync/mod.rs b/examples/demo-rollup/tests/resync/mod.rs index 3fe1a11dd5..8bb303afb7 100644 --- a/examples/demo-rollup/tests/resync/mod.rs +++ b/examples/demo-rollup/tests/resync/mod.rs @@ -320,6 +320,7 @@ async fn test_rollup_resync() -> anyhow::Result<()> { (Level::WARN, "slow statement: execution time exceeded alert threshold".to_string()), // TODO - investigate: https://github.com/Sovereign-Labs/sovereign-sdk-wip/issues/2978 (Level::WARN, "Received error updating target height, stopping background task".to_string()), + (Level::WARN, "The node has a higher sequence number than the sequencer, but we're very close to the chain tip, i.e. we don't expect to be simply syncing. This could mean there is another preferred sequencer running (which is not supported and will likely lead to issues), or you very recently restarted the node and there's still some in-flight blobs. Resyncing to the chain tip.".to_string()), // This is expected for the second resync: since we have batches in the sequencer DB, we // are indeed causing a delay for users (Level::WARN, "The sequencer must pause because the node has lagged behind the DA blockchain. This might lead to a brief downtime for users.".to_string()),