From 952380049e526533859e94ff888d78f7dbc6d365 Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Fri, 27 Mar 2026 20:44:19 +0100 Subject: [PATCH 01/12] initial --- ethexe/consensus/src/announces.rs | 111 ++++++++++- ethexe/consensus/src/validator/producer.rs | 105 +++++++++-- ethexe/consensus/src/validator/subordinate.rs | 87 +++++++-- ethexe/service/src/tests/mod.rs | 174 +++++++++--------- ethexe/service/src/tests/utils/events.rs | 9 + 5 files changed, 358 insertions(+), 128 deletions(-) diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 337a086c95d..1cd040d9760 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -600,15 +600,33 @@ pub fn best_parent_announce( db: &impl DBAnnouncesExt, block_hash: H256, commitment_delay_limit: u32, -) -> Result> { +) -> Result<(HashOf, Announce)> { + let announces = db + .block_meta(block_hash) + .announces + .ok_or_else(|| anyhow!("announces not found for block {block_hash}"))?; + // We do not take announces directly from parent block, // because some of them may be expired at `block_hash`, // so we take parents of all announces from `block_hash`, // to be sure that we take only not expired parent announces. - let parent_announces = - db.announces_parents(db.block_meta(block_hash).announces.into_iter().flatten())?; + let parent_announces = db.announces_parents(announces.clone().into_iter())?; + + let best_announce_hash = best_announce(db, parent_announces, commitment_delay_limit)?; + + for announce_hash in announces { + let announce = db + .announce(announce_hash) + .ok_or_else(|| anyhow!("announce({announce_hash}) not found in db"))?; + + if announce.parent == best_announce_hash { + return Ok((best_announce_hash, announce)); + } + } - best_announce(db, parent_announces, commitment_delay_limit) + unreachable!( + "Best announce {best_announce_hash} must be parent of at least one announce in block {block_hash}" + ); } /// Returns announce hash, which is supposed to be best among provided announces. @@ -656,7 +674,90 @@ pub fn best_announce( } } - Ok(best_announce_hash) + if let AnnounceSiblingsOutcomeStatus::OutcomeIsSame { + sibling_announce_hash, + sibling_announce, + } = check_announce_sibling_outcome(db, best_announce_hash)? + && sibling_announce.is_base() + { + // if sibling has same outcome and it's base, then better to use base + Ok(sibling_announce_hash) + } else { + Ok(best_announce_hash) + } +} + +pub enum AnnounceSiblingsOutcomeStatus { + NotFound, + NotComputed, + OutcomeIsDifferent, + OutcomeIsSame { + sibling_announce_hash: HashOf, + sibling_announce: Announce, + }, +} + +/// Siblings of announce is announces with the same parent +pub fn check_announce_sibling_outcome( + db: &impl DBAnnouncesExt, + announce_hash: HashOf, +) -> Result { + let Some((sibling_announce_hash, sibling_announce)) = announce_sibling(db, announce_hash)? + else { + return Ok(AnnounceSiblingsOutcomeStatus::NotFound); + }; + + if !db.announce_meta(announce_hash).computed + || !db.announce_meta(sibling_announce_hash).computed + { + return Ok(AnnounceSiblingsOutcomeStatus::NotComputed); + } + + let announce_outcome = db + .announce_outcome(announce_hash) + .ok_or_else(|| anyhow!("outcome not found for computed announce {announce_hash:?}"))?; + let sibling_outcome = db.announce_outcome(sibling_announce_hash).ok_or_else(|| { + anyhow!("outcome not found for computed sibling announce {sibling_announce_hash:?}") + })?; + + if announce_outcome == sibling_outcome { + Ok(AnnounceSiblingsOutcomeStatus::OutcomeIsSame { + sibling_announce_hash, + sibling_announce, + }) + } else { + Ok(AnnounceSiblingsOutcomeStatus::OutcomeIsDifferent) + } +} + +pub fn announce_sibling( + db: &impl DBAnnouncesExt, + announce_hash: HashOf, +) -> Result, Announce)>> { + let announce = db + .announce(announce_hash) + .ok_or_else(|| anyhow!("announce({announce_hash}) not found"))?; + + let neighbors = db + .block_meta(announce.block_hash) + .announces + .ok_or_else(|| anyhow!("announces not found for block({})", announce.block_hash))?; + + for neighbor_hash in neighbors { + if neighbor_hash == announce_hash { + continue; + } + + let neighbor_announce = db + .announce(neighbor_hash) + .ok_or_else(|| anyhow!("announce({neighbor_hash}) not found"))?; + + if neighbor_announce.parent == announce.parent { + return Ok(Some((neighbor_hash, neighbor_announce))); + } + } + + Ok(None) } #[derive(Debug, Clone, PartialEq, Eq, derive_more::Display)] diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 8639d80084b..55965d64489 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -21,7 +21,7 @@ use super::{ }; use crate::{ ConsensusEvent, - announces::{self, DBAnnouncesExt}, + announces::{self, AnnounceSiblingsOutcomeStatus, DBAnnouncesExt}, validator::DefaultProcessing, }; use anyhow::{Result, anyhow}; @@ -52,6 +52,8 @@ enum State { Delay { #[debug(skip)] timer: Option, + best_parent_announce_hash: HashOf, + child_announce_hash: Option>, }, WaitingAnnounceComputed(HashOf), AggregateBatchCommitment { @@ -77,17 +79,48 @@ impl StateHandler for Producer { mut self, announce_hash: HashOf, ) -> Result { - match &self.state { - State::WaitingAnnounceComputed(expected) if *expected == announce_hash => { - // Aggregate commitment for the block and use `announce_hash` as head for chain commitment. - // `announce_hash` is computed and included in the db already, so it's safe to use it. + match self.state { + State::Delay { + timer, + child_announce_hash: Some(child_announce_hash), + best_parent_announce_hash, + } if child_announce_hash == announce_hash => { + let timer_is_none = timer.is_none(); + self.state = State::Delay { + timer, + best_parent_announce_hash, + child_announce_hash: None, + }; + + if timer_is_none { + self.produce_announce(best_parent_announce_hash) + } else { + Ok(self.into()) + } + } + State::WaitingAnnounceComputed(expected) if expected == announce_hash => { + // use base sibling announce as best if it has the same outcome as current announce, otherwise use current announce as best. + let best_announce_hash = if let AnnounceSiblingsOutcomeStatus::OutcomeIsSame { + sibling_announce_hash, + sibling_announce, + } = + announces::check_announce_sibling_outcome(&self.ctx.core.db, announce_hash)? + && sibling_announce.is_base() + { + sibling_announce_hash + } else { + announce_hash + }; + + // Aggregate commitment for the block and use `best_announce_hash` as head for chain commitment. + // `best_announce_hash` is computed and included in the db already, so it's safe to use it. self.state = State::AggregateBatchCommitment { future: self .ctx .core .batch_manager .clone() - .create_batch_commitment(self.block, announce_hash) + .create_batch_commitment(self.block, best_announce_hash) .boxed(), }; @@ -131,9 +164,23 @@ impl StateHandler for Producer { fn poll_next_state(mut self, cx: &mut Context<'_>) -> Result<(Poll<()>, ValidatorState)> { match &mut self.state { - State::Delay { timer: Some(timer) } => { + State::Delay { + timer: Some(timer), + child_announce_hash, + best_parent_announce_hash, + } => { if timer.poll_unpin(cx).is_ready() { - let state = self.produce_announce()?; + let state = if child_announce_hash.is_none() { + let announce_hash = *best_parent_announce_hash; + self.produce_announce(announce_hash)? + } else { + self.state = State::Delay { + timer: None, + best_parent_announce_hash: *best_parent_announce_hash, + child_announce_hash: *child_announce_hash, + }; + self.into() + }; return Ok((Poll::Ready(()), state)); } } @@ -175,28 +222,38 @@ impl Producer { ctx.pending_events.clear(); + let (best_parent_announce_hash, child_announce) = announces::best_parent_announce( + &ctx.core.db, + block.hash, + ctx.core.commitment_delay_limit, + )?; + + let child_announce_hash = Some(child_announce.to_hash()); + ctx.output(ConsensusEvent::ComputeAnnounce( + child_announce, + PromisePolicy::Disabled, + )); + Ok(Self { ctx, block, validators, - state: State::Delay { timer: Some(timer) }, + state: State::Delay { + timer: Some(timer), + best_parent_announce_hash, + child_announce_hash, + }, } .into()) } - fn produce_announce(mut self) -> Result { + fn produce_announce(mut self, parent: HashOf) -> Result { if !self.ctx.core.db.block_meta(self.block.hash).prepared { return Err(anyhow!( "Impossible, block must be prepared before creating announce" )); } - let parent = announces::best_parent_announce( - &self.ctx.core.db, - self.block.hash, - self.ctx.core.commitment_delay_limit, - )?; - let injected_transactions = self .ctx .core @@ -268,7 +325,7 @@ mod tests { async fn create() { let (mut ctx, keys, _) = mock_validator_context(); let validators = nonempty![ctx.core.pub_key.to_address(), keys[0].to_address()]; - let block = SimpleBlockData::mock(()); + let block = BlockChain::mock(2).setup(&ctx.core.db).blocks[2].to_simple(); ctx.pending(PendingEvent::ValidationRequest( ctx.core.signer.mock_verified_data(keys[0], ()), @@ -481,16 +538,28 @@ mod tests { let producer = self.unwrap_producer(); assert!( producer.state.is_delay(), - "Works only for waiting for codes state, got {:?}", + "Works only for delay state, got {:?}", producer.state ); let state = ValidatorState::from(producer); + // Base announce computation + let (state, event) = state.wait_for_event().await?; + assert!(state.is_producer(), "Expected producer state, got {state}"); + let ConsensusEvent::ComputeAnnounce(announce, PromisePolicy::Disabled) = event else { + panic!("Expected ComputeAnnounce event announces disabled, got {event:?}"); + }; + + // Set announce as computed + let state = state.process_computed_announce(announce.to_hash())?; + + // Announce message publication let (state, event) = state.wait_for_event().await?; assert!(state.is_producer(), "Expected producer state, got {state}"); assert!(event.is_publish_message()); + // Announce computation let (state, event) = state.wait_for_event().await?; assert!(state.is_producer(), "Expected producer state, got {state}"); assert!(event.is_compute_announce()); diff --git a/ethexe/consensus/src/validator/subordinate.rs b/ethexe/consensus/src/validator/subordinate.rs index 2c2a550238c..c73621f7321 100644 --- a/ethexe/consensus/src/validator/subordinate.rs +++ b/ethexe/consensus/src/validator/subordinate.rs @@ -54,7 +54,10 @@ pub struct Subordinate { #[derive(Debug, PartialEq, Eq)] enum State { WaitingForAnnounce, - WaitingAnnounceComputed { announce_hash: HashOf }, + WaitingAnnounceComputed { + announce_hash: Option>, + sibling_hash: Option>, + }, } impl StateHandler for Subordinate { @@ -71,20 +74,38 @@ impl StateHandler for Subordinate { } fn process_computed_announce( - self, + mut self, computed_announce_hash: HashOf, ) -> Result { - match &self.state { - State::WaitingAnnounceComputed { announce_hash } - if *announce_hash == computed_announce_hash => - { + if let State::WaitingAnnounceComputed { + mut announce_hash, + mut sibling_hash, + } = self.state + { + if announce_hash == Some(computed_announce_hash) { + announce_hash = None; + } else if sibling_hash == Some(computed_announce_hash) { + sibling_hash = None; + } else { + return DefaultProcessing::computed_announce(self, computed_announce_hash); + } + + if announce_hash.is_none() && sibling_hash.is_none() { if self.is_validator { - Participant::create(self.ctx, self.block, self.producer) + return Participant::create(self.ctx, self.block, self.producer); } else { - Initial::create(self.ctx) + return Initial::create(self.ctx); } } - _ => DefaultProcessing::computed_announce(self, computed_announce_hash), + + self.state = State::WaitingAnnounceComputed { + announce_hash, + sibling_hash, + }; + + Ok(self.into()) + } else { + DefaultProcessing::computed_announce(self, computed_announce_hash) } } @@ -95,7 +116,7 @@ impl StateHandler for Subordinate { && verified_announce.data().block_hash == self.block.hash => { let (announce, _pub_key) = verified_announce.into_parts(); - self.send_announce_for_computation(announce) + self.send_announces_for_computation(announce) } _ => DefaultProcessing::announce_from_producer(self, verified_announce), } @@ -162,22 +183,39 @@ impl Subordinate { }; if let Some(announce) = earlier_announce { - state.send_announce_for_computation(announce) + state.send_announces_for_computation(announce) } else { Ok(state.into()) } } - fn send_announce_for_computation(mut self, announce: Announce) -> Result { + fn send_announces_for_computation(mut self, announce: Announce) -> Result { match announces::accept_announce(&self.ctx.core.db, announce.clone())? { AnnounceStatus::Accepted(announce_hash) => { + let sibling = announces::announce_sibling(&self.ctx.core.db, announce_hash)?; + self.ctx .output(ConsensusEvent::AnnounceAccepted(announce_hash)); + + let sibling_hash = if let Some((hash, announce)) = sibling { + self.ctx.output(ConsensusEvent::ComputeAnnounce( + announce, + PromisePolicy::Disabled, + )); + Some(hash) + } else { + None + }; + self.ctx.output(ConsensusEvent::ComputeAnnounce( announce, PromisePolicy::Disabled, )); - self.state = State::WaitingAnnounceComputed { announce_hash }; + + self.state = State::WaitingAnnounceComputed { + announce_hash: Some(announce_hash), + sibling_hash, + }; Ok(self.into()) } @@ -219,6 +257,7 @@ mod tests { let chain = BlockChain::mock(1).setup(&ctx.core.db); let block = chain.blocks[1].to_simple(); let parent_announce_hash = chain.block_top_announce_hash(0); + let base_announce = chain.block_top_announce(1).announce.clone(); let announce1 = ctx .core .signer @@ -237,6 +276,7 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(announce1.data().to_hash()), + ConsensusEvent::ComputeAnnounce(base_announce, PromisePolicy::Disabled), ConsensusEvent::ComputeAnnounce(announce1.data().clone(), PromisePolicy::Disabled) ] ); @@ -276,6 +316,7 @@ mod tests { let alice = keys[1]; let chain = BlockChain::mock(1).setup(&ctx.core.db); let block = chain.blocks[1].to_simple(); + let base_announce = chain.block_top_announce(1).announce.clone(); let announce: VerifiedAnnounce = ctx .core .signer @@ -297,6 +338,7 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(announce.data().to_hash()), + ConsensusEvent::ComputeAnnounce(base_announce, PromisePolicy::Disabled), ConsensusEvent::ComputeAnnounce(announce.data().clone(), PromisePolicy::Disabled) ] ); @@ -319,6 +361,8 @@ mod tests { assert!(s.is_subordinate(), "got {s:?}"); assert_eq!(s.context().output, vec![]); + let base_announce = chain.block_top_announce(1).announce.clone(); + // After receiving valid announce - subordinate sends it to computation. let s = s.process_announce(announce.clone()).unwrap(); assert!(s.is_subordinate(), "got {s:?}"); @@ -326,11 +370,15 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(announce.data().to_hash()), + ConsensusEvent::ComputeAnnounce(base_announce.clone(), PromisePolicy::Disabled), ConsensusEvent::ComputeAnnounce(announce.data().clone(), PromisePolicy::Disabled) ] ); - // After announce is computed, subordinate switches to participant state. + // After announces are computed, subordinate switches to participant state. + let s = s + .process_computed_announce(base_announce.to_hash()) + .unwrap(); let s = s .process_computed_announce(announce.data().to_hash()) .unwrap(); @@ -339,6 +387,7 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(announce.data().to_hash()), + ConsensusEvent::ComputeAnnounce(base_announce.clone(), PromisePolicy::Disabled), ConsensusEvent::ComputeAnnounce(announce.data().clone(), PromisePolicy::Disabled) ] ); @@ -351,6 +400,7 @@ mod tests { let chain = BlockChain::mock(1).setup(&ctx.core.db); let block = chain.blocks[1].to_simple(); let parent_announce_hash = chain.block_top_announce_hash(0); + let base_announce = chain.block_top_announce(1).announce.clone(); let announce = ctx .core .signer @@ -368,11 +418,13 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(announce.data().to_hash()), + ConsensusEvent::ComputeAnnounce(base_announce.clone(), PromisePolicy::Disabled), ConsensusEvent::ComputeAnnounce(announce.data().clone(), PromisePolicy::Disabled) ] ); - // After announce is computed, not-validator subordinate switches to initial state. + // After both announces are computed, not-validator subordinate switches to initial state. + let s = s.process_computed_announce(base_announce.to_hash()).unwrap(); let s = s .process_computed_announce(announce.data().to_hash()) .unwrap(); @@ -384,8 +436,10 @@ mod tests { let (mut ctx, keys, _) = mock_validator_context(); let producer = keys[0]; let alice = keys[1]; - let block = BlockChain::mock(1).setup(&ctx.core.db).blocks[1].to_simple(); + let chain = BlockChain::mock(1).setup(&ctx.core.db); + let block = chain.blocks[1].to_simple(); let parent_announce_hash = ctx.core.db.top_announce_hash(block.header.parent_hash); + let base_announce = chain.block_top_announce(1).announce.clone(); let producer_announce = ctx .core .signer @@ -403,6 +457,7 @@ mod tests { s.context().output, vec![ ConsensusEvent::AnnounceAccepted(producer_announce.data().to_hash()), + ConsensusEvent::ComputeAnnounce(base_announce, PromisePolicy::Disabled), ConsensusEvent::ComputeAnnounce( producer_announce.data().clone(), PromisePolicy::Disabled diff --git a/ethexe/service/src/tests/mod.rs b/ethexe/service/src/tests/mod.rs index 215bff62098..6c10d40aa6c 100644 --- a/ethexe/service/src/tests/mod.rs +++ b/ethexe/service/src/tests/mod.rs @@ -151,7 +151,7 @@ async fn basics() { } #[tokio::test] -#[ntest::timeout(30_000)] +#[ntest::timeout(60_000)] async fn invalid_code() { init_logger(); @@ -163,11 +163,11 @@ async fn invalid_code() { node.start_service().await; let wasm_binary = [1; 10]; // Invalid WASM binary + let upload = env.upload_code(&wasm_binary).await.unwrap(); let res = env - .upload_code(&wasm_binary) - .await - .unwrap() - .wait_for() + .ethereum + .router() + .wait_for_code_validation(upload.code_id) .await .unwrap(); assert!(!res.valid); @@ -1759,7 +1759,7 @@ async fn fast_sync() { } let latest_block = env.latest_block().await.hash; - alice.events().find_announce_computed(latest_block).await; + alice.events().find_block_computed_twice(latest_block).await; log::info!("Starting Bob (fast-sync)"); let mut bob = env.new_node(NodeConfig::named("Bob").fast_sync()).await; @@ -1783,7 +1783,7 @@ async fn fast_sync() { } let latest_block = env.latest_block().await.hash; - alice.events().find_announce_computed(latest_block).await; + alice.events().find_block_computed_twice(latest_block).await; bob.events().find_announce_computed(latest_block).await; log::info!("📗 Stopping Bob"); @@ -1814,7 +1814,7 @@ async fn fast_sync() { env.skip_blocks(100).await; let latest_block = env.latest_block().await.hash; - alice.events().find_announce_computed(latest_block).await; + alice.events().find_block_computed_twice(latest_block).await; log::info!("📗 Starting Bob again to check how it handles partially empty database"); bob.start_service().await; @@ -1830,7 +1830,7 @@ async fn fast_sync() { } let latest_block = env.latest_block().await.hash; - alice.events().find_announce_computed(latest_block).await; + alice.events().find_block_computed_twice(latest_block).await; bob.events().find_announce_computed(latest_block).await; assert_chain( @@ -2051,7 +2051,7 @@ async fn execution_with_canonical_events_quarantine() { log::info!("📗 waiting announce for block {latest_block} computed"); validator .events() - .find_announce_computed(latest_block) + .find_block_computed_twice(latest_block) .await; // create a receiver without history so we don't face old `BlockSynced` in further for-loop @@ -2097,7 +2097,7 @@ async fn execution_with_canonical_events_quarantine() { assert!(!check_for_pong(block_hash), "PONG received too early"); - receiver.find_announce_computed(block_hash).await; + receiver.find_block_computed_twice(block_hash).await; env.force_new_block().await; } @@ -2953,7 +2953,7 @@ async fn announces_conflicts() { let latest_block = env.latest_block().await.hash; let mut latest_computed_announce_hash = HashOf::zero(); for receiver in &mut receivers { - let announce_hash = receiver.find_announce_computed(latest_block).await; + let announce_hash = receiver.find_block_computed_twice(latest_block).await; assert!( latest_computed_announce_hash == HashOf::zero() || latest_computed_announce_hash == announce_hash, @@ -3287,11 +3287,8 @@ async fn catch_up_test_case(commitment_delay_limit: u32) { // Wait until both stops processing let latest_block = env.latest_block().await.hash; - let latest_announce_hash = bob.events().find_announce_computed(latest_block).await; - assert_eq!( - alice.events().find_announce_computed(latest_block).await, - latest_announce_hash - ); + bob.events().find_announce_computed(latest_block).await; + alice.events().find_block_computed_twice(latest_block).await; log::info!("📗 Stopping Bob"); bob.stop_service().await; @@ -3306,7 +3303,7 @@ async fn catch_up_test_case(commitment_delay_limit: u32) { // Wait until Alice stop processing let latest_block = env.latest_block().await.hash; - alice.events().find_announce_computed(latest_block).await; + alice.events().find_block_computed_twice(latest_block).await; log::info!("📗 Stopping Alice"); alice.stop_service().await; @@ -3367,80 +3364,79 @@ async fn catch_up_test_case(commitment_delay_limit: u32) { wait_for.wait_for().await.unwrap(); } - log::info!("📗 Waiting for two rejected announces from Bob"); + log::info!("📗 Waiting for two accepted announces from Bob"); for _ in 0..2 { - bob.events().find_announce_rejected(AnnounceId::Any).await; - } - - log::info!("📗 Sending third PING message, one more attempt for Bob to catch up Alice"); - { - let receiver = env.new_observer_events(); - let pending = env - .ethereum - .mirror(ping_id) - .send_message_pending(b"PING", 0) - .await - .unwrap(); - env.force_new_block().await; - let wait_for = WaitForReplyTo::from_raw_parts( - receiver, - pending.try_get_message_send_receipt().await.unwrap().1, - ); - - // Waiting until Alice is ready for commitment1 - wait_signal_receiver.recv().await.unwrap(); - - // Force new block, so that commitment1 would skip this block - env.force_new_block().await; - - // Send signal to make commitment1 and wait until it's sent - commit_signal_sender.send(()).unwrap(); - wait_signal_receiver.recv().await.unwrap(); - - // Wait until Alice is ready for next commitment2 - wait_signal_receiver.recv().await.unwrap(); - - // Force new block to commit commitment1 - // if commitment_delay_limit == 3 => commitment1 would fail because contains expired announces - // if commitment_delay_limit == 5 => commitment1 would succeed - env.force_new_block().await; - - if commitment_delay_limit == 3 { - // Waiting until Alice is ready for commitment2 - wait_signal_receiver.recv().await.unwrap(); - - // Send signal to make commitment2 and wait until it's sent - commit_signal_sender.send(()).unwrap(); - wait_signal_receiver.recv().await.unwrap(); - - // Force new block to commit commitment2, succeed - env.force_new_block().await; - } else if commitment_delay_limit == 5 { - // commitment1 already committed, so Alice would not commit commitment2, because it's empty - } else { - unreachable!(); - } - - // Now commitment1 or commitment2 must be applied in the forced blocks - wait_for.wait_for().await.unwrap(); + bob.events().find_announce_accepted(AnnounceId::Any).await; } - let latest_block = env.latest_block().await.hash; - let latest_announce_hash = alice.events().find_announce_computed(latest_block).await; - - if commitment_delay_limit == 3 { - log::info!("📗 Bob accepts announce from Alice at last"); - bob.events() - .find_announce_accepted(latest_announce_hash) - .await; - } else if commitment_delay_limit == 5 { - log::info!("📗 Bob still rejects announce from Alice"); - bob.events() - .find_announce_rejected(latest_announce_hash) - .await; - } else { - unreachable!(); - } + // TODO: after changes in best announce selection, Bob is able to catch up Alice. + // We should create more complex test case to cover the scenario when Bob is not able to catch up Alice. + + // log::info!("📗 Sending third PING message, one more attempt for Bob to catch up Alice"); + // { + // let receiver = env.new_observer_events(); + // let pending = env + // .ethereum + // .mirror(ping_id) + // .send_message_pending(b"PING", 0) + // .await + // .unwrap(); + // env.force_new_block().await; + // let wait_for = WaitForReplyTo::from_raw_parts( + // receiver, + // pending.try_get_message_send_receipt().await.unwrap().1, + // ); + + // // Waiting until Alice is ready for commitment1 + // wait_signal_receiver.recv().await.unwrap(); + + // // Force new block, so that commitment1 would skip this block + // env.force_new_block().await; + + // // Send signal to make commitment1 and wait until it's sent + // commit_signal_sender.send(()).unwrap(); + // wait_signal_receiver.recv().await.unwrap(); + + // // Wait until Alice is ready for next commitment2 + // wait_signal_receiver.recv().await.unwrap(); + + // // Force new block to commit commitment1 + // // if commitment_delay_limit == 3 => commitment1 would fail because contains expired announces + // // if commitment_delay_limit == 5 => commitment1 would succeed + // env.force_new_block().await; + + // if commitment_delay_limit == 3 { + // // Waiting until Alice is ready for commitment2 + // wait_signal_receiver.recv().await.unwrap(); + + // // Send signal to make commitment2 and wait until it's sent + // commit_signal_sender.send(()).unwrap(); + // wait_signal_receiver.recv().await.unwrap(); + + // // Force new block to commit commitment2, succeed + // env.force_new_block().await; + // } else if commitment_delay_limit == 5 { + // // commitment1 already committed, so Alice would not commit commitment2, because it's empty + // } else { + // unreachable!(); + // } + + // // Now commitment1 or commitment2 must be applied in the forced blocks + // wait_for.wait_for().await.unwrap(); + // } + + // let latest_block = env.latest_block().await.hash; + // alice.events().find_block_computed_twice(latest_block).await; + + // if commitment_delay_limit == 3 { + // log::info!("📗 Bob accepts announce from Alice at last"); + // bob.events().find_announce_accepted(latest_block).await; + // } else if commitment_delay_limit == 5 { + // log::info!("📗 Bob still rejects announce from Alice"); + // bob.events().find_announce_rejected(AnnounceId::Any).await; + // } else { + // unreachable!(); + // } } #[tokio::test] diff --git a/ethexe/service/src/tests/utils/events.rs b/ethexe/service/src/tests/utils/events.rs index 795686b9c2d..1befe06a38f 100644 --- a/ethexe/service/src/tests/utils/events.rs +++ b/ethexe/service/src/tests/utils/events.rs @@ -286,6 +286,15 @@ impl TestingEventReceiver { .await } + pub async fn find_block_computed_twice(&mut self, block_hash: H256) -> HashOf { + log::info!("📗 waiting for base and not-base computed for block: {block_hash:?}"); + // First base announce + self.find_announce_computed(block_hash).await; + // Second not-base announce + self.find_announce_computed(block_hash).await + } + + #[allow(unused)] pub async fn find_announce_rejected(&mut self, id: impl Into) -> HashOf { let id = id.into(); log::info!("📗 waiting for announce rejected: {id:?}"); From 2b3ba708471c591807fb2795ef01fb3c2e9a673c Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Tue, 31 Mar 2026 17:57:38 +0200 Subject: [PATCH 02/12] fixes --- CLAUDE.md | 4 ++++ ethexe/consensus/src/validator/subordinate.rs | 4 +++- ethexe/service/src/tests/mod.rs | 5 ++--- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 353fa4f34da..62f9f937965 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,6 +2,10 @@ This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. +## Personal Rules + +If the file `~/.config/claude/gear-rules.md` exists, read it at the start of each session and follow the instructions there. It contains developer-specific preferences that override or supplement the rules below. + ## Project Overview Gear Protocol — a Substrate-based platform for running WebAssembly smart contracts (programs) with an actor-model message-passing architecture. The main network is **Vara**. The repo also contains **ethexe**, a layer that runs Gear programs on Ethereum. diff --git a/ethexe/consensus/src/validator/subordinate.rs b/ethexe/consensus/src/validator/subordinate.rs index c73621f7321..756811dab19 100644 --- a/ethexe/consensus/src/validator/subordinate.rs +++ b/ethexe/consensus/src/validator/subordinate.rs @@ -424,7 +424,9 @@ mod tests { ); // After both announces are computed, not-validator subordinate switches to initial state. - let s = s.process_computed_announce(base_announce.to_hash()).unwrap(); + let s = s + .process_computed_announce(base_announce.to_hash()) + .unwrap(); let s = s .process_computed_announce(announce.data().to_hash()) .unwrap(); diff --git a/ethexe/service/src/tests/mod.rs b/ethexe/service/src/tests/mod.rs index 80982b8553a..df2144a0320 100644 --- a/ethexe/service/src/tests/mod.rs +++ b/ethexe/service/src/tests/mod.rs @@ -3286,14 +3286,13 @@ async fn catch_up_test_case(commitment_delay_limit: u32) { wait_for.wait_for().await.unwrap(); } + // TODO: after changes in best announce selection, Bob is able to catch up Alice. + // We should create more complex test case to cover the scenario when Bob is not able to catch up Alice. log::info!("📗 Waiting for two accepted announces from Bob"); for _ in 0..2 { bob.events().find_announce_accepted(AnnounceId::Any).await; } - // TODO: after changes in best announce selection, Bob is able to catch up Alice. - // We should create more complex test case to cover the scenario when Bob is not able to catch up Alice. - // log::info!("📗 Sending third PING message, one more attempt for Bob to catch up Alice"); // { // let receiver = env.new_observer_events(); From 75ad44773f0d528892d6956543ab27b72087a00a Mon Sep 17 00:00:00 2001 From: Arsenii Lyashenko Date: Thu, 2 Apr 2026 13:37:37 +0300 Subject: [PATCH 03/12] Soften `ActiveBackoffPeriod` error --- ethexe/network/src/slots.rs | 98 ++++++++++++++++++++++++++++++++----- 1 file changed, 85 insertions(+), 13 deletions(-) diff --git a/ethexe/network/src/slots.rs b/ethexe/network/src/slots.rs index 0d9358fa9ea..21f9604f1a6 100644 --- a/ethexe/network/src/slots.rs +++ b/ethexe/network/src/slots.rs @@ -29,7 +29,7 @@ use crate::utils::{ConnectionMap, NoLimits, PeerAddresses}; use libp2p::{ Multiaddr, PeerId, - core::{Endpoint, transport::PortUse}, + core::{ConnectedPoint, Endpoint, transport::PortUse}, swarm::{ CloseConnection, ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, @@ -413,23 +413,35 @@ impl Behaviour { self.add_connection(peer, connection_id, PeerDirection::Outbound) } - fn remove_connection(&mut self, peer: PeerId, connection_id: ConnectionId) -> bool { + fn remove_connection( + &mut self, + peer: PeerId, + connection_id: ConnectionId, + endpoint: &ConnectedPoint, + ) -> bool { match self.peers.entry(peer) { Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - match entry { + let state = entry.get_mut(); + match state { PeerState::Connected { connections, direction, } => { connections.remove(&connection_id); - if connections.is_empty() { - direction.decrement_metrics(&self.metrics); - *entry = PeerState::JustDisconnected(Instant::now()); + if !connections.is_empty() { + return true; + } + + direction.decrement_metrics(&self.metrics); + + if endpoint.is_dialer() { + entry.remove(); + } else { + *state = PeerState::JustDisconnected(Instant::now()); } } PeerState::JustDisconnected(_) => { - debug_assert!(false, "unexpected {peer} state: {entry:?}") + debug_assert!(false, "unexpected {peer} state: {state:?}") } } @@ -560,11 +572,11 @@ impl NetworkBehaviour for Behaviour { FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, - endpoint: _, + endpoint, cause: _, remaining_established: _, }) => { - self.remove_connection(peer_id, connection_id); + self.remove_connection(peer_id, connection_id, endpoint); } FromSwarm::DialFailure(DialFailure { peer_id: Some(peer_id), @@ -846,7 +858,14 @@ mod tests { behaviour .add_outbound_connection(peer_id, first_connection_id) .unwrap(); - behaviour.remove_connection(peer_id, first_connection_id); + behaviour.remove_connection( + peer_id, + first_connection_id, + &ConnectedPoint::Listener { + local_addr: random_multiaddr(), + send_back_addr: random_multiaddr(), + }, + ); let err = behaviour .add_outbound_connection(peer_id, ConnectionId::new_unchecked(2)) @@ -856,6 +875,45 @@ mod tests { assert_eq!(err, SlotConnectionError::ActiveBackoffPeriod); } + #[tokio::test] + async fn add_outbound_connection_allows_peer_after_dialer_side_disconnect() { + let mut behaviour = Behaviour::new(Config::default()); + + let peer_id = PeerId::random(); + let first_connection_id = ConnectionId::new_unchecked(1); + behaviour + .add_outbound_connection(peer_id, first_connection_id) + .unwrap(); + behaviour.remove_connection( + peer_id, + first_connection_id, + &ConnectedPoint::Dialer { + address: random_multiaddr(), + role_override: Endpoint::Dialer, + port_use: PortUse::New, + }, + ); + + assert!(!behaviour.peers.contains_key(&peer_id)); + + behaviour + .add_outbound_connection(peer_id, ConnectionId::new_unchecked(2)) + .unwrap(); + + let (connections, direction) = behaviour + .peers + .get(&peer_id) + .unwrap() + .unwrap_connected_ref(); + assert_eq!(*direction, PeerDirection::Outbound); + assert_eq!( + *connections, + [ConnectionId::new_unchecked(2)] + .into_iter() + .collect::>() + ); + } + #[tokio::test] async fn add_inbound_connection_rejects_peer_in_backoff_period() { let mut behaviour = Behaviour::new(Config::default()); @@ -865,7 +923,14 @@ mod tests { behaviour .add_inbound_connection(peer_id, first_connection_id) .unwrap(); - behaviour.remove_connection(peer_id, first_connection_id); + behaviour.remove_connection( + peer_id, + first_connection_id, + &ConnectedPoint::Listener { + local_addr: random_multiaddr(), + send_back_addr: random_multiaddr(), + }, + ); let err = behaviour .add_inbound_connection(peer_id, ConnectionId::new_unchecked(2)) @@ -900,7 +965,14 @@ mod tests { behaviour .add_outbound_connection(peer_id, first_connection_id) .unwrap(); - behaviour.remove_connection(peer_id, first_connection_id); + behaviour.remove_connection( + peer_id, + first_connection_id, + &ConnectedPoint::Listener { + local_addr: random_multiaddr(), + send_back_addr: random_multiaddr(), + }, + ); let err = behaviour .add_pending_outbound_connection(peer_id, ConnectionId::new_unchecked(2)) From da0c5ed294bfe32a94c5b005734f1aab134ed8ba Mon Sep 17 00:00:00 2001 From: Arsenii Lyashenko Date: Thu, 2 Apr 2026 13:41:51 +0300 Subject: [PATCH 04/12] Remove `custom_connections_limits` --- .../network/src/custom_connection_limits.rs | 335 ------------------ ethexe/network/src/lib.rs | 25 +- ethexe/network/src/utils.rs | 65 ---- 3 files changed, 3 insertions(+), 422 deletions(-) delete mode 100644 ethexe/network/src/custom_connection_limits.rs diff --git a/ethexe/network/src/custom_connection_limits.rs b/ethexe/network/src/custom_connection_limits.rs deleted file mode 100644 index 41c2517ea53..00000000000 --- a/ethexe/network/src/custom_connection_limits.rs +++ /dev/null @@ -1,335 +0,0 @@ -// This file is part of Gear. -// -// Copyright (C) 2024-2025 Gear Technologies Inc. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -use crate::utils::{ConnectionLimit, ConnectionLimitError, ConnectionMap}; -use libp2p::{ - Multiaddr, PeerId, - core::{ConnectedPoint, Endpoint, transport::PortUse}, - swarm::{ - ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, - THandlerInEvent, THandlerOutEvent, ToSwarm, dummy, - }, -}; -use std::{ - convert::Infallible, - task::{Context, Poll}, -}; - -#[derive(Debug, Copy, Clone, Eq, PartialEq, derive_more::Display)] -pub enum LimitExceededKind { - #[display("established incoming per peer")] - EstablishedIncomingPerPeer, - #[display("established outbound per peer")] - EstablishedOutboundPerPeer, -} - -#[derive(Debug, Eq, PartialEq, derive_more::Display)] -#[display("custom connection limit exceeded: at most {limit} {kind} are allowed")] -pub struct LimitExceeded { - pub limit: u32, - pub kind: LimitExceededKind, -} - -impl std::error::Error for LimitExceeded {} - -#[derive(Default)] -pub struct Limits { - pub max_established_incoming_per_peer: Option, - pub max_established_outbound_per_peer: Option, -} - -impl Limits { - pub fn with_max_established_incoming_per_peer(mut self, limit: Option) -> Self { - self.max_established_incoming_per_peer = limit; - self - } - - pub fn with_max_established_outbound_per_peer(mut self, limit: Option) -> Self { - self.max_established_outbound_per_peer = limit; - self - } -} - -pub struct Behaviour { - established_incoming_per_peer: ConnectionMap, - established_outbound_per_peer: ConnectionMap, -} - -impl Behaviour { - pub fn new(limits: Limits) -> Self { - Self { - established_incoming_per_peer: ConnectionMap::with_connection_limit( - limits.max_established_incoming_per_peer.unwrap_or(u32::MAX), - ), - established_outbound_per_peer: ConnectionMap::with_connection_limit( - limits.max_established_outbound_per_peer.unwrap_or(u32::MAX), - ), - } - } -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = dummy::ConnectionHandler; - type ToSwarm = Infallible; - - fn handle_established_inbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - _local_addr: &Multiaddr, - _remote_addr: &Multiaddr, - ) -> Result, ConnectionDenied> { - self.established_incoming_per_peer - .add_connection(peer, connection_id) - .map_err(|ConnectionLimitError { limit }| { - ConnectionDenied::new(LimitExceeded { - limit, - kind: LimitExceededKind::EstablishedIncomingPerPeer, - }) - })?; - - Ok(dummy::ConnectionHandler) - } - - fn handle_established_outbound_connection( - &mut self, - connection_id: ConnectionId, - peer: PeerId, - _addr: &Multiaddr, - _role_override: Endpoint, - _port_use: PortUse, - ) -> Result, ConnectionDenied> { - self.established_outbound_per_peer - .add_connection(peer, connection_id) - .map_err(|ConnectionLimitError { limit }| { - ConnectionDenied::new(LimitExceeded { - limit, - kind: LimitExceededKind::EstablishedOutboundPerPeer, - }) - })?; - - Ok(dummy::ConnectionHandler) - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - if let FromSwarm::ConnectionClosed(ConnectionClosed { - peer_id, - connection_id, - endpoint, - .. - }) = event - { - match endpoint { - ConnectedPoint::Dialer { .. } => { - self.established_outbound_per_peer - .remove_connection(peer_id, connection_id); - } - ConnectedPoint::Listener { .. } => { - self.established_incoming_per_peer - .remove_connection(peer_id, connection_id); - } - } - } - } - - fn on_connection_handler_event( - &mut self, - _peer_id: PeerId, - _connection_id: ConnectionId, - _event: THandlerOutEvent, - ) { - } - - fn poll( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll>> { - Poll::Pending - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::utils::tests::init_logger; - use libp2p::{ - Swarm, - futures::{StreamExt, stream}, - swarm::{ - DialError, ListenError, SwarmEvent, - dial_opts::{DialOpts, PeerCondition}, - }, - }; - use libp2p_swarm_test::SwarmExt; - - fn new_swarm(limits: Limits) -> Swarm { - SwarmExt::new_ephemeral_tokio(|_keypair| Behaviour::new(limits)) - } - - fn take_n_events( - swarm: &mut Swarm, - ) -> impl stream::Stream> + '_ { - stream::unfold(swarm, |swarm| async move { - let event = swarm.next_swarm_event().await; - Some((event, swarm)) - }) - .take(NUM_EVENTS) - } - - #[tokio::test] - async fn inbound_connection_denied() { - const PEERS: usize = 10; - const INBOUND_CONNECTIONS: usize = 10; - const INBOUND_LIMIT: u32 = 5; - - init_logger(); - - let mut limited_peer = new_swarm( - Limits::default().with_max_established_incoming_per_peer(Some(INBOUND_LIMIT)), - ); - let limited_peer_id = *limited_peer.local_peer_id(); - let (limited_peer_addr, _) = limited_peer.listen().with_memory_addr_external().await; - - let mut unlimited_peers = vec![]; - for _ in 0..PEERS { - let mut peer = new_swarm(Limits::default()); - let peer_id = *peer.local_peer_id(); - - for _ in 0..INBOUND_CONNECTIONS { - peer.dial( - DialOpts::peer_id(limited_peer_id) - .condition(PeerCondition::Always) - .addresses(vec![limited_peer_addr.clone()]) - .build(), - ) - .unwrap(); - } - - unlimited_peers.push(peer_id); - - tokio::spawn(peer.loop_on_next()); - } - - for _ in 0..PEERS { - take_n_events::(&mut limited_peer) - .for_each(|event| async move { - assert!(matches!(event, SwarmEvent::IncomingConnection { .. })); - }) - .await; - } - - for unlimited_peer_id in unlimited_peers { - // first `INBOUND_LIMIT` connections are established - take_n_events::<{ INBOUND_LIMIT as usize }>(&mut limited_peer) - .for_each(|event| async move { - assert!(matches!(event, SwarmEvent::ConnectionEstablished { peer_id,.. } if peer_id == unlimited_peer_id)); - }) - .await; - - // the rest of connections are denied - take_n_events::<{ INBOUND_CONNECTIONS - INBOUND_LIMIT as usize }>(&mut limited_peer) - .for_each(|event| async move { - if let SwarmEvent::IncomingConnectionError { - error: ListenError::Denied { cause }, - .. - } = event - { - let exceeded = cause.downcast::().unwrap(); - assert_eq!( - exceeded, - LimitExceeded { - limit: 5, - kind: LimitExceededKind::EstablishedIncomingPerPeer, - } - ); - } else { - unreachable!("{event:?}"); - } - }) - .await; - } - } - - #[tokio::test] - async fn outbound_connection_denied() { - const PEERS: usize = 10; - const OUTBOUND_CONNECTIONS: usize = 10; - const OUTBOUND_LIMIT: u32 = 5; - - init_logger(); - - let mut limited_peer = new_swarm( - Limits::default().with_max_established_outbound_per_peer(Some(OUTBOUND_LIMIT)), - ); - - let mut unlimited_peers = vec![]; - for _ in 0..PEERS { - let mut peer = new_swarm(Limits::default()); - let (peer_addr, _) = peer.listen().with_memory_addr_external().await; - - let peer_id = *peer.local_peer_id(); - for _ in 0..OUTBOUND_CONNECTIONS { - limited_peer - .dial( - DialOpts::peer_id(peer_id) - .condition(PeerCondition::Always) - .addresses(vec![peer_addr.clone()]) - .build(), - ) - .unwrap(); - } - - tokio::spawn(peer.loop_on_next()); - - unlimited_peers.push(peer_id); - } - - for unlimited_peer_id in unlimited_peers { - // first `OUTBOUND_LIMIT` connections are established - take_n_events::<{ OUTBOUND_LIMIT as usize }>(&mut limited_peer) - .for_each(|event| async move { - assert!(matches!(event, SwarmEvent::ConnectionEstablished { peer_id,.. } if peer_id == unlimited_peer_id)); - }) - .await; - - // the rest of connections are denied - take_n_events::<{ OUTBOUND_CONNECTIONS - OUTBOUND_LIMIT as usize }>(&mut limited_peer) - .for_each(|event| async move { - if let SwarmEvent::OutgoingConnectionError { - error: DialError::Denied { cause }, - peer_id: Some(peer_id), - .. - } = event - { - let exceeded = cause.downcast::().unwrap(); - assert_eq!( - exceeded, - LimitExceeded { - limit: 5, - kind: LimitExceededKind::EstablishedOutboundPerPeer, - } - ); - assert_eq!(peer_id, unlimited_peer_id); - } else { - unreachable!("{event:?}"); - } - }) - .await; - } - } -} diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index d24c2bee34e..5c9bd0f35ff 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -16,7 +16,6 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -mod custom_connection_limits; pub mod db_sync; mod gossipsub; mod injected; @@ -75,8 +74,8 @@ pub const DEFAULT_LISTEN_PORT: u16 = 20333; pub const PROTOCOL_VERSION: &str = "ethexe/0.1.0"; pub const AGENT_VERSION: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")); -const MAX_ESTABLISHED_INCOMING_PER_PEER_CONNECTIONS: u32 = 1; -const MAX_ESTABLISHED_OUTBOUND_PER_PEER_CONNECTIONS: u32 = 1; +// limit could be 1, but we want to prevent connection churn when both peers dial each other +const MAX_ESTABLISHED_PER_PEER_CONNECTIONS: u32 = 2; const MAX_ESTABLISHED_INCOMING_CONNECTIONS: u32 = 500; const MAX_ESTABLISHED_OUTGOING_CONNECTIONS: u32 = 500; const MAX_PENDING_INCOMING_CONNECTIONS: u32 = 10; @@ -385,7 +384,6 @@ impl NetworkService { fn handle_behaviour_event(&mut self, event: BehaviourEvent) -> Option { match event { - BehaviourEvent::CustomConnectionLimits(event) => match event {}, BehaviourEvent::ConnectionLimits(event) => match event {}, BehaviourEvent::Slots(event) => match event {}, BehaviourEvent::PeerScore(event) => return self.handle_peer_score_event(event), @@ -643,8 +641,6 @@ struct BehaviourConfig { #[derive(NetworkBehaviour)] pub(crate) struct Behaviour { - // custom options to limit connections - pub custom_connection_limits: custom_connection_limits::Behaviour, // hard caps pub connection_limits: connection_limits::Behaviour, // peer amount manager @@ -687,22 +683,8 @@ impl Behaviour { let peer_id = keypair.public().to_peer_id(); - // we use custom behaviour because - // `libp2p::connection_limits::Behaviour` limits inbound & outbound - // connections per peer in total, so protocols may fail to establish - // at least 1 inbound & 1 outbound connection in specific circumstances - // (for example, active VPN connection + communication with mDNS discovered peers) - let custom_connection_limits = custom_connection_limits::Limits::default() - .with_max_established_incoming_per_peer(Some( - MAX_ESTABLISHED_INCOMING_PER_PEER_CONNECTIONS, - )) - .with_max_established_outbound_per_peer(Some( - MAX_ESTABLISHED_OUTBOUND_PER_PEER_CONNECTIONS, - )); - let custom_connection_limits = - custom_connection_limits::Behaviour::new(custom_connection_limits); - let connection_limits = connection_limits::ConnectionLimits::default() + .with_max_established_per_peer(Some(MAX_ESTABLISHED_PER_PEER_CONNECTIONS)) .with_max_established_incoming(Some(MAX_ESTABLISHED_INCOMING_CONNECTIONS)) .with_max_established_outgoing(Some(MAX_ESTABLISHED_OUTGOING_CONNECTIONS)) .with_max_pending_incoming(Some(MAX_PENDING_INCOMING_CONNECTIONS)) @@ -768,7 +750,6 @@ impl Behaviour { let validator_discovery = validator::discovery::Behaviour::new(validator_discovery); Ok(Self { - custom_connection_limits, connection_limits, slots, peer_score, diff --git a/ethexe/network/src/utils.rs b/ethexe/network/src/utils.rs index 35a3c429e45..ab9571a6476 100644 --- a/ethexe/network/src/utils.rs +++ b/ethexe/network/src/utils.rs @@ -157,35 +157,6 @@ pub(crate) trait ConnectionMapLimit { ) -> Result<(), Self::Error>; } -#[derive(Debug)] -pub(crate) struct ConnectionLimitError { - pub limit: u32, -} - -pub(crate) struct ConnectionLimit { - limit: u32, -} - -impl ConnectionMapLimit for ConnectionLimit { - type Error = ConnectionLimitError; - - fn check_limit( - &self, - connections: &HashMap>, - peer_id: PeerId, - ) -> Result<(), Self::Error> { - let current = connections - .get(&peer_id) - .map(|connections| connections.len()) - .unwrap_or(0) as u32; - if current < self.limit { - Ok(()) - } else { - Err(ConnectionLimitError { limit: self.limit }) - } - } -} - pub(crate) struct NoLimits; impl ConnectionMapLimit for NoLimits { @@ -237,15 +208,6 @@ impl ConnectionMap { } } -impl ConnectionMap { - pub(crate) fn with_connection_limit(limit: u32) -> Self { - Self { - inner: Default::default(), - limit: ConnectionLimit { limit }, - } - } -} - impl ConnectionMap { pub(crate) fn without_limits() -> Self { Self { @@ -461,33 +423,6 @@ pub(crate) mod tests { .try_init(); } - #[test] - fn connection_map_limit_works() { - const LIMIT: u32 = 5; - - let mut map = ConnectionMap::with_connection_limit(LIMIT); - - let main_peer = PeerId::random(); - - for i in 0..LIMIT { - map.add_connection(main_peer, ConnectionId::new_unchecked(i as usize)) - .unwrap(); - } - - let limit = map - .add_connection(main_peer, ConnectionId::new_unchecked(usize::MAX)) - .unwrap_err() - .limit; - assert_eq!(limit, LIMIT); - - // new peer so no limit exceeded yet - map.add_connection( - PeerId::random(), - ConnectionId::new_unchecked(usize::MAX / 2), - ) - .unwrap(); - } - #[test] fn connection_map_key_cleared() { let mut map = ConnectionMap::without_limits(); From 8268a6794f0d5c2d6ee902878f6661fb92525710 Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Thu, 2 Apr 2026 17:09:56 +0200 Subject: [PATCH 05/12] append tests --- ethexe/consensus/src/announces.rs | 137 ++++++++++++++++++++++++++++++ ethexe/service/src/tests/mod.rs | 108 +++++++++++++++++++++++ 2 files changed, 245 insertions(+) diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 1cd040d9760..7afd61bb783 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -880,6 +880,7 @@ mod tests { StateHashWithQueueSize, db::*, events::{BlockEvent, MirrorEvent, mirror::MessageQueueingRequestedEvent}, + gear::StateTransition, injected::InjectedTransaction, mock::*, }; @@ -1228,4 +1229,140 @@ mod tests { AnnounceRejectionReason::TooManyTouchedPrograms(MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE + 1) ); } + + #[test] + fn best_announce_prefers_base_sibling_with_same_outcome() { + let db = Database::memory(); + + let mut chain = BlockChain::mock(5); + + // Block 3 already has a base announce. Add a not-base sibling with the same parent. + let base_hash = chain.block_top_announce_hash(3); + let base_announce = &chain.block_top_announce(3).announce; + let parent = base_announce.parent; + let block_hash = base_announce.block_hash; + + let not_base_announce = Announce::with_default_gas(block_hash, parent); + let not_base_hash = not_base_announce.to_hash(); + + chain.blocks[3] + .as_prepared_mut() + .announces + .as_mut() + .unwrap() + .insert(not_base_hash); + + // Both announces computed with the same (empty) outcome + chain.announces.insert( + not_base_hash, + AnnounceData { + announce: not_base_announce, + computed: Some(MockComputedAnnounceData::default()), + }, + ); + + let chain = chain.setup(&db); + + // Not-base has more points (1 vs 0), but base sibling has the same outcome, + // so best_announce should prefer the base one. + let result = best_announce(&db, [not_base_hash, base_hash], 3).unwrap(); + assert_eq!( + result, base_hash, + "Should prefer base announce when sibling outcomes are the same" + ); + + // Also verify via best_parent_announce: block 4 should pick base at block 3 as best parent + let (best_parent_hash, _child) = + best_parent_announce(&db, chain.blocks[4].hash, 3).unwrap(); + assert_eq!( + best_parent_hash, base_hash, + "best_parent_announce should prefer base parent with same outcome" + ); + } + + #[test] + fn best_announce_keeps_not_base_when_outcomes_differ() { + let db = Database::memory(); + + let mut chain = BlockChain::mock(5); + + let base_hash = chain.block_top_announce_hash(3); + let base_announce = &chain.block_top_announce(3).announce; + let parent = base_announce.parent; + let block_hash = base_announce.block_hash; + + let not_base_announce = Announce::with_default_gas(block_hash, parent); + let not_base_hash = not_base_announce.to_hash(); + + chain.blocks[3] + .as_prepared_mut() + .announces + .as_mut() + .unwrap() + .insert(not_base_hash); + + // Not-base announce has a different outcome (non-empty) + chain.announces.insert( + not_base_hash, + AnnounceData { + announce: not_base_announce, + computed: Some(MockComputedAnnounceData { + outcome: vec![StateTransition { + actor_id: ActorId::from(1u64), + ..Default::default() + }], + ..Default::default() + }), + }, + ); + + let _chain = chain.setup(&db); + + // Not-base has more points AND different outcome, so it wins. + let result = best_announce(&db, [not_base_hash, base_hash], 3).unwrap(); + assert_eq!( + result, not_base_hash, + "Should keep not-base announce when outcomes differ" + ); + } + + #[test] + fn best_announce_not_computed_keeps_not_base() { + let db = Database::memory(); + + let mut chain = BlockChain::mock(5); + + let base_hash = chain.block_top_announce_hash(3); + let base_announce = &chain.block_top_announce(3).announce; + let parent = base_announce.parent; + let block_hash = base_announce.block_hash; + + let not_base_announce = Announce::with_default_gas(block_hash, parent); + let not_base_hash = not_base_announce.to_hash(); + + chain.blocks[3] + .as_prepared_mut() + .announces + .as_mut() + .unwrap() + .insert(not_base_hash); + + // Not-base announce is NOT computed (computed: None) + chain.announces.insert( + not_base_hash, + AnnounceData { + announce: not_base_announce, + computed: None, + }, + ); + + let _chain = chain.setup(&db); + + // Not-base has more points; sibling check returns NotComputed, so not-base wins. + let result = best_announce(&db, [not_base_hash, base_hash], 3).unwrap(); + assert_eq!( + result, not_base_hash, + "Should keep not-base announce when sibling is not computed" + ); + } } diff --git a/ethexe/service/src/tests/mod.rs b/ethexe/service/src/tests/mod.rs index df2144a0320..5c4a386c976 100644 --- a/ethexe/service/src/tests/mod.rs +++ b/ethexe/service/src/tests/mod.rs @@ -3457,3 +3457,111 @@ async fn reply_callback() { assert!(demo_caller.onErrorReplyCalled().call().await.unwrap()); } + +/// After several idle blocks (no user messages), when a ping message arrives, +/// base-announce-priority ensures the announce chain consists of base announces, +/// so the batch commitment expiry equals commitment_delay_limit (not 1). +#[tokio::test] +#[ntest::timeout(120_000)] +async fn batch_commitment_expiry_after_idle_blocks() { + init_logger(); + + let captured_expiry: Arc> = Arc::new(Mutex::new(0)); + + let mut env = TestEnv::new(Default::default()).await.unwrap(); + + // Custom committer that captures batch expiry values + let captured_expiry_clone = captured_expiry.clone(); + let router = env.ethereum.router().clone(); + + #[derive(Clone)] + struct ExpiryCapturingCommitter { + router: Router, + captured: Arc>, + } + + #[async_trait::async_trait] + impl BatchCommitter for ExpiryCapturingCommitter { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + async fn commit( + self: Box, + batch: BatchCommitment, + signatures: Vec, + ) -> anyhow::Result { + log::info!("📗 Captured batch commitment with expiry={}", batch.expiry); + *self.captured.lock().await = batch.expiry; + let pending = self.router.commit_batch_pending(batch, signatures).await; + pending? + .try_get_receipt_check_reverted() + .await + .map(|r| r.transaction_hash.0.into()) + } + } + + let committer = ExpiryCapturingCommitter { + router, + captured: captured_expiry_clone, + }; + + let mut node = env + .new_node(NodeConfig::default().validator(env.validators[0])) + .await; + node.custom_committer = Some(Box::new(committer)); + node.start_service().await; + + // Setup: upload code and create program + let res = env + .upload_code(demo_ping::WASM_BINARY) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + assert!(res.valid); + + let res = env + .create_program(res.code_id, 500_000_000_000_000) + .await + .unwrap() + .wait_for() + .await + .unwrap(); + let ping_id = res.program_id; + + // Initial ping to ensure program is fully initialized + let res = env + .send_message(ping_id, b"PING") + .await + .unwrap() + .wait_for() + .await + .unwrap(); + assert_eq!(res.payload, b"PONG"); + + // Skip several idle blocks (no messages, only base announces propagated) + env.skip_blocks(10).await; + + // Send PING after idle period + let res = env + .send_message(ping_id, b"PING") + .await + .unwrap() + .wait_for() + .await + .unwrap(); + assert_eq!(res.payload, b"PONG"); + + // Check that the batch commitment after idle blocks has expiry == commitment_delay_limit. + // With base-announce-priority, the chain of announces during idle blocks is all base, + // so only the head announce (with the ping message) is not-base, + // resulting in expiry = commitment_delay_limit. + let last_expiry = *captured_expiry.lock().await; + assert_eq!( + last_expiry, env.commitment_delay_limit as u8, + "Batch commitment expiry should equal commitment_delay_limit ({}), got {last_expiry}", + env.commitment_delay_limit + ); +} From bf23429ea55423b43ef74b9398345dd504420019 Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Fri, 3 Apr 2026 21:30:00 +0200 Subject: [PATCH 06/12] fixes --- ethexe/common/src/hash.rs | 7 + ethexe/common/src/primitives.rs | 2 +- ethexe/consensus/src/announces.rs | 207 ++++++++++-------- ethexe/consensus/src/connect/mod.rs | 2 +- .../consensus/src/validator/batch/manager.rs | 14 +- ethexe/consensus/src/validator/initial.rs | 22 ++ ethexe/consensus/src/validator/mod.rs | 2 +- ethexe/consensus/src/validator/producer.rs | 162 +++++++------- ethexe/consensus/src/validator/subordinate.rs | 41 ++-- ethexe/rpc/src/apis/mod.rs | 4 +- 10 files changed, 259 insertions(+), 204 deletions(-) diff --git a/ethexe/common/src/hash.rs b/ethexe/common/src/hash.rs index f1ff1fec934..29936c9b8cc 100644 --- a/ethexe/common/src/hash.rs +++ b/ethexe/common/src/hash.rs @@ -215,3 +215,10 @@ impl From> for MaybeHashOf { Self(Some(value)) } } + +/// Hash of value with the value itself. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WithHashOf { + pub hash: HashOf, + pub value: T, +} diff --git a/ethexe/common/src/primitives.rs b/ethexe/common/src/primitives.rs index 2c281138fc1..1bf776427d7 100644 --- a/ethexe/common/src/primitives.rs +++ b/ethexe/common/src/primitives.rs @@ -82,7 +82,7 @@ pub struct SimpleBlockData { #[cfg_attr(feature = "serde", derive(Hash))] #[derive(Clone, Debug, Encode, Decode, TypeInfo, PartialEq, Eq, derive_more::Display)] #[display( - "Announce(block: {block_hash}, parent: {parent}, gas: {gas_allowance:?}, txs: {injected_transactions:?})" + "Announce(hash: {}, block: {block_hash}, parent: {parent}, gas: {gas_allowance:?}, txs: {injected_transactions:?})", self.to_hash() )] pub struct Announce { pub block_hash: H256, diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 7afd61bb783..39930b6848a 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -91,7 +91,7 @@ use crate::tx_validation::{TxValidity, TxValidityChecker}; use anyhow::{Result, anyhow, ensure}; use ethexe_common::{ - Announce, HashOf, MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE, SimpleBlockData, + Announce, HashOf, MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE, SimpleBlockData, WithHashOf, db::{ AnnounceStorageRW, BlockMetaStorageRW, GlobalsStorageRO, InjectedStorageRW, OnChainStorageRO, @@ -128,6 +128,13 @@ pub trait DBAnnouncesExt: &self, announces: impl IntoIterator>, ) -> Result>>; + + /// Find announce in the same block with the same parent as provided announce. + fn find_block_base_announce_with_parent( + &self, + block_hash: H256, + parent: HashOf, + ) -> Result>>; } impl< @@ -208,6 +215,32 @@ impl< }) .collect() } + + fn find_block_base_announce_with_parent( + &self, + block_hash: H256, + parent: HashOf, + ) -> Result>> { + let announces = self + .block_meta(block_hash) + .announces + .ok_or_else(|| anyhow!("announces not found for block({block_hash})"))?; + + for announce_hash in announces { + let announce = self + .announce(announce_hash) + .ok_or_else(|| anyhow!("announce({announce_hash}) not found"))?; + + if announce.parent == parent && announce.is_base() { + return Ok(Some(WithHashOf { + hash: announce_hash, + value: announce, + })); + } + } + + Ok(None) + } } /// Propagate announces along the provided chain of blocks. @@ -600,7 +633,7 @@ pub fn best_parent_announce( db: &impl DBAnnouncesExt, block_hash: H256, commitment_delay_limit: u32, -) -> Result<(HashOf, Announce)> { +) -> Result> { let announces = db .block_meta(block_hash) .announces @@ -610,30 +643,72 @@ pub fn best_parent_announce( // because some of them may be expired at `block_hash`, // so we take parents of all announces from `block_hash`, // to be sure that we take only not expired parent announces. - let parent_announces = db.announces_parents(announces.clone().into_iter())?; + let candidates = db.announces_parents(announces)?; - let best_announce_hash = best_announce(db, parent_announces, commitment_delay_limit)?; + best_announce(db, candidates, commitment_delay_limit - 1) +} - for announce_hash in announces { - let announce = db - .announce(announce_hash) - .ok_or_else(|| anyhow!("announce({announce_hash}) not found in db"))?; +pub fn block_best_announce( + db: &impl DBAnnouncesExt, + block_hash: H256, + commitment_delay_limit: u32, +) -> Result> { + let candidates = db + .block_meta(block_hash) + .announces + .ok_or_else(|| anyhow!("announces not found for block {block_hash}"))?; + + // We do not take announces directly from parent block, + // because some of them may be expired at `block_hash`, + // so we take parents of all announces from `block_hash`, + // to be sure that we take only not expired parent announces. + let parent_announces = db.announces_parents(candidates.iter().cloned())?; + + let best_parent = best_announce(db, parent_announces, commitment_delay_limit - 1)?; - if announce.parent == best_announce_hash { - return Ok((best_announce_hash, announce)); + // Find child announces + let mut not_base_announce_hash = None; + let mut base_announce_hash = None; + for candidate in candidates { + let announce = db + .announce(candidate) + .ok_or_else(|| anyhow!("announce({candidate}) not found"))?; + + if announce.parent == best_parent && !announce.is_base() { + if not_base_announce_hash.is_some() { + tracing::warn!("Found multiple not-base announces: maybe double announcement"); + } else { + not_base_announce_hash = Some(candidate); + } + } else if announce.parent == best_parent && announce.is_base() { + if base_announce_hash.replace(candidate).is_some() { + unreachable!("Two different siblings base announces is impossible"); + } } } - unreachable!( - "Best announce {best_announce_hash} must be parent of at least one announce in block {block_hash}" - ); + match (not_base_announce_hash, base_announce_hash) { + (Some(not_base), Some(base)) => { + if announces_have_equal_outcomes(db, base, not_base) { + // if base announce has the same outcome as not-base announce, then better to use base + Ok(base) + } else { + Ok(not_base) + } + } + (Some(not_base), None) => Ok(not_base), + (None, Some(base)) => Ok(base), + (None, None) => Err(anyhow!( + "No announces with parent {best_parent} found for block {block_hash}" + )), + } } /// Returns announce hash, which is supposed to be best among provided announces. -pub fn best_announce( +fn best_announce( db: &impl DBAnnouncesExt, announces: impl IntoIterator>, - commitment_delay_limit: u32, + limit: u32, ) -> Result> { let mut announces = announces.into_iter(); let Some(first) = announces.next() else { @@ -644,7 +719,7 @@ pub fn best_announce( let announce_points = |mut announce_hash| -> Result { let mut points = 0; - for _ in 0..commitment_delay_limit { + for _ in 0..limit { let announce = db .announce(announce_hash) .ok_or_else(|| anyhow!("Announce {announce_hash} not found in db"))?; @@ -674,90 +749,37 @@ pub fn best_announce( } } - if let AnnounceSiblingsOutcomeStatus::OutcomeIsSame { - sibling_announce_hash, - sibling_announce, - } = check_announce_sibling_outcome(db, best_announce_hash)? - && sibling_announce.is_base() - { - // if sibling has same outcome and it's base, then better to use base - Ok(sibling_announce_hash) - } else { - Ok(best_announce_hash) - } -} + let best_announce = db + .announce(best_announce_hash) + .ok_or_else(|| anyhow!("Best announce {best_announce_hash} not found in db"))?; -pub enum AnnounceSiblingsOutcomeStatus { - NotFound, - NotComputed, - OutcomeIsDifferent, - OutcomeIsSame { - sibling_announce_hash: HashOf, - sibling_announce: Announce, - }, -} + if best_announce.is_base() { + // we can return it without checking siblings + return Ok(best_announce_hash); + } -/// Siblings of announce is announces with the same parent -pub fn check_announce_sibling_outcome( - db: &impl DBAnnouncesExt, - announce_hash: HashOf, -) -> Result { - let Some((sibling_announce_hash, sibling_announce)) = announce_sibling(db, announce_hash)? + let Some(base_announce) = + db.find_block_base_announce_with_parent(best_announce.block_hash, best_announce.parent)? else { - return Ok(AnnounceSiblingsOutcomeStatus::NotFound); + return Ok(best_announce_hash); }; - if !db.announce_meta(announce_hash).computed - || !db.announce_meta(sibling_announce_hash).computed - { - return Ok(AnnounceSiblingsOutcomeStatus::NotComputed); - } - - let announce_outcome = db - .announce_outcome(announce_hash) - .ok_or_else(|| anyhow!("outcome not found for computed announce {announce_hash:?}"))?; - let sibling_outcome = db.announce_outcome(sibling_announce_hash).ok_or_else(|| { - anyhow!("outcome not found for computed sibling announce {sibling_announce_hash:?}") - })?; - - if announce_outcome == sibling_outcome { - Ok(AnnounceSiblingsOutcomeStatus::OutcomeIsSame { - sibling_announce_hash, - sibling_announce, - }) + if announces_have_equal_outcomes(db, base_announce.hash, best_announce_hash) { + // if base announce has the same outcome as best announce, then better to use base + Ok(base_announce.hash) } else { - Ok(AnnounceSiblingsOutcomeStatus::OutcomeIsDifferent) + Ok(best_announce_hash) } } -pub fn announce_sibling( +pub fn announces_have_equal_outcomes( db: &impl DBAnnouncesExt, - announce_hash: HashOf, -) -> Result, Announce)>> { - let announce = db - .announce(announce_hash) - .ok_or_else(|| anyhow!("announce({announce_hash}) not found"))?; - - let neighbors = db - .block_meta(announce.block_hash) - .announces - .ok_or_else(|| anyhow!("announces not found for block({})", announce.block_hash))?; - - for neighbor_hash in neighbors { - if neighbor_hash == announce_hash { - continue; - } - - let neighbor_announce = db - .announce(neighbor_hash) - .ok_or_else(|| anyhow!("announce({neighbor_hash}) not found"))?; - - if neighbor_announce.parent == announce.parent { - return Ok(Some((neighbor_hash, neighbor_announce))); - } - } - - Ok(None) + announce1_hash: HashOf, + announce2_hash: HashOf, +) -> bool { + db.announce_outcome(announce1_hash) + .map(|base_outcome| Some(base_outcome) == db.announce_outcome(announce2_hash)) + .unwrap_or(false) } #[derive(Debug, Clone, PartialEq, Eq, derive_more::Display)] @@ -779,7 +801,7 @@ pub enum AnnounceRejectionReason { pub enum AnnounceStatus { #[display("Announce {_0} accepted")] Accepted(HashOf), - #[display("Announce {announce:?} rejected: {reason:?}")] + #[display("Announce {announce} rejected: {reason:?}")] Rejected { announce: Announce, reason: AnnounceRejectionReason, @@ -1272,8 +1294,7 @@ mod tests { ); // Also verify via best_parent_announce: block 4 should pick base at block 3 as best parent - let (best_parent_hash, _child) = - best_parent_announce(&db, chain.blocks[4].hash, 3).unwrap(); + let best_parent_hash = best_parent_announce(&db, chain.blocks[4].hash, 3).unwrap(); assert_eq!( best_parent_hash, base_hash, "best_parent_announce should prefer base parent with same outcome" diff --git a/ethexe/consensus/src/connect/mod.rs b/ethexe/consensus/src/connect/mod.rs index fd34882a3c6..3612b5af82f 100644 --- a/ethexe/consensus/src/connect/mod.rs +++ b/ethexe/consensus/src/connect/mod.rs @@ -279,7 +279,7 @@ impl ConsensusService for ConnectService { self.process_announce_from_producer(announce, *producer)?; self.state = State::WaitingForBlock; } else { - tracing::warn!("Receive unexpected {announce:?}, save to pending announces"); + tracing::warn!("Receive unexpected {announce}, save to pending announces"); self.pending_announces .push((sender, announce.block_hash), announce); } diff --git a/ethexe/consensus/src/validator/batch/manager.rs b/ethexe/consensus/src/validator/batch/manager.rs index 5594d52649a..2558376be78 100644 --- a/ethexe/consensus/src/validator/batch/manager.rs +++ b/ethexe/consensus/src/validator/batch/manager.rs @@ -200,15 +200,11 @@ impl BatchCommitmentManager { }); } - let candidates = self - .db - .block_meta(block.hash) - .announces - .into_iter() - .flatten(); - - let best_announce_hash = - announces::best_announce(&self.db, candidates, self.limits.commitment_delay_limit)?; + let best_announce_hash = announces::block_best_announce( + &self.db, + block.hash, + self.limits.commitment_delay_limit, + )?; let Some(last_committed_announce) = self.db.block_meta(block.hash).last_committed_announce diff --git a/ethexe/consensus/src/validator/initial.rs b/ethexe/consensus/src/validator/initial.rs index 8d0d0f5f03b..45185728c62 100644 --- a/ethexe/consensus/src/validator/initial.rs +++ b/ethexe/consensus/src/validator/initial.rs @@ -223,6 +223,28 @@ impl Initial { impl ValidatorContext { fn switch_to_producer_or_subordinate(self, block: SimpleBlockData) -> Result { + // print chain + { + use ethexe_common::{ + HashOf, + db::{AnnounceStorageRO, BlockMetaStorageRO}, + }; + + let db = &self.core.db; + for mut head in db.block_meta(block.hash).announces.into_iter().flatten() { + let mut chain_str = String::new(); + while head != HashOf::zero() { + let announce = db.announce(head).unwrap(); + chain_str = format!( + "{head:6}({}) <- {chain_str}", + if announce.is_base() { "B" } else { "P" } + ); + head = announce.parent; + } + tracing::info!(block = %block.hash, "Announces chain: {chain_str}"); + } + } + let era_index = self.core.timelines.era_from_ts(block.header.timestamp); let validators = self .core diff --git a/ethexe/consensus/src/validator/mod.rs b/ethexe/consensus/src/validator/mod.rs index b4d28ae1267..0186bf88b2a 100644 --- a/ethexe/consensus/src/validator/mod.rs +++ b/ethexe/consensus/src/validator/mod.rs @@ -518,7 +518,7 @@ impl DefaultProcessing { ) -> Result { let mut s = s.into(); s.warning(format!( - "unexpected announce from producer: {announce:?}, saved for later." + "unexpected announce from producer: {announce}, saved for later." )); s.context_mut().pending(announce); Ok(s) diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 55965d64489..96590a5ceb0 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -21,14 +21,17 @@ use super::{ }; use crate::{ ConsensusEvent, - announces::{self, AnnounceSiblingsOutcomeStatus, DBAnnouncesExt}, + announces::{self, DBAnnouncesExt}, validator::DefaultProcessing, }; use anyhow::{Result, anyhow}; use derive_more::{Debug, Display}; use ethexe_common::{ - Announce, HashOf, PromisePolicy, SimpleBlockData, ValidatorsVec, db::BlockMetaStorageRO, - gear::BatchCommitment, injected::Promise, network::ValidatorMessage, + Announce, HashOf, PromisePolicy, SimpleBlockData, ValidatorsVec, + db::{AnnounceStorageRO, BlockMetaStorageRO}, + gear::BatchCommitment, + injected::Promise, + network::ValidatorMessage, }; use ethexe_service_utils::Timer; use futures::{FutureExt, future::BoxFuture}; @@ -49,11 +52,16 @@ pub struct Producer { #[derive(Debug, derive_more::IsVariant)] enum State { - Delay { + WaitForBaseComputed { #[debug(skip)] - timer: Option, + timer: Timer, + best_parent_announce_hash: HashOf, + base_announce_hash: HashOf, + }, + WaitForTimer { + #[debug(skip)] + timer: Timer, best_parent_announce_hash: HashOf, - child_announce_hash: Option>, }, WaitingAnnounceComputed(HashOf), AggregateBatchCommitment { @@ -80,36 +88,43 @@ impl StateHandler for Producer { announce_hash: HashOf, ) -> Result { match self.state { - State::Delay { + State::WaitForBaseComputed { timer, - child_announce_hash: Some(child_announce_hash), + base_announce_hash, best_parent_announce_hash, - } if child_announce_hash == announce_hash => { - let timer_is_none = timer.is_none(); - self.state = State::Delay { + } if base_announce_hash == announce_hash => { + self.state = State::WaitForTimer { timer, best_parent_announce_hash, - child_announce_hash: None, }; - if timer_is_none { - self.produce_announce(best_parent_announce_hash) - } else { - Ok(self.into()) - } + Ok(self.into()) } State::WaitingAnnounceComputed(expected) if expected == announce_hash => { - // use base sibling announce as best if it has the same outcome as current announce, otherwise use current announce as best. - let best_announce_hash = if let AnnounceSiblingsOutcomeStatus::OutcomeIsSame { - sibling_announce_hash, - sibling_announce, - } = - announces::check_announce_sibling_outcome(&self.ctx.core.db, announce_hash)? - && sibling_announce.is_base() + let announce = self + .ctx + .core + .db + .announce(announce_hash) + .ok_or_else(|| anyhow!("Own announce {announce_hash} not found in db"))?; + + // Check if there's a base sibling with the same outcome — prefer base if so. + let best_announce_hash = match self + .ctx + .core + .db + .find_block_base_announce_with_parent(self.block.hash, announce.parent)? { - sibling_announce_hash - } else { - announce_hash + Some(base) + if announces::announces_have_equal_outcomes( + &self.ctx.core.db, + announce_hash, + base.hash, + ) => + { + base.hash + } + _ => announce_hash, }; // Aggregate commitment for the block and use `best_announce_hash` as head for chain commitment. @@ -128,8 +143,7 @@ impl StateHandler for Producer { } State::WaitingAnnounceComputed(expected) => { self.warning(format!( - "Computed announce {} is not expected, expected {expected}", - announce_hash + "Computed announce {announce_hash} is not expected, expected {expected}", )); Ok(self.into()) @@ -164,24 +178,13 @@ impl StateHandler for Producer { fn poll_next_state(mut self, cx: &mut Context<'_>) -> Result<(Poll<()>, ValidatorState)> { match &mut self.state { - State::Delay { - timer: Some(timer), - child_announce_hash, + State::WaitForTimer { + timer, best_parent_announce_hash, } => { if timer.poll_unpin(cx).is_ready() { - let state = if child_announce_hash.is_none() { - let announce_hash = *best_parent_announce_hash; - self.produce_announce(announce_hash)? - } else { - self.state = State::Delay { - timer: None, - best_parent_announce_hash: *best_parent_announce_hash, - child_announce_hash: *child_announce_hash, - }; - self.into() - }; - return Ok((Poll::Ready(()), state)); + let parent = *best_parent_announce_hash; + return Ok((Poll::Ready(()), self.produce_announce(parent)?)); } } State::AggregateBatchCommitment { future } => match future.poll_unpin(cx) { @@ -222,27 +225,38 @@ impl Producer { ctx.pending_events.clear(); - let (best_parent_announce_hash, child_announce) = announces::best_parent_announce( + let best_parent_announce_hash = announces::best_parent_announce( &ctx.core.db, block.hash, ctx.core.commitment_delay_limit, )?; - let child_announce_hash = Some(child_announce.to_hash()); - ctx.output(ConsensusEvent::ComputeAnnounce( - child_announce, - PromisePolicy::Disabled, - )); + let state = if let Some(base_announce) = ctx + .core + .db + .find_block_base_announce_with_parent(block.hash, best_parent_announce_hash)? + { + ctx.output(ConsensusEvent::ComputeAnnounce( + base_announce.value, + PromisePolicy::Disabled, + )); + State::WaitForBaseComputed { + timer, + best_parent_announce_hash, + base_announce_hash: base_announce.hash, + } + } else { + State::WaitForTimer { + timer, + best_parent_announce_hash, + } + }; Ok(Self { ctx, block, validators, - state: State::Delay { - timer: Some(timer), - best_parent_announce_hash, - child_announce_hash, - }, + state, } .into()) } @@ -274,7 +288,7 @@ impl Producer { // then the same announce is created multiple times, and include_announce would return already included. // In this case we just go to initial state, without publishing anything and computing announce again. self.warning(format!( - "Announce created {announce:?} is already included at {}", + "Announce created {announce} is already included at {}", self.block.hash )); @@ -350,7 +364,7 @@ mod tests { let (state, announce_hash) = Producer::create(ctx, block, validators) .unwrap() - .skip_timer() + .skip_base() .await .unwrap(); @@ -385,11 +399,9 @@ mod tests { let mut batch = prepare_chain_for_batch_commitment(&ctx.core.db); let block = ctx.core.db.simple_block_data(batch.block_hash); - // If threshold is 1, we should not emit any events and goes thru states coordinator -> submitter -> initial - // until batch is committed let (state, announce_hash) = Producer::create(ctx, block, validators.clone()) .unwrap() - .skip_timer() + .skip_base() .await .unwrap(); @@ -442,7 +454,7 @@ mod tests { let (state, announce_hash) = Producer::create(ctx, block, validators) .unwrap() - .skip_timer() + .skip_base() .await .unwrap(); @@ -488,7 +500,7 @@ mod tests { let (state, announce_hash) = Producer::create(ctx, block, validators) .unwrap() - .skip_timer() + .skip_base() .await .unwrap(); @@ -523,12 +535,12 @@ mod tests { #[async_trait] trait ProducerExt: Sized { - async fn skip_timer(self) -> Result<(Self, HashOf)>; + async fn skip_base(self) -> Result<(Self, HashOf)>; } #[async_trait] impl ProducerExt for ValidatorState { - async fn skip_timer(self) -> Result<(Self, HashOf)> { + async fn skip_base(self) -> Result<(Self, HashOf)> { assert!( self.is_producer(), "Works only for producer state, got {}", @@ -536,26 +548,20 @@ mod tests { ); let producer = self.unwrap_producer(); - assert!( - producer.state.is_delay(), - "Works only for delay state, got {:?}", - producer.state - ); - let state = ValidatorState::from(producer); - // Base announce computation let (state, event) = state.wait_for_event().await?; assert!(state.is_producer(), "Expected producer state, got {state}"); - let ConsensusEvent::ComputeAnnounce(announce, PromisePolicy::Disabled) = event else { - panic!("Expected ComputeAnnounce event announces disabled, got {event:?}"); - }; - - // Set announce as computed - let state = state.process_computed_announce(announce.to_hash())?; + let (state, event) = + if let ConsensusEvent::ComputeAnnounce(announce, PromisePolicy::Disabled) = event { + // Base announce computation requested: set announce as computed + let state = state.process_computed_announce(announce.to_hash())?; + state.wait_for_event().await? + } else { + (state, event) + }; // Announce message publication - let (state, event) = state.wait_for_event().await?; assert!(state.is_producer(), "Expected producer state, got {state}"); assert!(event.is_publish_message()); diff --git a/ethexe/consensus/src/validator/subordinate.rs b/ethexe/consensus/src/validator/subordinate.rs index 756811dab19..9ab8002467d 100644 --- a/ethexe/consensus/src/validator/subordinate.rs +++ b/ethexe/consensus/src/validator/subordinate.rs @@ -22,7 +22,7 @@ use super::{ }; use crate::{ ConsensusEvent, - announces::{self, AnnounceStatus}, + announces::{self, AnnounceStatus, DBAnnouncesExt}, validator::participant::Participant, }; use anyhow::Result; @@ -56,7 +56,7 @@ enum State { WaitingForAnnounce, WaitingAnnounceComputed { announce_hash: Option>, - sibling_hash: Option>, + base_announce_hash: Option>, }, } @@ -79,18 +79,18 @@ impl StateHandler for Subordinate { ) -> Result { if let State::WaitingAnnounceComputed { mut announce_hash, - mut sibling_hash, + mut base_announce_hash, } = self.state { if announce_hash == Some(computed_announce_hash) { announce_hash = None; - } else if sibling_hash == Some(computed_announce_hash) { - sibling_hash = None; + } else if base_announce_hash == Some(computed_announce_hash) { + base_announce_hash = None; } else { return DefaultProcessing::computed_announce(self, computed_announce_hash); } - if announce_hash.is_none() && sibling_hash.is_none() { + if announce_hash.is_none() && base_announce_hash.is_none() { if self.is_validator { return Participant::create(self.ctx, self.block, self.producer); } else { @@ -100,7 +100,7 @@ impl StateHandler for Subordinate { self.state = State::WaitingAnnounceComputed { announce_hash, - sibling_hash, + base_announce_hash, }; Ok(self.into()) @@ -192,20 +192,21 @@ impl Subordinate { fn send_announces_for_computation(mut self, announce: Announce) -> Result { match announces::accept_announce(&self.ctx.core.db, announce.clone())? { AnnounceStatus::Accepted(announce_hash) => { - let sibling = announces::announce_sibling(&self.ctx.core.db, announce_hash)?; - self.ctx .output(ConsensusEvent::AnnounceAccepted(announce_hash)); - let sibling_hash = if let Some((hash, announce)) = sibling { - self.ctx.output(ConsensusEvent::ComputeAnnounce( - announce, - PromisePolicy::Disabled, - )); - Some(hash) - } else { - None - }; + let base_announce_hash = self + .ctx + .core + .db + .find_block_base_announce_with_parent(announce.block_hash, announce.parent)? + .map(|announce| { + self.ctx.output(ConsensusEvent::ComputeAnnounce( + announce.value, + PromisePolicy::Disabled, + )); + announce.hash + }); self.ctx.output(ConsensusEvent::ComputeAnnounce( announce, @@ -214,7 +215,7 @@ impl Subordinate { self.state = State::WaitingAnnounceComputed { announce_hash: Some(announce_hash), - sibling_hash, + base_announce_hash, }; Ok(self.into()) @@ -223,7 +224,7 @@ impl Subordinate { self.ctx .output(ConsensusEvent::AnnounceRejected(announce.to_hash())); self.warning(format!( - "Received announce {announce:?} is rejected: {reason:?}" + "Received announce {announce} is rejected: {reason:?}" )); Initial::create(self.ctx) diff --git a/ethexe/rpc/src/apis/mod.rs b/ethexe/rpc/src/apis/mod.rs index 8ed642f4ca1..c3cc2c4f43a 100644 --- a/ethexe/rpc/src/apis/mod.rs +++ b/ethexe/rpc/src/apis/mod.rs @@ -24,7 +24,9 @@ mod program; pub use block::{BlockApi, BlockServer}; pub use code::{CodeApi, CodeServer}; pub use injected::{InjectedApi, InjectedServer}; -pub use program::{FullProgramState, ProgramApi, ProgramServer}; +#[cfg(feature = "client")] +pub use program::FullProgramState; +pub use program::{ProgramApi, ProgramServer}; #[cfg(feature = "client")] pub use crate::apis::{ From 15540716522d87ebc354b01ae4e99a14ee7c3210 Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Fri, 3 Apr 2026 21:48:02 +0200 Subject: [PATCH 07/12] fix slots --- ethexe/network/src/slots.rs | 79 ++++--------------------------------- 1 file changed, 7 insertions(+), 72 deletions(-) diff --git a/ethexe/network/src/slots.rs b/ethexe/network/src/slots.rs index bf4e7a9805d..bb3c7beead5 100644 --- a/ethexe/network/src/slots.rs +++ b/ethexe/network/src/slots.rs @@ -41,7 +41,7 @@ use crate::utils::{ConnectionMap, NoLimits, PeerAddresses}; use libp2p::{ Multiaddr, PeerId, - core::{ConnectedPoint, Endpoint, transport::PortUse}, + core::{Endpoint, transport::PortUse}, swarm::{ CloseConnection, ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, @@ -444,12 +444,7 @@ impl Behaviour { self.add_connection(peer, connection_id, PeerDirection::Outbound) } - fn remove_connection( - &mut self, - peer: PeerId, - connection_id: ConnectionId, - endpoint: &ConnectedPoint, - ) -> bool { + fn remove_connection(&mut self, peer: PeerId, connection_id: ConnectionId) -> bool { match self.peers.entry(peer) { Entry::Occupied(mut entry) => { let state = entry.get_mut(); @@ -602,11 +597,11 @@ impl NetworkBehaviour for Behaviour { FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, - endpoint, + endpoint: _, cause: _, remaining_established: _, }) => { - self.remove_connection(peer_id, connection_id, endpoint); + self.remove_connection(peer_id, connection_id); } FromSwarm::DialFailure(DialFailure { peer_id: Some(peer_id), @@ -888,14 +883,7 @@ mod tests { behaviour .add_outbound_connection(peer_id, first_connection_id) .unwrap(); - behaviour.remove_connection( - peer_id, - first_connection_id, - &ConnectedPoint::Listener { - local_addr: random_multiaddr(), - send_back_addr: random_multiaddr(), - }, - ); + behaviour.remove_connection(peer_id, first_connection_id); behaviour .add_outbound_connection(peer_id, ConnectionId::new_unchecked(2)) @@ -990,45 +978,6 @@ mod tests { ); } - #[tokio::test] - async fn add_outbound_connection_allows_peer_after_dialer_side_disconnect() { - let mut behaviour = Behaviour::new(Config::default()); - - let peer_id = PeerId::random(); - let first_connection_id = ConnectionId::new_unchecked(1); - behaviour - .add_outbound_connection(peer_id, first_connection_id) - .unwrap(); - behaviour.remove_connection( - peer_id, - first_connection_id, - &ConnectedPoint::Dialer { - address: random_multiaddr(), - role_override: Endpoint::Dialer, - port_use: PortUse::New, - }, - ); - - assert!(!behaviour.peers.contains_key(&peer_id)); - - behaviour - .add_outbound_connection(peer_id, ConnectionId::new_unchecked(2)) - .unwrap(); - - let (connections, direction) = behaviour - .peers - .get(&peer_id) - .unwrap() - .unwrap_connected_ref(); - assert_eq!(*direction, PeerDirection::Outbound); - assert_eq!( - *connections, - [ConnectionId::new_unchecked(2)] - .into_iter() - .collect::>() - ); - } - #[tokio::test] async fn add_inbound_connection_rejects_peer_in_backoff_period() { let mut behaviour = Behaviour::new(Config::default()); @@ -1038,14 +987,7 @@ mod tests { behaviour .add_inbound_connection(peer_id, first_connection_id) .unwrap(); - behaviour.remove_connection( - peer_id, - first_connection_id, - &ConnectedPoint::Listener { - local_addr: random_multiaddr(), - send_back_addr: random_multiaddr(), - }, - ); + behaviour.remove_connection(peer_id, first_connection_id); let err = behaviour .add_inbound_connection(peer_id, ConnectionId::new_unchecked(2)) @@ -1080,14 +1022,7 @@ mod tests { behaviour .add_outbound_connection(peer_id, first_connection_id) .unwrap(); - behaviour.remove_connection( - peer_id, - first_connection_id, - &ConnectedPoint::Listener { - local_addr: random_multiaddr(), - send_back_addr: random_multiaddr(), - }, - ); + behaviour.remove_connection(peer_id, first_connection_id); let err = behaviour .add_pending_outbound_connection(peer_id, ConnectionId::new_unchecked(2)) From 6cfbdedfd06d385d874f03f6a210b838bf280be9 Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Fri, 3 Apr 2026 23:33:42 +0200 Subject: [PATCH 08/12] fix clippy --- ethexe/consensus/src/announces.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 39930b6848a..787abc614f2 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -680,10 +680,11 @@ pub fn block_best_announce( } else { not_base_announce_hash = Some(candidate); } - } else if announce.parent == best_parent && announce.is_base() { - if base_announce_hash.replace(candidate).is_some() { - unreachable!("Two different siblings base announces is impossible"); - } + } else if announce.parent == best_parent + && announce.is_base() + && base_announce_hash.replace(candidate).is_some() + { + unreachable!("Two different siblings base announces is impossible"); } } From 7f2969f2cb72f39c14743b5638a8b5c12bd66fb6 Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Mon, 6 Apr 2026 19:44:45 +0200 Subject: [PATCH 09/12] some improvements --- ethexe/consensus/src/announces.rs | 92 ++++++++----------- ethexe/consensus/src/validator/initial.rs | 22 ----- ethexe/consensus/src/validator/producer.rs | 45 ++++----- ethexe/consensus/src/validator/subordinate.rs | 29 +++--- 4 files changed, 78 insertions(+), 110 deletions(-) diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 787abc614f2..796f462dfb2 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -129,11 +129,11 @@ pub trait DBAnnouncesExt: announces: impl IntoIterator>, ) -> Result>>; - /// Find announce in the same block with the same parent as provided announce. - fn find_block_base_announce_with_parent( + /// Find block announce satisfying provided predicate. + fn find_block_announce( &self, block_hash: H256, - parent: HashOf, + pred: impl Fn(&WithHashOf) -> bool, ) -> Result>>; } @@ -216,10 +216,10 @@ impl< .collect() } - fn find_block_base_announce_with_parent( + fn find_block_announce( &self, block_hash: H256, - parent: HashOf, + pred: impl Fn(&WithHashOf) -> bool, ) -> Result>> { let announces = self .block_meta(block_hash) @@ -231,11 +231,13 @@ impl< .announce(announce_hash) .ok_or_else(|| anyhow!("announce({announce_hash}) not found"))?; - if announce.parent == parent && announce.is_base() { - return Ok(Some(WithHashOf { - hash: announce_hash, - value: announce, - })); + let with_hash = WithHashOf { + hash: announce_hash, + value: announce, + }; + + if pred(&with_hash) { + return Ok(Some(with_hash)); } } @@ -645,60 +647,41 @@ pub fn best_parent_announce( // to be sure that we take only not expired parent announces. let candidates = db.announces_parents(announces)?; - best_announce(db, candidates, commitment_delay_limit - 1) + best_announce( + db, + candidates, + commitment_delay_limit + .checked_sub(1) + .expect("commitment_delay_limit must be > 0"), + ) } +/// Returns best announce for `block_hash`. pub fn block_best_announce( db: &impl DBAnnouncesExt, block_hash: H256, commitment_delay_limit: u32, ) -> Result> { - let candidates = db - .block_meta(block_hash) - .announces - .ok_or_else(|| anyhow!("announces not found for block {block_hash}"))?; + let best_parent = best_parent_announce(db, block_hash, commitment_delay_limit)?; - // We do not take announces directly from parent block, - // because some of them may be expired at `block_hash`, - // so we take parents of all announces from `block_hash`, - // to be sure that we take only not expired parent announces. - let parent_announces = db.announces_parents(candidates.iter().cloned())?; - - let best_parent = best_announce(db, parent_announces, commitment_delay_limit - 1)?; - - // Find child announces - let mut not_base_announce_hash = None; - let mut base_announce_hash = None; - for candidate in candidates { - let announce = db - .announce(candidate) - .ok_or_else(|| anyhow!("announce({candidate}) not found"))?; - - if announce.parent == best_parent && !announce.is_base() { - if not_base_announce_hash.is_some() { - tracing::warn!("Found multiple not-base announces: maybe double announcement"); - } else { - not_base_announce_hash = Some(candidate); - } - } else if announce.parent == best_parent - && announce.is_base() - && base_announce_hash.replace(candidate).is_some() - { - unreachable!("Two different siblings base announces is impossible"); - } - } + let not_base_announce_hash = db.find_block_announce(block_hash, |announce| { + announce.value.parent == best_parent && !announce.value.is_base() + })?; + let base_announce_hash = db.find_block_announce(block_hash, |announce| { + announce.value.parent == best_parent && announce.value.is_base() + })?; match (not_base_announce_hash, base_announce_hash) { (Some(not_base), Some(base)) => { - if announces_have_equal_outcomes(db, base, not_base) { + if announces_have_equal_outcomes(db, base.hash, not_base.hash) { // if base announce has the same outcome as not-base announce, then better to use base - Ok(base) + Ok(base.hash) } else { - Ok(not_base) + Ok(not_base.hash) } } - (Some(not_base), None) => Ok(not_base), - (None, Some(base)) => Ok(base), + (Some(not_base), None) => Ok(not_base.hash), + (None, Some(base)) => Ok(base.hash), (None, None) => Err(anyhow!( "No announces with parent {best_parent} found for block {block_hash}" )), @@ -759,8 +742,9 @@ fn best_announce( return Ok(best_announce_hash); } - let Some(base_announce) = - db.find_block_base_announce_with_parent(best_announce.block_hash, best_announce.parent)? + let Some(base_announce) = db.find_block_announce(best_announce.block_hash, |announce| { + announce.value.is_base() && announce.value.parent == best_announce.parent + })? else { return Ok(best_announce_hash); }; @@ -778,9 +762,9 @@ pub fn announces_have_equal_outcomes( announce1_hash: HashOf, announce2_hash: HashOf, ) -> bool { - db.announce_outcome(announce1_hash) - .map(|base_outcome| Some(base_outcome) == db.announce_outcome(announce2_hash)) - .unwrap_or(false) + let outcome1 = db.announce_outcome(announce1_hash); + let outcome2 = db.announce_outcome(announce2_hash); + outcome1.is_some() && outcome1 == outcome2 } #[derive(Debug, Clone, PartialEq, Eq, derive_more::Display)] diff --git a/ethexe/consensus/src/validator/initial.rs b/ethexe/consensus/src/validator/initial.rs index e76f77298a5..692688d3b10 100644 --- a/ethexe/consensus/src/validator/initial.rs +++ b/ethexe/consensus/src/validator/initial.rs @@ -221,28 +221,6 @@ impl Initial { impl ValidatorContext { fn switch_to_producer_or_subordinate(self, block: SimpleBlockData) -> Result { - // print chain - { - use ethexe_common::{ - HashOf, - db::{AnnounceStorageRO, BlockMetaStorageRO}, - }; - - let db = &self.core.db; - for mut head in db.block_meta(block.hash).announces.into_iter().flatten() { - let mut chain_str = String::new(); - while head != HashOf::zero() { - let announce = db.announce(head).unwrap(); - chain_str = format!( - "{head:6}({}) <- {chain_str}", - if announce.is_base() { "B" } else { "P" } - ); - head = announce.parent; - } - tracing::info!(block = %block.hash, "Announces chain: {chain_str}"); - } - } - let era_index = self.core.timelines.era_from_ts(block.header.timestamp); let validators = self .core diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 96590a5ceb0..481257a5ab0 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -109,23 +109,25 @@ impl StateHandler for Producer { .ok_or_else(|| anyhow!("Own announce {announce_hash} not found in db"))?; // Check if there's a base sibling with the same outcome — prefer base if so. - let best_announce_hash = match self - .ctx - .core - .db - .find_block_base_announce_with_parent(self.block.hash, announce.parent)? - { - Some(base) - if announces::announces_have_equal_outcomes( - &self.ctx.core.db, - announce_hash, - base.hash, - ) => - { - base.hash - } - _ => announce_hash, - }; + let best_announce_hash = + match self + .ctx + .core + .db + .find_block_announce(self.block.hash, |candidate| { + candidate.value.is_base() && candidate.value.parent == announce.parent + })? { + Some(base) + if announces::announces_have_equal_outcomes( + &self.ctx.core.db, + announce_hash, + base.hash, + ) => + { + base.hash + } + _ => announce_hash, + }; // Aggregate commitment for the block and use `best_announce_hash` as head for chain commitment. // `best_announce_hash` is computed and included in the db already, so it's safe to use it. @@ -231,11 +233,10 @@ impl Producer { ctx.core.commitment_delay_limit, )?; - let state = if let Some(base_announce) = ctx - .core - .db - .find_block_base_announce_with_parent(block.hash, best_parent_announce_hash)? - { + let state = if let Some(base_announce) = + ctx.core.db.find_block_announce(block.hash, |announce| { + announce.value.is_base() && announce.value.parent == best_parent_announce_hash + })? { ctx.output(ConsensusEvent::ComputeAnnounce( base_announce.value, PromisePolicy::Disabled, diff --git a/ethexe/consensus/src/validator/subordinate.rs b/ethexe/consensus/src/validator/subordinate.rs index 9ab8002467d..f97a67d3d27 100644 --- a/ethexe/consensus/src/validator/subordinate.rs +++ b/ethexe/consensus/src/validator/subordinate.rs @@ -195,18 +195,23 @@ impl Subordinate { self.ctx .output(ConsensusEvent::AnnounceAccepted(announce_hash)); - let base_announce_hash = self - .ctx - .core - .db - .find_block_base_announce_with_parent(announce.block_hash, announce.parent)? - .map(|announce| { - self.ctx.output(ConsensusEvent::ComputeAnnounce( - announce.value, - PromisePolicy::Disabled, - )); - announce.hash - }); + let base_announce_hash = if !announce.is_base() { + self.ctx + .core + .db + .find_block_announce(announce.block_hash, |candidate| { + candidate.value.is_base() && candidate.value.parent == announce.parent + })? + .map(|announce| { + self.ctx.output(ConsensusEvent::ComputeAnnounce( + announce.value, + PromisePolicy::Disabled, + )); + announce.hash + }) + } else { + None + }; self.ctx.output(ConsensusEvent::ComputeAnnounce( announce, From 1470db3b1110d24ea24e599644bc26376fb557bc Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Mon, 6 Apr 2026 20:24:06 +0200 Subject: [PATCH 10/12] append event BlockComputationComplete --- ethexe/consensus/src/connect/mod.rs | 33 +++++++-- ethexe/consensus/src/lib.rs | 2 + ethexe/consensus/src/validator/producer.rs | 20 +++-- ethexe/consensus/src/validator/subordinate.rs | 6 +- ethexe/service/src/lib.rs | 3 + ethexe/service/src/tests/mod.rs | 73 ++++++++++++------- ethexe/service/src/tests/utils/events.rs | 16 ++-- 7 files changed, 105 insertions(+), 48 deletions(-) diff --git a/ethexe/consensus/src/connect/mod.rs b/ethexe/consensus/src/connect/mod.rs index 8055718b008..3480d1d6586 100644 --- a/ethexe/consensus/src/connect/mod.rs +++ b/ethexe/consensus/src/connect/mod.rs @@ -65,12 +65,15 @@ const MAX_PENDING_ANNOUNCES: NonZeroUsize = NonZeroUsize::new(10).unwrap(); /// └─ receive_announces_response ─► process_after_propagation /// /// process_after_propagation (propagation done ) -/// ├─ announce from producer already received ─► emit ComputeAnnounce ─► WaitingForBlock +/// ├─ announce from producer already received ─► emit ComputeAnnounce ─► WaitingForAnnounceComputed /// └─ no already received announce ─► WaitingForAnnounce /// /// WaitingForAnnounce (waiting for announce from producer) -/// ├─ expected and accepted ─► emit ComputeAnnounce and AcceptAnnounce ─► WaitingForBlock +/// ├─ expected and accepted ─► emit ComputeAnnounce and AcceptAnnounce ─► WaitingForAnnounceComputed /// └─ unexpected ─► cached in pending_announces +/// +/// WaitingForAnnounceComputed (waiting for announce computation to complete) +/// └─ receive_computed_announce ─► emit BlockComputationComplete ─► WaitingForBlock /// ``` #[allow(clippy::enum_variant_names)] #[derive(Debug)] @@ -93,6 +96,10 @@ enum State { chain: VecDeque, waiting_request: AnnouncesRequest, }, + WaitingForAnnounceComputed { + block_hash: H256, + announce_hash: HashOf, + }, } /// Consensus service which tracks the on-chain and ethexe events @@ -137,7 +144,6 @@ impl ConnectService { ) -> Result<()> { if let Some(announce) = self.pending_announces.pop(&(producer, block.hash)) { self.process_announce_from_producer(announce, producer)?; - self.state = State::WaitingForBlock; } else { self.state = State::WaitingForAnnounce { block, producer }; } @@ -160,14 +166,19 @@ impl ConnectService { self.output .push_back(ConsensusEvent::AnnounceRejected(announce.to_hash())); + self.state = State::WaitingForBlock; } AnnounceStatus::Accepted(announce_hash) => { self.output .push_back(ConsensusEvent::AnnounceAccepted(announce_hash)); self.output.push_back(ConsensusEvent::ComputeAnnounce( - announce, + announce.clone(), PromisePolicy::Disabled, )); + self.state = State::WaitingForAnnounceComputed { + block_hash: announce.block_hash, + announce_hash, + }; } } @@ -263,7 +274,18 @@ impl ConsensusService for ConnectService { Ok(()) } - fn receive_computed_announce(&mut self, _announce_hash: HashOf) -> Result<()> { + fn receive_computed_announce(&mut self, announce_hash: HashOf) -> Result<()> { + if let State::WaitingForAnnounceComputed { + block_hash, + announce_hash: expected, + } = self.state + { + if expected == announce_hash { + self.output + .push_back(ConsensusEvent::BlockComputationComplete(block_hash)); + self.state = State::WaitingForBlock; + } + } Ok(()) } @@ -276,7 +298,6 @@ impl ConsensusService for ConnectService { && announce.block_hash == block.hash { self.process_announce_from_producer(announce, *producer)?; - self.state = State::WaitingForBlock; } else { tracing::warn!("Receive unexpected {announce}, save to pending announces"); self.pending_announces diff --git a/ethexe/consensus/src/lib.rs b/ethexe/consensus/src/lib.rs index c1b8ae43850..c566c848866 100644 --- a/ethexe/consensus/src/lib.rs +++ b/ethexe/consensus/src/lib.rs @@ -127,6 +127,8 @@ pub enum ConsensusEvent { /// Informational event: commitment was successfully submitted #[from] CommitmentSubmitted(CommitmentSubmitted), + /// Informational event: all announce computations for the block are complete + BlockComputationComplete(H256), /// Informational event: during service processing, a warning situation was detected Warning(String), } diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 481257a5ab0..ee2bf2dd834 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -129,6 +129,9 @@ impl StateHandler for Producer { _ => announce_hash, }; + self.ctx + .output(ConsensusEvent::BlockComputationComplete(self.block.hash)); + // Aggregate commitment for the block and use `best_announce_hash` as head for chain commitment. // `best_announce_hash` is computed and included in the db already, so it's safe to use it. self.state = State::AggregateBatchCommitment { @@ -385,7 +388,10 @@ mod tests { // No commitments - no batch and goes to initial state assert!(state.is_initial()); - assert_eq!(state.context().output.len(), 0); + assert_eq!( + state.context().output, + vec![ConsensusEvent::BlockComputationComplete(block.hash)] + ); assert!(eth.committed_batch.read().await.is_none()); } @@ -468,14 +474,14 @@ mod tests { } .setup(&state.context().core.db); - let (state, event) = state - .process_computed_announce(announce_hash) - .unwrap() - .wait_for_event() - .await - .unwrap(); + let state = state.process_computed_announce(announce_hash).unwrap(); + + // First event is BlockComputationComplete + let (state, event) = state.wait_for_event().await.unwrap(); + assert_eq!(event, ConsensusEvent::BlockComputationComplete(block.hash)); // If threshold is 2, producer must goes to coordinator state and emit validation request + let (state, event) = state.wait_for_event().await.unwrap(); assert!(state.is_coordinator()); event .unwrap_publish_message() diff --git a/ethexe/consensus/src/validator/subordinate.rs b/ethexe/consensus/src/validator/subordinate.rs index f97a67d3d27..ae0c7628e72 100644 --- a/ethexe/consensus/src/validator/subordinate.rs +++ b/ethexe/consensus/src/validator/subordinate.rs @@ -91,6 +91,9 @@ impl StateHandler for Subordinate { } if announce_hash.is_none() && base_announce_hash.is_none() { + self.ctx + .output(ConsensusEvent::BlockComputationComplete(self.block.hash)); + if self.is_validator { return Participant::create(self.ctx, self.block, self.producer); } else { @@ -394,7 +397,8 @@ mod tests { vec![ ConsensusEvent::AnnounceAccepted(announce.data().to_hash()), ConsensusEvent::ComputeAnnounce(base_announce.clone(), PromisePolicy::Disabled), - ConsensusEvent::ComputeAnnounce(announce.data().clone(), PromisePolicy::Disabled) + ConsensusEvent::ComputeAnnounce(announce.data().clone(), PromisePolicy::Disabled), + ConsensusEvent::BlockComputationComplete(block.hash), ] ); } diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index 8c061cc63a6..77428ab46d7 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -697,6 +697,9 @@ impl Service { network_fetcher.push(network.db_sync_handle().request(request.into())); } + ConsensusEvent::BlockComputationComplete(block_hash) => { + log::trace!("Block computation complete: {block_hash}"); + } ConsensusEvent::AnnounceAccepted(_) | ConsensusEvent::AnnounceRejected(_) => { // TODO #4940: consider to publish network message } diff --git a/ethexe/service/src/tests/mod.rs b/ethexe/service/src/tests/mod.rs index 178ce057832..9aa4eefde3e 100644 --- a/ethexe/service/src/tests/mod.rs +++ b/ethexe/service/src/tests/mod.rs @@ -1141,7 +1141,7 @@ async fn ping_reorg() { let latest_block = env.latest_block().await; connect_node .events() - .find_announce_computed(latest_block.hash) + .find_block_computation_complete(latest_block.hash) .await; log::info!("📗 Abort service to simulate node blocks skipping"); @@ -1209,7 +1209,7 @@ async fn ping_reorg() { let latest_block = env.latest_block().await; connect_node .events() - .find_announce_computed(latest_block.hash) + .find_block_computation_complete(latest_block.hash) .await; // The last step is to test correctness after db cleanup @@ -1420,7 +1420,7 @@ async fn multiple_validators() { for validator in &mut validators { validator .events() - .find_announce_computed(latest_block.hash) + .find_block_computation_complete(latest_block.hash) .await; } @@ -1441,7 +1441,7 @@ async fn multiple_validators() { for validator in validators.iter_mut().skip(1) { validator .events() - .find_announce_computed(latest_block.hash) + .find_block_computation_complete(latest_block.hash) .await; } @@ -1807,7 +1807,10 @@ async fn fast_sync() { } let latest_block = env.latest_block().await.hash; - alice.events().find_block_computed_twice(latest_block).await; + alice + .events() + .find_block_computation_complete(latest_block) + .await; log::info!("Starting Bob (fast-sync)"); let mut bob = env.new_node(NodeConfig::named("Bob").fast_sync()).await; @@ -1831,8 +1834,13 @@ async fn fast_sync() { } let latest_block = env.latest_block().await.hash; - alice.events().find_block_computed_twice(latest_block).await; - bob.events().find_announce_computed(latest_block).await; + alice + .events() + .find_block_computation_complete(latest_block) + .await; + bob.events() + .find_block_computation_complete(latest_block) + .await; log::info!("📗 Stopping Bob"); bob.stop_service().await; @@ -1862,7 +1870,10 @@ async fn fast_sync() { env.skip_blocks(100).await; let latest_block = env.latest_block().await.hash; - alice.events().find_block_computed_twice(latest_block).await; + alice + .events() + .find_block_computation_complete(latest_block) + .await; log::info!("📗 Starting Bob again to check how it handles partially empty database"); bob.start_service().await; @@ -1878,8 +1889,13 @@ async fn fast_sync() { } let latest_block = env.latest_block().await.hash; - alice.events().find_block_computed_twice(latest_block).await; - bob.events().find_announce_computed(latest_block).await; + alice + .events() + .find_block_computation_complete(latest_block) + .await; + bob.events() + .find_block_computation_complete(latest_block) + .await; assert_chain( latest_block, @@ -2099,7 +2115,7 @@ async fn execution_with_canonical_events_quarantine() { log::info!("📗 waiting announce for block {latest_block} computed"); validator .events() - .find_block_computed_twice(latest_block) + .find_block_computation_complete(latest_block) .await; // create a receiver without history so we don't face old `BlockSynced` in further for-loop @@ -2145,7 +2161,7 @@ async fn execution_with_canonical_events_quarantine() { assert!(!check_for_pong(block_hash), "PONG received too early"); - receiver.find_block_computed_twice(block_hash).await; + receiver.find_block_computation_complete(block_hash).await; env.force_new_block().await; } @@ -2944,7 +2960,7 @@ async fn announces_conflicts() { for validator in &mut validators { validator .events() - .find_announce_computed(latest_block.hash) + .find_block_computation_complete(latest_block.hash) .await; } } @@ -3006,20 +3022,13 @@ async fn announces_conflicts() { assert_eq!(res.code, ReplyCode::Success(SuccessReplyReason::Manual)); }); - // Wait till all validators accept announce for the latest block + // Wait till all validators complete computation for the latest block let latest_block = env.latest_block().await.hash; - let mut latest_computed_announce_hash = HashOf::zero(); for receiver in &mut receivers { - let announce_hash = receiver.find_block_computed_twice(latest_block).await; - assert!( - latest_computed_announce_hash == HashOf::zero() - || latest_computed_announce_hash == announce_hash, - "All validators must compute the same announce for the latest block" - ); - latest_computed_announce_hash = announce_hash; + receiver.find_block_computation_complete(latest_block).await; } - latest_computed_announce_hash + validators[0].db.top_announce_hash(latest_block) }; let wait_for_pong = { @@ -3199,7 +3208,7 @@ async fn whole_network_restore() { for validator in &mut validators { validator .events() - .find_announce_computed(latest_block.hash) + .find_block_computation_complete(latest_block.hash) .await; } @@ -3353,8 +3362,13 @@ async fn catch_up_test_case(commitment_delay_limit: u32) { // Wait until both stops processing let latest_block = env.latest_block().await.hash; - bob.events().find_announce_computed(latest_block).await; - alice.events().find_block_computed_twice(latest_block).await; + bob.events() + .find_block_computation_complete(latest_block) + .await; + alice + .events() + .find_block_computation_complete(latest_block) + .await; log::info!("📗 Stopping Bob"); bob.stop_service().await; @@ -3369,7 +3383,10 @@ async fn catch_up_test_case(commitment_delay_limit: u32) { // Wait until Alice stop processing let latest_block = env.latest_block().await.hash; - alice.events().find_block_computed_twice(latest_block).await; + alice + .events() + .find_block_computation_complete(latest_block) + .await; log::info!("📗 Stopping Alice"); alice.stop_service().await; @@ -3491,7 +3508,7 @@ async fn catch_up_test_case(commitment_delay_limit: u32) { // } // let latest_block = env.latest_block().await.hash; - // alice.events().find_block_computed_twice(latest_block).await; + // alice.events().find_block_computation_complete(latest_block).await; // if commitment_delay_limit == 3 { // log::info!("📗 Bob accepts announce from Alice at last"); diff --git a/ethexe/service/src/tests/utils/events.rs b/ethexe/service/src/tests/utils/events.rs index 1befe06a38f..968e9029053 100644 --- a/ethexe/service/src/tests/utils/events.rs +++ b/ethexe/service/src/tests/utils/events.rs @@ -286,12 +286,16 @@ impl TestingEventReceiver { .await } - pub async fn find_block_computed_twice(&mut self, block_hash: H256) -> HashOf { - log::info!("📗 waiting for base and not-base computed for block: {block_hash:?}"); - // First base announce - self.find_announce_computed(block_hash).await; - // Second not-base announce - self.find_announce_computed(block_hash).await + pub async fn find_block_computation_complete(&mut self, block_hash: H256) { + log::info!("📗 waiting for block computation complete: {block_hash:?}"); + self.find(|event| { + matches!( + event, + TestingEvent::Consensus(ConsensusEvent::BlockComputationComplete(h)) + if *h == block_hash + ) + }) + .await; } #[allow(unused)] From 1e78c77ac565a2db740310ecc1f2c97fa3e61323 Mon Sep 17 00:00:00 2001 From: Gregory Sobol Date: Mon, 6 Apr 2026 20:32:36 +0200 Subject: [PATCH 11/12] fix tests timeouts; fix clippy --- ethexe/consensus/src/connect/mod.rs | 9 ++++----- ethexe/service/src/tests/mod.rs | 6 +++--- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/ethexe/consensus/src/connect/mod.rs b/ethexe/consensus/src/connect/mod.rs index 3480d1d6586..16368fff284 100644 --- a/ethexe/consensus/src/connect/mod.rs +++ b/ethexe/consensus/src/connect/mod.rs @@ -279,12 +279,11 @@ impl ConsensusService for ConnectService { block_hash, announce_hash: expected, } = self.state + && expected == announce_hash { - if expected == announce_hash { - self.output - .push_back(ConsensusEvent::BlockComputationComplete(block_hash)); - self.state = State::WaitingForBlock; - } + self.output + .push_back(ConsensusEvent::BlockComputationComplete(block_hash)); + self.state = State::WaitingForBlock; } Ok(()) } diff --git a/ethexe/service/src/tests/mod.rs b/ethexe/service/src/tests/mod.rs index 9aa4eefde3e..bfdf79341cb 100644 --- a/ethexe/service/src/tests/mod.rs +++ b/ethexe/service/src/tests/mod.rs @@ -73,7 +73,7 @@ use tokio::sync::{ const ETHER: u128 = 1_000_000_000_000_000_000; #[tokio::test] -#[ntest::timeout(60_000)] +#[ntest::timeout(30_000)] async fn invalid_code() { init_logger(); @@ -2884,7 +2884,7 @@ async fn injected_tx_fungible_token_over_network() { } #[tokio::test] -#[ntest::timeout(120_000)] +#[ntest::timeout(60_000)] async fn announces_conflicts() { init_logger(); @@ -3623,7 +3623,7 @@ async fn reply_callback() { /// base-announce-priority ensures the announce chain consists of base announces, /// so the batch commitment expiry equals commitment_delay_limit (not 1). #[tokio::test] -#[ntest::timeout(120_000)] +#[ntest::timeout(60_000)] async fn batch_commitment_expiry_after_idle_blocks() { init_logger(); From d771afbb506c90889eeb5d34b26c31812d352ef1 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Mon, 13 Apr 2026 12:54:16 +0000 Subject: [PATCH 12/12] fix(ethexe-consensus): address review feedback - Rename `WithHashOf.value` field to `data` to avoid confusion with numeric value types - Rename `limit` parameter to `ancestor_depth_limit` in `best_announce` for clarity Co-authored-by: Gregory Sobol --- ethexe/common/src/hash.rs | 4 ++-- ethexe/consensus/src/announces.rs | 12 ++++++------ ethexe/consensus/src/validator/producer.rs | 6 +++--- ethexe/consensus/src/validator/subordinate.rs | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/ethexe/common/src/hash.rs b/ethexe/common/src/hash.rs index 29936c9b8cc..f461ed8029e 100644 --- a/ethexe/common/src/hash.rs +++ b/ethexe/common/src/hash.rs @@ -216,9 +216,9 @@ impl From> for MaybeHashOf { } } -/// Hash of value with the value itself. +/// Hash of data with the data itself. #[derive(Debug, Clone, PartialEq, Eq)] pub struct WithHashOf { pub hash: HashOf, - pub value: T, + pub data: T, } diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 796f462dfb2..d3ea592fbc0 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -233,7 +233,7 @@ impl< let with_hash = WithHashOf { hash: announce_hash, - value: announce, + data: announce, }; if pred(&with_hash) { @@ -665,10 +665,10 @@ pub fn block_best_announce( let best_parent = best_parent_announce(db, block_hash, commitment_delay_limit)?; let not_base_announce_hash = db.find_block_announce(block_hash, |announce| { - announce.value.parent == best_parent && !announce.value.is_base() + announce.data.parent == best_parent && !announce.data.is_base() })?; let base_announce_hash = db.find_block_announce(block_hash, |announce| { - announce.value.parent == best_parent && announce.value.is_base() + announce.data.parent == best_parent && announce.data.is_base() })?; match (not_base_announce_hash, base_announce_hash) { @@ -692,7 +692,7 @@ pub fn block_best_announce( fn best_announce( db: &impl DBAnnouncesExt, announces: impl IntoIterator>, - limit: u32, + ancestor_depth_limit: u32, ) -> Result> { let mut announces = announces.into_iter(); let Some(first) = announces.next() else { @@ -703,7 +703,7 @@ fn best_announce( let announce_points = |mut announce_hash| -> Result { let mut points = 0; - for _ in 0..limit { + for _ in 0..ancestor_depth_limit { let announce = db .announce(announce_hash) .ok_or_else(|| anyhow!("Announce {announce_hash} not found in db"))?; @@ -743,7 +743,7 @@ fn best_announce( } let Some(base_announce) = db.find_block_announce(best_announce.block_hash, |announce| { - announce.value.is_base() && announce.value.parent == best_announce.parent + announce.data.is_base() && announce.data.parent == best_announce.parent })? else { return Ok(best_announce_hash); diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index ee2bf2dd834..c65eca68121 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -115,7 +115,7 @@ impl StateHandler for Producer { .core .db .find_block_announce(self.block.hash, |candidate| { - candidate.value.is_base() && candidate.value.parent == announce.parent + candidate.data.is_base() && candidate.data.parent == announce.parent })? { Some(base) if announces::announces_have_equal_outcomes( @@ -238,10 +238,10 @@ impl Producer { let state = if let Some(base_announce) = ctx.core.db.find_block_announce(block.hash, |announce| { - announce.value.is_base() && announce.value.parent == best_parent_announce_hash + announce.data.is_base() && announce.data.parent == best_parent_announce_hash })? { ctx.output(ConsensusEvent::ComputeAnnounce( - base_announce.value, + base_announce.data, PromisePolicy::Disabled, )); State::WaitForBaseComputed { diff --git a/ethexe/consensus/src/validator/subordinate.rs b/ethexe/consensus/src/validator/subordinate.rs index ae0c7628e72..4e30f18c6d8 100644 --- a/ethexe/consensus/src/validator/subordinate.rs +++ b/ethexe/consensus/src/validator/subordinate.rs @@ -203,11 +203,11 @@ impl Subordinate { .core .db .find_block_announce(announce.block_hash, |candidate| { - candidate.value.is_base() && candidate.value.parent == announce.parent + candidate.data.is_base() && candidate.data.parent == announce.parent })? .map(|announce| { self.ctx.output(ConsensusEvent::ComputeAnnounce( - announce.value, + announce.data, PromisePolicy::Disabled, )); announce.hash