diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs index cb665ec9e99..f092d288feb 100644 --- a/ethexe/compute/src/compute.rs +++ b/ethexe/compute/src/compute.rs @@ -18,7 +18,7 @@ use crate::{ComputeError, ComputeEvent, ProcessorExt, Result, service::SubService}; use ethexe_common::{ - Announce, HashOf, PromisePolicy, SimpleBlockData, + Announce, HashOf, ProgramStates, PromisePolicy, SimpleBlockData, db::{ AnnounceStorageRO, AnnounceStorageRW, BlockMetaStorageRO, CodesStorageRW, ConfigStorageRO, GlobalsStorageRW, OnChainStorageRO, @@ -87,6 +87,11 @@ pub struct ComputeSubService { computation: Option, promises_stream: Option, pending_event: Option>, + + /// Input for canonical-only computation (block_hash, parent_announce, gas_allowance). + canonical_input: Option<(H256, HashOf, u64)>, + /// Active canonical-only computation future. + canonical_computation: Option>>, } impl ComputeSubService

{ @@ -100,6 +105,8 @@ impl ComputeSubService

{ computation: None, promises_stream: None, pending_event: None, + canonical_input: None, + canonical_computation: None, } } @@ -111,6 +118,17 @@ impl ComputeSubService

{ self.input.push_back((announce, promise_policy)); } + /// Request canonical-only computation. Cancels any stale canonical computation. + pub fn receive_canonical_to_compute( + &mut self, + block_hash: H256, + parent_announce: HashOf, + gas_allowance: u64, + ) { + self.canonical_computation = None; + self.canonical_input = Some((block_hash, parent_announce, gas_allowance)); + } + async fn compute( db: Database, config: ComputeConfig, @@ -191,12 +209,95 @@ impl ComputeSubService

{ Ok(announce_hash) } + + /// Compute canonical events only, returning ProgramStates without announce metadata writes. + /// If the parent announce is not yet computed, computes predecessors first (those ARE + /// real announces that get full DB writes). Only the synthetic announce is ephemeral. + async fn compute_canonical_only( + db: Database, + config: ComputeConfig, + mut processor: P, + block_hash: H256, + parent_announce: HashOf, + gas_allowance: u64, + ) -> Result<(H256, ProgramStates)> { + if !db.block_meta(block_hash).prepared { + return Err(ComputeError::BlockNotPrepared(block_hash)); + } + + // Build synthetic announce with empty TXs — never stored in DB + let synthetic = Announce { + block_hash, + parent: parent_announce, + gas_allowance: Some(gas_allowance), + injected_transactions: vec![], + }; + + // Compute any uncomputed predecessors. These are real announces that need + // full DB writes (announce_outcome, announce_program_states, etc.). + // On a fast chain, the parent announce may still be computing when we start. + let predecessors = utils::collect_not_computed_predecessors(&synthetic, &db)?; + if !predecessors.is_empty() { + log::trace!( + "compute-canonical: {} uncomputed predecessor(s) for parent {parent_announce}", + predecessors.len(), + ); + for (hash, announce) in predecessors { + Self::compute_one(&db, &mut processor, config, hash, announce, None).await?; + } + } + + // Run canonical events through processor. CAS/state blobs are written (idempotent), + // but we skip announce-level metadata writes for the synthetic announce. + let executable = + utils::prepare_executable_for_announce(&db, synthetic, config.canonical_quarantine())?; + let result = processor.process_programs(executable, None).await?; + + Ok((block_hash, result.states)) + } } impl SubService for ComputeSubService

{ type Output = ComputeEvent; fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> { + // NOTE: Canonical computation and announce computation use separate future slots + // and can run concurrently. This is by design — they are sequential in the producer + // state machine (canonical finishes before announce starts), but the compute layer + // doesn't enforce ordering. Two processor clones may be alive simultaneously, + // doubling WASM runtime memory briefly. + + // Poll canonical computation: start if idle, then poll until pending. + loop { + if let Some(ref mut computation) = self.canonical_computation { + if let Poll::Ready(result) = computation.poll_unpin(cx) { + self.canonical_computation = None; + return Poll::Ready(result.map(|(block_hash, program_states)| { + ComputeEvent::CanonicalEventsComputed(block_hash, program_states) + })); + } + break; + } + + if let Some((block_hash, parent_announce, gas_allowance)) = self.canonical_input.take() + { + self.canonical_computation = Some( + Self::compute_canonical_only( + self.db.clone(), + self.config, + self.processor.clone(), + block_hash, + parent_announce, + gas_allowance, + ) + .boxed(), + ); + continue; + } + + break; + } + if self.computation.is_none() && self.promises_stream.is_none() && let Some((announce, promise_policy)) = self.input.pop_front() diff --git a/ethexe/compute/src/lib.rs b/ethexe/compute/src/lib.rs index a5c3b8618db..6f5a3f7b6d8 100644 --- a/ethexe/compute/src/lib.rs +++ b/ethexe/compute/src/lib.rs @@ -20,7 +20,7 @@ pub use compute::{ ComputeConfig, ComputeSubService, utils::{find_canonical_events_post_quarantine, prepare_executable_for_announce}, }; -use ethexe_common::{Announce, CodeAndIdUnchecked, HashOf, injected::Promise}; +use ethexe_common::{Announce, CodeAndIdUnchecked, HashOf, ProgramStates, injected::Promise}; use ethexe_processor::{ExecutableData, ProcessedCodeInfo, Processor, ProcessorError}; use ethexe_runtime_common::FinalizedBlockTransitions; use gprimitives::{CodeId, H256}; @@ -47,6 +47,9 @@ pub enum ComputeEvent { CodeProcessed(CodeId), BlockPrepared(H256), AnnounceComputed(HashOf), + /// Canonical events computed for a block. Contains (block_hash, program_states). + /// The program_states are ephemeral — not stored in DB as an announce. + CanonicalEventsComputed(H256, ProgramStates), Promise(Promise, HashOf), } diff --git a/ethexe/compute/src/service.rs b/ethexe/compute/src/service.rs index 5b96f0256a0..4fb3928c610 100644 --- a/ethexe/compute/src/service.rs +++ b/ethexe/compute/src/service.rs @@ -24,7 +24,7 @@ use crate::{ compute::{ComputeConfig, ComputeSubService}, prepare::PrepareSubService, }; -use ethexe_common::{Announce, CodeAndIdUnchecked, PromisePolicy}; +use ethexe_common::{Announce, CodeAndIdUnchecked, HashOf, PromisePolicy}; use ethexe_db::Database; use ethexe_processor::Processor; use futures::{Stream, stream::FusedStream}; @@ -87,6 +87,21 @@ impl ComputeService

{ self.compute_sub_service .receive_announce_to_compute(announce, promise_policy); } + + /// Request canonical-only computation for a block. + /// Returns ProgramStates without writing announce metadata to DB. + pub fn compute_canonical_events( + &mut self, + block_hash: H256, + parent_announce: HashOf, + gas_allowance: u64, + ) { + self.compute_sub_service.receive_canonical_to_compute( + block_hash, + parent_announce, + gas_allowance, + ); + } } impl Stream for ComputeService

{ @@ -138,12 +153,61 @@ pub(crate) trait SubService: Unpin + Send + 'static { mod tests { use super::*; - use ethexe_common::{CodeAndIdUnchecked, db::*, mock::*}; + use ethexe_common::{Announce, CodeAndIdUnchecked, db::*, mock::*}; use ethexe_db::Database as DB; use futures::StreamExt; use gear_core::ids::prelude::CodeIdExt; use gprimitives::CodeId; + /// Test canonical-only computation returns ProgramStates without announce metadata writes. + #[tokio::test] + #[ntest::timeout(10000)] + async fn compute_canonical_events() { + gear_utils::init_default_logger(); + + let db = DB::memory(); + let mut service = ComputeService::new_mock_processor(db.clone()); + + // Setup: chain of 2 blocks. Block 1 has a computed announce. + // We'll prepare block 2 and run canonical compute on it. + let chain = BlockChain::mock(2).setup(&db); + let block = chain.blocks[2].to_simple(); + + // Block 2 is already prepared by BlockChain::mock().setup() + assert!(db.block_meta(block.hash).prepared, "block must be prepared"); + + let parent_announce_hash = chain.block_top_announce_hash(1); + assert!( + db.announce_meta(parent_announce_hash).computed, + "parent announce must be computed" + ); + + // Request canonical-only computation + service.compute_canonical_events(block.hash, parent_announce_hash, 42); + + // Poll service — should get CanonicalEventsComputed + let event = service.next().await.unwrap().unwrap(); + match event { + ComputeEvent::CanonicalEventsComputed(hash, _states) => { + assert_eq!(hash, block.hash); + } + other => panic!("Expected CanonicalEventsComputed, got {other:?}"), + } + + // Verify NO announce metadata was written for the synthetic announce. + let synthetic = Announce { + block_hash: block.hash, + parent: parent_announce_hash, + gas_allowance: Some(42), + injected_transactions: vec![], + }; + let synthetic_hash = synthetic.to_hash(); + assert!( + !db.announce_meta(synthetic_hash).computed, + "Synthetic announce must NOT be marked as computed in DB" + ); + } + /// Test ComputeService block preparation functionality #[tokio::test] async fn prepare_block() { diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 337a086c95d..7d66c027b0b 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -719,18 +719,28 @@ pub fn accept_announce(db: &impl DBAnnouncesExt, announce: Announce) -> Result { - db.set_injected_transaction(tx.clone()); - } - + // Valid TX or state-dependent conditions that may resolve after canonical + // events are computed. The producer validates against post-canonical states + // (two-phase compute), but accept_announce validates against parent states. + // Programs created in the current block appear as UnknownDestination here + // but become valid after canonical execution. + TxValidity::Valid + | TxValidity::UnknownDestination + | TxValidity::UninitializedDestination + | TxValidity::InsufficientBalanceForInjectedMessages => {} + + // Structural violations that cannot resolve after canonical execution. validity => { tracing::trace!( announce = ?announce.to_hash(), - "announce contains invalid transition with status {validity_status:?}, rejecting announce." + "announce contains structurally invalid tx: {validity_status:?}, rejecting announce." ); return Ok(AnnounceStatus::Rejected { @@ -768,6 +778,11 @@ pub fn accept_announce(db: &impl DBAnnouncesExt, announce: Announce) -> Result Result<()> { + // Connect node does not produce announces, so canonical events are irrelevant. + Ok(()) + } + fn receive_announces_response(&mut self, response: AnnouncesResponse) -> Result<()> { let State::WaitingForMissingAnnounces { block, diff --git a/ethexe/consensus/src/lib.rs b/ethexe/consensus/src/lib.rs index c1b8ae43850..25e478f027d 100644 --- a/ethexe/consensus/src/lib.rs +++ b/ethexe/consensus/src/lib.rs @@ -34,7 +34,7 @@ use anyhow::Result; use ethexe_common::{ - Announce, Digest, HashOf, PromisePolicy, SimpleBlockData, + Announce, Digest, HashOf, ProgramStates, PromisePolicy, SimpleBlockData, consensus::{BatchCommitmentValidationReply, VerifiedAnnounce, VerifiedValidationRequest}, injected::{Promise, SignedInjectedTransaction, SignedPromise}, network::{AnnouncesRequest, AnnouncesResponse, SignedValidatorMessage}, @@ -93,6 +93,13 @@ pub trait ConsensusService: /// Process a received injected transaction from network fn receive_injected_transaction(&mut self, tx: SignedInjectedTransaction) -> Result<()>; + + /// Process computed canonical events (ephemeral ProgramStates for TX validation) + fn receive_canonical_events_computed( + &mut self, + block_hash: H256, + program_states: ProgramStates, + ) -> Result<()>; } #[derive(Debug, Clone, PartialEq, Eq, derive_more::Display)] @@ -127,6 +134,10 @@ pub enum ConsensusEvent { /// Informational event: commitment was successfully submitted #[from] CommitmentSubmitted(CommitmentSubmitted), + /// Outer service must compute canonical events only (no announce metadata writes). + /// CAS/state blob writes still occur during process_programs execution. + /// Contains (block_hash, parent_announce, gas_allowance). + ComputeCanonicalEvents(H256, HashOf, u64), /// Informational event: during service processing, a warning situation was detected Warning(String), } diff --git a/ethexe/consensus/src/tx_validation.rs b/ethexe/consensus/src/tx_validation.rs index 77ec477c7aa..c9f4a3fd3eb 100644 --- a/ethexe/consensus/src/tx_validation.rs +++ b/ethexe/consensus/src/tx_validation.rs @@ -26,6 +26,7 @@ use ethexe_common::{ use ethexe_runtime_common::state::Storage; use gprimitives::H256; use hashbrown::HashSet; +use std::borrow::Cow; /// Minimum executable balance for a program to receive injected transactions. /// 100 - is value per gas @@ -54,15 +55,17 @@ pub enum TxValidity { InsufficientBalanceForInjectedMessages, } -pub struct TxValidityChecker { +pub struct TxValidityChecker<'a, DB> { db: DB, chain_head: SimpleBlockData, start_block_hash: H256, recent_included_txs: HashSet>, - latest_states: ProgramStates, + latest_states: Cow<'a, ProgramStates>, } -impl TxValidityChecker { +impl<'a, DB: OnChainStorageRO + AnnounceStorageRO + GlobalsStorageRO + Storage> + TxValidityChecker<'a, DB> +{ pub fn new_for_announce( db: DB, chain_head: SimpleBlockData, @@ -82,13 +85,32 @@ impl TxVa let start_block_hash = db.globals().start_block_hash; Ok(Self { recent_included_txs: Self::collect_recent_included_txs(&db, announce)?, - latest_states: db - .announce_program_states(last_computed_predecessor) - .ok_or_else(|| { - anyhow!( - "Cannot find computed announce {last_computed_predecessor} programs states in db" - ) - })?, + latest_states: Cow::Owned( + db.announce_program_states(last_computed_predecessor) + .ok_or_else(|| { + anyhow!( + "Cannot find computed announce {last_computed_predecessor} programs states in db" + ) + })?, + ), + db, + chain_head, + start_block_hash, + }) + } + + /// Create checker with borrowed ProgramStates (from canonical compute). + /// Avoids cloning the BTreeMap. `parent_announce` is used only for duplicate TX detection. + pub fn new_with_states( + db: DB, + chain_head: SimpleBlockData, + parent_announce: HashOf, + program_states: &'a ProgramStates, + ) -> Result { + let start_block_hash = db.globals().start_block_hash; + Ok(Self { + recent_included_txs: Self::collect_recent_included_txs(&db, parent_announce)?, + latest_states: Cow::Borrowed(program_states), db, chain_head, start_block_hash, diff --git a/ethexe/consensus/src/validator/mod.rs b/ethexe/consensus/src/validator/mod.rs index 5385040906d..33b8dfff5a6 100644 --- a/ethexe/consensus/src/validator/mod.rs +++ b/ethexe/consensus/src/validator/mod.rs @@ -55,7 +55,7 @@ use anyhow::Result; pub use core::BatchCommitter; use derive_more::{Debug, From}; use ethexe_common::{ - Address, Announce, HashOf, SimpleBlockData, + Address, Announce, HashOf, ProgramStates, SimpleBlockData, consensus::{VerifiedAnnounce, VerifiedValidationRequest}, db::ConfigStorageRO, ecdsa::PublicKey, @@ -249,6 +249,16 @@ impl ConsensusService for ValidatorService { fn receive_injected_transaction(&mut self, tx: SignedInjectedTransaction) -> Result<()> { self.update_inner(|inner| inner.process_injected_transaction(tx)) } + + fn receive_canonical_events_computed( + &mut self, + block_hash: H256, + program_states: ProgramStates, + ) -> Result<()> { + self.update_inner(|inner| { + inner.process_canonical_events_computed(block_hash, program_states) + }) + } } impl Stream for ValidatorService { @@ -368,6 +378,14 @@ where DefaultProcessing::injected_transaction(self, tx) } + fn process_canonical_events_computed( + self, + block_hash: H256, + program_states: ProgramStates, + ) -> Result { + DefaultProcessing::canonical_events_computed(self, block_hash, program_states) + } + fn poll_next_state(self, _cx: &mut Context<'_>) -> Result<(Poll<()>, ValidatorState)> { Ok((Poll::Pending, self.into())) } @@ -467,6 +485,14 @@ impl StateHandler for ValidatorState { fn process_injected_transaction(self, tx: SignedInjectedTransaction) -> Result { delegate_call!(self => process_injected_transaction(tx)) } + + fn process_canonical_events_computed( + self, + block_hash: H256, + program_states: ProgramStates, + ) -> Result { + delegate_call!(self => process_canonical_events_computed(block_hash, program_states)) + } } struct DefaultProcessing; @@ -497,6 +523,18 @@ impl DefaultProcessing { Ok(s) } + fn canonical_events_computed( + s: impl Into, + block_hash: H256, + _program_states: ProgramStates, + ) -> Result { + let mut s = s.into(); + s.warning(format!( + "unexpected canonical events computed for block: {block_hash}" + )); + Ok(s) + } + fn promise_for_signing( s: impl Into, promise: Promise, diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 8639d80084b..57b7f8e7e1f 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -27,11 +27,12 @@ use crate::{ use anyhow::{Result, anyhow}; use derive_more::{Debug, Display}; use ethexe_common::{ - Announce, HashOf, PromisePolicy, SimpleBlockData, ValidatorsVec, db::BlockMetaStorageRO, - gear::BatchCommitment, injected::Promise, network::ValidatorMessage, + Announce, HashOf, ProgramStates, PromisePolicy, SimpleBlockData, ValidatorsVec, + db::BlockMetaStorageRO, gear::BatchCommitment, injected::Promise, network::ValidatorMessage, }; use ethexe_service_utils::Timer; use futures::{FutureExt, future::BoxFuture}; +use gprimitives::H256; use gsigner::secp256k1::Secp256k1SignerExt; use std::task::{Context, Poll}; @@ -53,6 +54,19 @@ enum State { #[debug(skip)] timer: Option, }, + /// Waiting for canonical-only compute to return ProgramStates. + WaitingCanonicalComputed { + parent_announce: HashOf, + }, + /// Collecting TXs against post-canonical ProgramStates. + /// Poll timer gives TXs time to arrive before building the announce. + ReadyForTxCollection { + parent_announce: HashOf, + #[debug(skip)] + program_states: ProgramStates, + #[debug(skip)] + poll_timer: Timer, + }, WaitingAnnounceComputed(HashOf), AggregateBatchCommitment { #[debug(skip)] @@ -129,6 +143,34 @@ impl StateHandler for Producer { } } + fn process_canonical_events_computed( + mut self, + block_hash: H256, + program_states: ProgramStates, + ) -> Result { + match &self.state { + State::WaitingCanonicalComputed { parent_announce } + if block_hash == self.block.hash => + { + let parent = *parent_announce; + + // Enter TX collection window. The poll timer gives TXs + // time to arrive before building the announce. + let mut poll_timer = Timer::new("tx-collection poll", self.ctx.core.producer_delay); + poll_timer.start(()); + + self.state = State::ReadyForTxCollection { + parent_announce: parent, + program_states, + poll_timer, + }; + + Ok(self.into()) + } + _ => DefaultProcessing::canonical_events_computed(self, block_hash, program_states), + } + } + fn poll_next_state(mut self, cx: &mut Context<'_>) -> Result<(Poll<()>, ValidatorState)> { match &mut self.state { State::Delay { timer: Some(timer) } => { @@ -137,6 +179,27 @@ impl StateHandler for Producer { return Ok((Poll::Ready(()), state)); } } + State::ReadyForTxCollection { poll_timer, .. } => { + if poll_timer.poll_unpin(cx).is_ready() { + // Timer fired — collect TXs and build announce. + // We use mem::replace to move ProgramStates out of self.state. + // The Delay { timer: None } placeholder is a dead state (never fires). + // If build_announce_with_states errors, the `?` propagates and the + // producer is dropped, so the placeholder is never observed. + let State::ReadyForTxCollection { + parent_announce, + program_states, + .. + } = std::mem::replace(&mut self.state, State::Delay { timer: None }) + else { + unreachable!() + }; + + let state = + self.build_announce_with_states(parent_announce, &program_states)?; + return Ok((Poll::Ready(()), state)); + } + } State::AggregateBatchCommitment { future } => match future.poll_unpin(cx) { Poll::Ready(Ok(Some(batch))) => { tracing::debug!(batch.block_hash = %batch.block_hash, "Batch commitment aggregated, switch to Coordinator"); @@ -184,6 +247,7 @@ impl Producer { .into()) } + /// Phase 1: Request canonical-only compute to get fresh ProgramStates for TX validation. fn produce_announce(mut self) -> Result { if !self.ctx.core.db.block_meta(self.block.hash).prepared { return Err(anyhow!( @@ -197,11 +261,31 @@ impl Producer { self.ctx.core.commitment_delay_limit, )?; + // Phase 1: ask compute to run canonical events only (no TXs). + // The result (ProgramStates) arrives via process_canonical_events_computed. + self.ctx.output(ConsensusEvent::ComputeCanonicalEvents( + self.block.hash, + parent, + self.ctx.core.block_gas_limit, + )); + self.state = State::WaitingCanonicalComputed { + parent_announce: parent, + }; + + Ok(self.into()) + } + + /// Phase 2: Select TXs using post-canonical ProgramStates, build and gossip announce. + fn build_announce_with_states( + mut self, + parent: HashOf, + program_states: &ProgramStates, + ) -> Result { let injected_transactions = self .ctx .core .injected_pool - .select_for_announce(self.block, parent)?; + .select_for_announce_with_states(self.block, parent, program_states)?; let announce = Announce { block_hash: self.block.hash, @@ -213,14 +297,10 @@ impl Producer { let (announce_hash, newly_included) = self.ctx.core.db.include_announce(announce.clone())?; if !newly_included { - // This can happen in case of abuse from rpc - the same eth block is announced multiple times, - // then the same announce is created multiple times, and include_announce would return already included. - // In this case we just go to initial state, without publishing anything and computing announce again. self.warning(format!( "Announce created {announce:?} is already included at {}", self.block.hash )); - return Initial::create(self.ctx); } @@ -464,8 +544,69 @@ mod tests { // TODO: test that zero timer works as expected + #[tokio::test] + #[ntest::timeout(3000)] + async fn new_head_during_canonical_compute() { + let (ctx, keys, _) = mock_validator_context(); + let validators = nonempty![ctx.core.pub_key.to_address(), keys[0].to_address()].into(); + let chain = BlockChain::mock(1).setup(&ctx.core.db); + let block = chain.blocks[1].to_simple(); + + let state = Producer::create(ctx, block, validators).unwrap(); + + // Wait for timer to fire → ComputeCanonicalEvents + let (state, event) = state.wait_for_event().await.unwrap(); + assert!(event.is_compute_canonical_events()); + + // Now in WaitingCanonicalComputed. Send a new head. + let new_block = SimpleBlockData::mock(()); + let state = state.process_new_head(new_block).unwrap(); + + // Should transition to Initial (canonical compute discarded) + assert!( + state.is_initial(), + "new_head during WaitingCanonicalComputed must go to Initial, got {state}" + ); + } + + #[tokio::test] + #[ntest::timeout(3000)] + async fn new_head_during_tx_collection() { + let (ctx, keys, _) = mock_validator_context(); + let validators = nonempty![ctx.core.pub_key.to_address(), keys[0].to_address()].into(); + let chain = BlockChain::mock(1).setup(&ctx.core.db); + let block = chain.blocks[1].to_simple(); + + let state = Producer::create(ctx, block, validators).unwrap(); + + // Wait for timer to fire → ComputeCanonicalEvents + let (state, event) = state.wait_for_event().await.unwrap(); + let (block_hash, _, _) = event.unwrap_compute_canonical_events(); + + // Deliver canonical events → enters ReadyForTxCollection + let state = state + .process_canonical_events_computed(block_hash, ethexe_common::ProgramStates::new()) + .unwrap(); + assert!(state.is_producer()); + + // Now in ReadyForTxCollection. Send a new head before the poll timer fires. + let new_block = SimpleBlockData::mock(()); + let state = state.process_new_head(new_block).unwrap(); + + // Should transition to Initial (TX collection discarded) + assert!( + state.is_initial(), + "new_head during ReadyForTxCollection must go to Initial, got {state}" + ); + } + #[async_trait] trait ProducerExt: Sized { + /// Skip the initial producer delay and complete the full two-phase announce production flow: + /// 1. produce_announce is triggered, emitting ComputeCanonicalEvents. + /// 2. process_canonical_events_computed is called, transitioning to ReadyForTxCollection. + /// 3. The poll timer fires, triggering build_announce_with_states, + /// which emits PublishMessage and ComputeAnnounce. async fn skip_timer(self) -> Result<(Self, HashOf)>; } @@ -487,13 +628,38 @@ mod tests { let state = ValidatorState::from(producer); + // Phase 1: timer fires → ComputeCanonicalEvents + let (state, event) = state.wait_for_event().await?; + assert!(state.is_producer(), "Expected producer state, got {state}"); + assert!( + event.is_compute_canonical_events(), + "Expected ComputeCanonicalEvents, got {event:?}" + ); + + // Extract block_hash from the event before consuming state + let (block_hash, _, _) = event.unwrap_compute_canonical_events(); + + // Phase 2: deliver empty ProgramStates → enters ReadyForTxCollection + let state = state.process_canonical_events_computed( + block_hash, + ethexe_common::ProgramStates::new(), + )?; + assert!(state.is_producer(), "Expected producer state, got {state}"); + + // Phase 3: poll timer fires → builds announce → PublishMessage + ComputeAnnounce let (state, event) = state.wait_for_event().await?; assert!(state.is_producer(), "Expected producer state, got {state}"); - assert!(event.is_publish_message()); + assert!( + event.is_publish_message(), + "Expected PublishMessage, got {event:?}" + ); let (state, event) = state.wait_for_event().await?; assert!(state.is_producer(), "Expected producer state, got {state}"); - assert!(event.is_compute_announce()); + assert!( + event.is_compute_announce(), + "Expected ComputeAnnounce, got {event:?}" + ); Ok((state, event.unwrap_compute_announce().0.to_hash())) } diff --git a/ethexe/consensus/src/validator/subordinate.rs b/ethexe/consensus/src/validator/subordinate.rs index 2c2a550238c..749142b90df 100644 --- a/ethexe/consensus/src/validator/subordinate.rs +++ b/ethexe/consensus/src/validator/subordinate.rs @@ -22,7 +22,7 @@ use super::{ }; use crate::{ ConsensusEvent, - announces::{self, AnnounceStatus}, + announces::{self, AnnounceRejectionReason, AnnounceStatus}, validator::participant::Participant, }; use anyhow::Result; @@ -94,8 +94,39 @@ impl StateHandler for Subordinate { if verified_announce.address() == self.producer && verified_announce.data().block_hash == self.block.hash => { - let (announce, _pub_key) = verified_announce.into_parts(); - self.send_announce_for_computation(announce) + let (announce, _pub_key) = verified_announce.clone().into_parts(); + match announces::accept_announce(&self.ctx.core.db, announce.clone())? { + AnnounceStatus::Accepted(announce_hash) => { + self.ctx + .output(ConsensusEvent::AnnounceAccepted(announce_hash)); + self.ctx.output(ConsensusEvent::ComputeAnnounce( + announce, + PromisePolicy::Disabled, + )); + self.state = State::WaitingAnnounceComputed { announce_hash }; + Ok(self.into()) + } + AnnounceStatus::Rejected { + reason: AnnounceRejectionReason::UnknownParent { .. }, + .. + } => { + // Parent not yet included — defer to pending instead of rejecting. + // Gossip reordering can cause the child to arrive before the parent. + // The announce will be retried when Subordinate::create runs next block, + // or recovered via collect_not_committed_predecessors. + tracing::trace!("Announce parent not yet included, deferring to pending"); + self.ctx.pending(verified_announce); + Ok(self.into()) + } + AnnounceStatus::Rejected { announce, reason } => { + self.ctx + .output(ConsensusEvent::AnnounceRejected(announce.to_hash())); + self.warning(format!( + "Received announce {announce:?} is rejected: {reason:?}" + )); + Initial::create(self.ctx) + } + } } _ => DefaultProcessing::announce_from_producer(self, verified_announce), } @@ -443,7 +474,7 @@ mod tests { } #[test] - fn reject_announce_from_producer() { + fn defer_announce_with_unknown_parent() { let (ctx, pub_keys, _) = mock_validator_context(); let producer = pub_keys[0]; let chain = BlockChain::mock(1).setup(&ctx.core.db); @@ -455,18 +486,15 @@ mod tests { assert!(s.is_subordinate(), "got {s:?}"); assert_eq!(s.context().output, vec![]); - // After receiving invalid announce - subordinate rejects it and switches to initial state. - let s = s.process_announce(announce.clone()).unwrap(); - assert!(s.is_initial(), "got {s:?}"); - assert_eq!(s.context().output.len(), 2); + // Announce with unknown parent is deferred to pending (not rejected), + // supporting gossip reordering where a child arrives before its parent. + let s = s.process_announce(announce).unwrap(); + assert!(s.is_subordinate(), "got {s:?}"); + assert_eq!(s.context().output.len(), 0); assert_eq!( - s.context().output[0], - ConsensusEvent::AnnounceRejected(announce.data().to_hash()) - ); - assert!( - s.context().output[1].is_warning(), - "got {:?}", - s.context().output[1] + s.context().pending_events.len(), + 1, + "Announce should be saved to pending for later replay" ); } } diff --git a/ethexe/consensus/src/validator/tx_pool.rs b/ethexe/consensus/src/validator/tx_pool.rs index f15e240fb45..f2252902b9a 100644 --- a/ethexe/consensus/src/validator/tx_pool.rs +++ b/ethexe/consensus/src/validator/tx_pool.rs @@ -19,7 +19,7 @@ use crate::tx_validation::{TxValidity, TxValidityChecker}; use anyhow::Result; use ethexe_common::{ - Announce, HashOf, MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE, SimpleBlockData, + Announce, HashOf, MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE, ProgramStates, SimpleBlockData, db::{ AnnounceStorageRO, CodesStorageRO, GlobalsStorageRO, InjectedStorageRW, OnChainStorageRO, }, @@ -72,6 +72,8 @@ where } /// Returns the injected transactions that are valid and can be included to announce. + /// Used by accept_announce validation and tests. The producer uses select_for_announce_with_states. + #[allow(dead_code)] pub fn select_for_announce( &mut self, block: SimpleBlockData, @@ -81,7 +83,32 @@ where let tx_checker = TxValidityChecker::new_for_announce(self.db.clone(), block, parent_announce)?; + self.select_with_checker(block, &tx_checker) + } + + /// Returns injected transactions validated against provided ProgramStates (from canonical compute). + pub fn select_for_announce_with_states( + &mut self, + block: SimpleBlockData, + parent_announce: HashOf, + program_states: &ProgramStates, + ) -> Result> { + tracing::trace!(block = ?block.hash, "start collecting injected transactions with post-canonical states"); + + let tx_checker = TxValidityChecker::new_with_states( + self.db.clone(), + block, + parent_announce, + program_states, + )?; + self.select_with_checker(block, &tx_checker) + } + fn select_with_checker( + &mut self, + block: SimpleBlockData, + tx_checker: &TxValidityChecker<'_, DB>, + ) -> Result> { let mut touched_programs = crate::utils::block_touched_programs(&self.db, block.hash)?; if touched_programs.len() > MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE as usize { tracing::error!( diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index 64f8330a8ef..9fe9ed8e2d6 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -610,6 +610,9 @@ impl Service { ComputeEvent::Promise(promise, announce_hash) => { consensus.receive_promise_for_signing(promise, announce_hash)?; } + ComputeEvent::CanonicalEventsComputed(block_hash, program_states) => { + consensus.receive_canonical_events_computed(block_hash, program_states)?; + } }, Event::Network(event) => { let Some(_) = network.as_mut() else { @@ -741,6 +744,13 @@ impl Service { ConsensusEvent::AnnounceAccepted(_) | ConsensusEvent::AnnounceRejected(_) => { // TODO #4940: consider to publish network message } + ConsensusEvent::ComputeCanonicalEvents( + block_hash, + parent_announce, + gas_allowance, + ) => { + compute.compute_canonical_events(block_hash, parent_announce, gas_allowance) + } }, Event::Prometheus(event) => match event { PrometheusEvent::CollectMetrics { libp2p_metrics } => {