diff --git a/Cargo.lock b/Cargo.lock index afd7b900b34..583f9866fe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5208,6 +5208,7 @@ dependencies = [ "sha3", "sp-core", "tap", + "thiserror 2.0.17", ] [[package]] diff --git a/ethexe/cli/src/commands/tx.rs b/ethexe/cli/src/commands/tx.rs index 69b55cd8418..43bbe304cf5 100644 --- a/ethexe/cli/src/commands/tx.rs +++ b/ethexe/cli/src/commands/tx.rs @@ -37,7 +37,9 @@ use clap::{Parser, Subcommand}; use ethexe_common::{ Address, BlockHeader, SimpleBlockData, gear_core::{ids::prelude::CodeIdExt, limited::LimitedVec, rpc::ReplyInfo}, - injected::{AddressedInjectedTransaction, InjectedTransaction, MAX_INJECTED_TX_PAYLOAD_SIZE}, + injected::{ + AddressedInjectedTransaction, InjectedTransaction, MAX_INJECTED_TX_PAYLOAD_SIZE, TxReceipt, + }, }; use ethexe_ethereum::{Ethereum, EthereumBuilder, mirror::ClaimInfo, router::CodeValidationResult}; use ethexe_rpc::{InjectedClient, ProgramClient}; @@ -1126,12 +1128,19 @@ impl TxCommand { || "failed to send injected transaction to Vara.eth RPC", )?; - let promise = subscription + let receipt = subscription .next() .await .ok_or_else(|| anyhow!("no promise received from subscription"))? .with_context(|| "failed to receive transaction promise")? - .into_data(); + .data() + .clone(); + let promise = match receipt { + TxReceipt::Promise(promise) => promise, + TxReceipt::Error(err) => { + return Err(anyhow!("injected transaction failed: {err:?}")); + } + }; let ReplyInfo { payload, value, diff --git a/ethexe/common/Cargo.toml b/ethexe/common/Cargo.toml index 29a6297f007..51fee03a4da 100644 --- a/ethexe/common/Cargo.toml +++ b/ethexe/common/Cargo.toml @@ -28,6 +28,7 @@ gsigner = { workspace = true, default-features = false, features = [ sha3.workspace = true k256 = { version = "0.13.4", features = ["ecdsa"], default-features = false } nonempty.workspace = true +thiserror.workspace = true # mock deps itertools = { workspace = true, optional = true } diff --git a/ethexe/common/src/injected.rs b/ethexe/common/src/injected.rs index 187ee16fd88..f0e6fc8f41f 100644 --- a/ethexe/common/src/injected.rs +++ b/ethexe/common/src/injected.rs @@ -18,6 +18,7 @@ use crate::{Address, HashOf, ToDigest, ecdsa::SignedMessage}; use alloc::string::{String, ToString}; +use anyhow::bail; use core::hash::Hash; use gear_core::{limited::LimitedVec, rpc::ReplyInfo}; use gprimitives::{ActorId, H256, MessageId}; @@ -222,7 +223,124 @@ impl SignedCompactPromise { SignedMessage::try_from_parts(promise, *self.0.signature(), self.0.address()) } } -/// Encoding and decoding of `LimitedVec` as hex string. + +/// Receipt for [InjectedTransaction]. +/// +/// This type is a generic over Promise type is purpose to allow transfer +/// [CompactPromise] between validators and send full [Promise] only to end-user. +#[derive( + Debug, Clone, PartialEq, Eq, Encode, Decode, derive_more::IsVariant, derive_more::Unwrap, +)] +#[cfg_attr(feature = "std", derive(serde::Deserialize, serde::Serialize))] +pub enum TxReceipt

{ + Promise(P), + Error(TransactionError), +} + +impl TxReceipt { + /// Returns the transaction hash the receipt belongs to + pub fn tx_hash(&self) -> HashOf { + match self { + Self::Promise(promise) => promise.tx_hash, + Self::Error(err) => err.tx_hash, + } + } +} + +impl TxReceipt { + /// Returns the transaction hash the receipt belongs to + pub fn tx_hash(&self) -> HashOf { + match self { + Self::Promise(promise) => promise.tx_hash, + Self::Error(err) => err.tx_hash, + } + } +} + +/// Signed wrapper on top of [TxReceipt]. +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode, derive_more::From, derive_more::Deref)] +#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "std", serde(transparent))] +pub struct SignedTxReceipt(SignedMessage>); + +// TODO: these implementations are not clear anough, redesign +impl SignedTxReceipt { + pub fn try_map_promise(&self, promise: Promise) -> Result { + let mapped_receipt = match self.0.data() { + TxReceipt::Promise(_) => TxReceipt::Promise(promise), + TxReceipt::Error(_) => return Err("Expected for this variant a promise, not error"), + }; + + let (address, signature) = (self.0.address(), *self.0.signature()); + SignedMessage::try_from_parts(mapped_receipt, signature, address).map(Into::into) + } + + pub fn try_map_error(self) -> anyhow::Result { + let address = self.0.address(); + let (receipt, signature) = self.0.into_parts(); + + match receipt { + TxReceipt::Promise(_) => bail!( + "SignedTxReceipt::map_error expecting to be call on error variant" + ), + // TODO: optimize me + TxReceipt::Error(err) => { + Ok( + SignedMessage::try_from_parts(TxReceipt::Error(err), signature, address) + .map(Into::into) + .expect("Infallible"), + ) + } + } + } +} + +impl ToDigest for TxReceipt

{ + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + match self { + Self::Promise(promise) => { + hasher.update([0]); + hasher.update(promise.to_digest().0); + } + Self::Error(err) => { + hasher.update([1]); + hasher.update(err.to_digest().0); + } + } + } +} + +/// Represents the reason why [InjectedTransaction] was not included. +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)] +#[cfg_attr(feature = "std", derive(serde::Deserialize, serde::Serialize))] +pub struct TransactionError { + pub tx_hash: HashOf, + pub reason: TransactionErrorReason, +} + +impl ToDigest for TransactionError { + fn update_hasher(&self, _hasher: &mut sha3::Keccak256) { + todo!() + } +} + +// TODO: think about creating the general error for `TxValidity` and `ErrorReason` + +/// Reason why transaction was not executed in chain. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Encode, Decode, derive_more::Display)] +#[cfg_attr(feature = "std", derive(serde::Deserialize, serde::Serialize))] +pub enum TransactionErrorReason { + /// Transaction is outdated and can not be included. + #[display("Transaction is oudated")] + Outdated, + + // Important: Keep it in the end of enum. + // In future we will support non zero value injected txs. + #[display("Transaction's value must be zero")] + NonZeroValue, +} + +/// Encoding and decoding of [LimitedVec] as hex string. #[cfg(feature = "std")] mod serde_hex { pub fn serialize( diff --git a/ethexe/consensus/src/announces.rs b/ethexe/consensus/src/announces.rs index 38d2f000523..e651c5248a1 100644 --- a/ethexe/consensus/src/announces.rs +++ b/ethexe/consensus/src/announces.rs @@ -714,9 +714,7 @@ pub fn accept_announce(db: &impl DBAnnouncesExt, announce: Announce) -> Result { db.set_injected_transaction(tx.clone()); } @@ -724,7 +722,7 @@ pub fn accept_announce(db: &impl DBAnnouncesExt, announce: Announce) -> Result { tracing::trace!( announce = ?announce.to_hash(), - "announce contains invalid transition with status {validity_status:?}, rejecting announce." + "announce contains invalid transition with status {validity:?}, rejecting announce." ); return Ok(AnnounceStatus::Rejected { diff --git a/ethexe/consensus/src/lib.rs b/ethexe/consensus/src/lib.rs index 6ecc5e963dc..11977da6f6f 100644 --- a/ethexe/consensus/src/lib.rs +++ b/ethexe/consensus/src/lib.rs @@ -58,7 +58,7 @@ //! - `ethexe-network` delivers producer announces, validation requests //! and replies, fetched announces and network-forwarded injected //! transactions. Outgoing network messages leave as -//! [`ConsensusEvent::PublishMessage`], [`ConsensusEvent::PublishPromise`] +//! [`ConsensusEvent::PublishMessage`], [`ConsensusEvent::PublishTxReceipt`] //! and [`ConsensusEvent::RequestAnnounces`]. //! - `ethexe-ethereum` is reached only from [`ValidatorService`], through //! the [`BatchCommitter`] trait, to submit aggregated batch @@ -92,8 +92,8 @@ //! |--------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------| //! | [`AnnounceAccepted`](ConsensusEvent::AnnounceAccepted) / [`AnnounceRejected`](ConsensusEvent::AnnounceRejected) | Informational result of validating a received producer announce. | //! | [`ComputeAnnounce`](ConsensusEvent::ComputeAnnounce) | The outer service must hand this announce to `ethexe-compute`, with the given `PromisePolicy`. | -//! | [`PublishMessage`](ConsensusEvent::PublishMessage) | Signed validator-to-validator message to gossip over the network. | -//! | [`PublishPromise`](ConsensusEvent::PublishPromise) | Signed promise to gossip over the network and deliver to RPC subscribers. | +//! | [`PublishMessage`](ConsensusEvent::PublishMessage) | Signed validator-to-validator message to gossip over the network. | +//! | [`PublishTxReceipt`](ConsensusEvent::PublishTxReceipt) | Signed transaction receipt to gossip over the network and deliver to RPC subscribers. | //! | [`RequestAnnounces`](ConsensusEvent::RequestAnnounces) | Ask the network to fetch announces we are missing. | //! | [`CommitmentSubmitted`](ConsensusEvent::CommitmentSubmitted) | Informational: a batch was successfully submitted to the Router contract. | //! | [`Warning`](ConsensusEvent::Warning) | Informational: a non-fatal anomaly (unexpected input, bad reply, etc.) was detected. | @@ -203,7 +203,7 @@ use anyhow::Result; use ethexe_common::{ Announce, Digest, HashOf, PromisePolicy, SimpleBlockData, consensus::{BatchCommitmentValidationReply, VerifiedAnnounce, VerifiedValidationRequest}, - injected::{Promise, SignedCompactPromise, SignedInjectedTransaction}, + injected::{CompactPromise, Promise, SignedInjectedTransaction, SignedTxReceipt}, network::{AnnouncesRequest, AnnouncesResponse, SignedValidatorMessage}, }; use futures::{Stream, stream::FusedStream}; @@ -287,7 +287,9 @@ pub enum ConsensusEvent { #[from] PublishMessage(SignedValidatorMessage), #[from] - PublishPromise(SignedCompactPromise), + PublishTxReceipt(SignedTxReceipt), + // #[from] + // PublishTransactionResult(SignedTransactionResult), /// Outer service have to request announces #[from] RequestAnnounces(AnnouncesRequest), diff --git a/ethexe/consensus/src/tx_validation.rs b/ethexe/consensus/src/tx_validation.rs index e26b6e27afe..267d60bdd85 100644 --- a/ethexe/consensus/src/tx_validation.rs +++ b/ethexe/consensus/src/tx_validation.rs @@ -21,7 +21,9 @@ use ethexe_common::{ Announce, HashOf, ProgramStates, SimpleBlockData, db::{AnnounceStorageRO, GlobalsStorageRO, OnChainStorageRO}, gear::INJECTED_MESSAGE_PANIC_GAS_CHARGE_THRESHOLD, - injected::{InjectedTransaction, SignedInjectedTransaction, VALIDITY_WINDOW}, + injected::{ + InjectedTransaction, SignedInjectedTransaction, TransactionErrorReason, VALIDITY_WINDOW, + }, }; use ethexe_runtime_common::state::Storage; use gprimitives::H256; @@ -38,8 +40,6 @@ pub enum TxValidity { Valid, /// Transaction was already include into one of previous [`VALIDITY_WINDOW`] announces. Duplicate, - /// Transaction is outdated and should be remove from pool. - Outdated, /// Transaction's reference block not on current branch. /// Keep tx in pool in case of reorg. NotOnCurrentBranch, @@ -47,11 +47,10 @@ pub enum TxValidity { UnknownDestination, /// Transaction's destination [`gprimitives::ActorId`] not initialized. UninitializedDestination, - // TODO: #5083 support non zero value transactions. - /// Transaction with non zero value is not supported for now. - NonZeroValue, /// Transaction's destination contract has insufficient balance for injected messages. InsufficientBalanceForInjectedMessages, + /// Transaction must be remove from pool because of [TransactionErrorReason]. + MustRemove(TransactionErrorReason), } pub struct TxValidityChecker { @@ -102,11 +101,11 @@ impl TxVa let reference_block = tx.data().reference_block; if tx.data().value != 0 { - return Ok(TxValidity::NonZeroValue); + return Ok(TxValidity::MustRemove(TransactionErrorReason::NonZeroValue)); } if !self.is_reference_block_within_validity_window(reference_block)? { - return Ok(TxValidity::Outdated); + return Ok(TxValidity::MustRemove(TransactionErrorReason::Outdated)); } if !self.is_reference_block_on_current_branch(reference_block)? { @@ -337,7 +336,7 @@ mod tests { for block in chain.blocks.iter().take(VALIDITY_WINDOW as usize) { let tx = mock_tx(block.hash); assert_eq!( - TxValidity::Outdated, + TxValidity::MustRemove(TransactionErrorReason::Outdated), tx_checker.check_tx_validity(&tx).unwrap() ); } @@ -414,7 +413,7 @@ mod tests { TxValidityChecker::new_for_announce(db, chain_head, announce_hash).unwrap(); assert_eq!( - TxValidity::NonZeroValue, + TxValidity::MustRemove(TransactionErrorReason::NonZeroValue), tx_checker.check_tx_validity(&signed_tx(tx)).unwrap() ); } @@ -432,7 +431,7 @@ mod tests { TxValidityChecker::new_for_announce(db, chain_head, announce_hash).unwrap(); assert_eq!( - TxValidity::Outdated, + TxValidity::MustRemove(TransactionErrorReason::Outdated), tx_checker.check_tx_validity(&signed_tx(tx)).unwrap() ); } diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 5093a92a5cc..6187c7aefd9 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -22,7 +22,7 @@ use super::{ use crate::{ ConsensusEvent, announces::{self, DBAnnouncesExt}, - validator::DefaultProcessing, + validator::{DefaultProcessing, tx_pool::PoolDelta}, }; use anyhow::{Context as _, Result, anyhow}; use derive_more::{Debug, Display}; @@ -30,7 +30,7 @@ use ethexe_common::{ Announce, HashOf, PromisePolicy, SimpleBlockData, ValidatorsVec, db::BlockMetaStorageRO, gear::BatchCommitment, - injected::{Promise, SignedCompactPromise}, + injected::{Promise, TxReceipt}, network::ValidatorMessage, }; use ethexe_service_utils::Timer; @@ -117,16 +117,14 @@ impl StateHandler for Producer { State::WaitingAnnounceComputed(expected) if *expected == announce_hash => { let tx_hash = promise.tx_hash; - let signed_promise = - self.ctx - .core - .signer - .signed_message(self.ctx.core.pub_key, promise, None)?; - let compact_signed_promise = - SignedCompactPromise::from_signed_promise(&signed_promise); + let signed_receipt = self.ctx.core.signer.signed_message( + self.ctx.core.pub_key, + TxReceipt::Promise(promise.to_compact()), + None, + )?; self.ctx - .output(ConsensusEvent::PublishPromise(compact_signed_promise)); + .output(ConsensusEvent::PublishTxReceipt(signed_receipt.into())); tracing::trace!("consensus sign promise for transaction-hash={tx_hash}"); Ok(self.into()) @@ -204,12 +202,25 @@ impl Producer { self.ctx.core.commitment_delay_limit, )?; - let injected_transactions = self + let PoolDelta { + selected: injected_transactions, + removed, + } = self .ctx .core .injected_pool .select_for_announce(self.block, parent)?; + for err in removed.into_iter() { + let signed_receipt = self.ctx.core.signer.signed_message( + self.ctx.core.pub_key, + TxReceipt::Error(err), + None, + )?; + self.ctx + .output(ConsensusEvent::PublishTxReceipt(signed_receipt.into())); + } + let announce = Announce { block_hash: self.block.hash, parent, diff --git a/ethexe/consensus/src/validator/tx_pool.rs b/ethexe/consensus/src/validator/tx_pool.rs index 6859361c000..58d74c7afff 100644 --- a/ethexe/consensus/src/validator/tx_pool.rs +++ b/ethexe/consensus/src/validator/tx_pool.rs @@ -23,7 +23,7 @@ use ethexe_common::{ db::{ AnnounceStorageRO, CodesStorageRO, GlobalsStorageRO, InjectedStorageRW, OnChainStorageRO, }, - injected::{InjectedTransaction, SignedInjectedTransaction}, + injected::{InjectedTransaction, SignedInjectedTransaction, TransactionError}, }; use ethexe_db::Database; use ethexe_runtime_common::state::Storage; @@ -43,6 +43,12 @@ pub(crate) struct InjectedTxPool { db: DB, } +#[derive(Default)] +pub(crate) struct PoolDelta { + pub selected: Vec, + pub removed: Vec, +} + impl InjectedTxPool where DB: InjectedStorageRW @@ -76,12 +82,14 @@ where &mut self, block: SimpleBlockData, parent_announce: HashOf, - ) -> Result> { + ) -> Result { tracing::trace!(block = ?block.hash, "start collecting injected transactions"); let tx_checker = TxValidityChecker::new_for_announce(self.db.clone(), block, parent_announce)?; + let mut delta = PoolDelta::default(); + let mut touched_programs = crate::utils::block_touched_programs(&self.db, block.hash)?; if touched_programs.len() > MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE as usize { tracing::error!( @@ -90,11 +98,10 @@ where touched_programs.len(), MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE ); - return Ok(vec![]); + return Ok(delta); } - let mut selected_txs = vec![]; - let mut remove_txs = vec![]; + let mut remove_data = vec![]; let mut size_counter = 0usize; for (reference_block, tx_hash) in self.inner.iter() { @@ -129,7 +136,7 @@ where tracing::trace!(tx_hash = ?tx_hash, tx = ?tx.data(), "tx is valid, including to announce"); touched_programs.insert(program_id); - selected_txs.push(tx); + delta.selected.push(tx); size_counter += tx_size; } TxValidity::Duplicate => { @@ -148,10 +155,6 @@ where // Keep in pool, in case of reorg it can be valid again. tracing::trace!(tx_hash = ?tx_hash, tx = ?tx.data(), "tx is on different branch, keeping in pool"); } - TxValidity::Outdated => { - tracing::trace!(tx_hash = ?tx_hash, tx = ?tx.data(), "tx is outdated, removing from pool"); - remove_txs.push((*reference_block, *tx_hash)) - } TxValidity::UninitializedDestination => { // Keep in pool, in case destination actor gets initialized later. tracing::trace!( @@ -168,22 +171,22 @@ where "tx destination actor has insufficient balance for injected messages, keeping in pool" ); } - TxValidity::NonZeroValue => { - tracing::trace!( - tx_hash = ?tx_hash, - tx = ?tx.data(), - "tx has non-zero value, removing from pool" - ); - remove_txs.push((*reference_block, *tx_hash)) + TxValidity::MustRemove(reason) => { + tracing::trace!(?tx_hash, tx = ?tx.data(), %reason, "transaction removed from pool"); + remove_data.push((*reference_block, *tx_hash)); + delta.removed.push(TransactionError { + tx_hash: *tx_hash, + reason, + }); } } } - remove_txs.into_iter().for_each(|key| { + remove_data.into_iter().for_each(|key| { self.inner.remove(&key); }); - Ok(selected_txs) + Ok(delta) } } @@ -269,14 +272,14 @@ mod tests { .unwrap(), ); - let selected_txs = tx_pool + let delta = tx_pool .select_for_announce( chain.blocks[10].to_simple(), chain.block_top_announce_hash(9), ) .unwrap(); assert_eq!( - selected_txs, + delta.selected, vec![signed_tx], "tx should be selected for announce" ); @@ -359,7 +362,7 @@ mod tests { tx_pool.handle_tx(signed_tx); } - let selected_txs = tx_pool + let delta = tx_pool .select_for_announce( chain.blocks[10].to_simple(), chain.block_top_announce_hash(9), @@ -367,7 +370,7 @@ mod tests { .unwrap(); assert_eq!( - selected_txs.len(), + delta.selected.len(), MAX_TOUCHED_PROGRAMS_PER_ANNOUNCE as usize - 90 ); } diff --git a/ethexe/network/src/gossipsub.rs b/ethexe/network/src/gossipsub.rs index cf1ee3e5bbc..fbac67d2110 100644 --- a/ethexe/network/src/gossipsub.rs +++ b/ethexe/network/src/gossipsub.rs @@ -23,7 +23,11 @@ use crate::{ peer_score, }; use anyhow::anyhow; -use ethexe_common::{Address, injected::SignedCompactPromise, network::SignedValidatorMessage}; +use ethexe_common::{ + Address, + injected::{CompactPromise, SignedTxReceipt}, + network::SignedValidatorMessage, +}; use libp2p::{ core::{Endpoint, transport::PortUse}, gossipsub, @@ -46,21 +50,21 @@ use std::{ pub enum Message { // TODO: rename to `Validators` Commitments(SignedValidatorMessage), - Promise(SignedCompactPromise), + TxReceipt(SignedTxReceipt), } impl Message { fn topic_hash(&self, behaviour: &Behaviour) -> TopicHash { match self { Message::Commitments(_) => behaviour.commitments_topic.hash(), - Message::Promise(_) => behaviour.promises_topic.hash(), + Message::TxReceipt(_) => behaviour.tx_receipts_topic.hash(), } } fn encode(&self) -> Vec { match self { Message::Commitments(message) => message.encode(), - Message::Promise(message) => message.encode(), + Message::TxReceipt(message) => message.encode(), } } } @@ -112,7 +116,7 @@ pub(crate) struct Behaviour { // TODO: consider to limit queue message_queue: VecDeque, commitments_topic: IdentTopic, - promises_topic: IdentTopic, + tx_receipts_topic: IdentTopic, metrics: Arc, } @@ -125,7 +129,7 @@ impl Behaviour { metrics: Arc, ) -> anyhow::Result { let commitments_topic = Self::topic_with_router("commitments", router_address); - let promises_topic = Self::topic_with_router("promises", router_address); + let tx_receipts_topic = Self::topic_with_router("tx_receipts", router_address); let inner = ConfigBuilder::default() // dedup messages @@ -149,14 +153,14 @@ impl Behaviour { .map_err(|e| anyhow!("`gossipsub` scoring parameters error: {e}"))?; inner.subscribe(&commitments_topic)?; - inner.subscribe(&promises_topic)?; + inner.subscribe(&tx_receipts_topic)?; Ok(Self { inner, peer_score, message_queue: VecDeque::new(), commitments_topic, - promises_topic, + tx_receipts_topic, metrics, }) } @@ -189,8 +193,9 @@ impl Behaviour { let res = if topic == self.commitments_topic.hash() { SignedValidatorMessage::decode(&mut &data[..]).map(Message::Commitments) - } else if topic == self.promises_topic.hash() { - SignedCompactPromise::decode(&mut &data[..]).map(Message::Promise) + } else if topic == self.tx_receipts_topic.hash() { + SignedTxReceipt::::decode(&mut &data[..]) + .map(Message::TxReceipt) } else { unreachable!("topic we never subscribed to: {topic:?}"); }; diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index ecf41d69aa3..2596c62479f 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -59,7 +59,7 @@ use ethexe_common::{ Address, BlockHeader, ValidatorsVec, db::ConfigStorageRO, ecdsa::PublicKey, - injected::{AddressedInjectedTransaction, SignedCompactPromise}, + injected::{AddressedInjectedTransaction, CompactPromise, SignedTxReceipt}, network::{SignedValidatorMessage, VerifiedValidatorMessage}, }; use ethexe_db::Database; @@ -110,7 +110,7 @@ pub enum NetworkEvent { /// A validator-signed message from the validator gossipsub topic. ValidatorMessage(VerifiedValidatorMessage), /// A public promise observed on the promise gossipsub topic. - PromiseMessage(SignedCompactPromise), + TxReceiptMessage(SignedTxReceipt), /// Validator discovery learned or refreshed the network identity of the /// given validator address. ValidatorIdentityUpdated(Address), @@ -562,11 +562,11 @@ impl NetworkService { .verify_validator_message(source, message); (acceptance, message.map(NetworkEvent::ValidatorMessage)) } - gossipsub::Message::Promise(compact_promise) => { + gossipsub::Message::TxReceipt(receipt) => { // FIXME: previous era validators are ignored - let (acceptance, promise) = - self.validator_topic.verify_promise(source, compact_promise); - (acceptance, promise.map(NetworkEvent::PromiseMessage)) + let (acceptance, receipt) = + self.validator_topic.verify_receipt(source, receipt); + (acceptance, receipt.map(NetworkEvent::TxReceiptMessage)) } }) } @@ -668,11 +668,8 @@ impl NetworkService { } /// Publish a signed promise to the public promise gossipsub topic. - pub fn publish_promise(&mut self, compact_promise: SignedCompactPromise) { - self.swarm - .behaviour_mut() - .gossipsub - .publish(compact_promise) + pub fn publish_tx_receipt(&mut self, receipt: SignedTxReceipt) { + self.swarm.behaviour_mut().gossipsub.publish(receipt) } } diff --git a/ethexe/network/src/validator/topic.rs b/ethexe/network/src/validator/topic.rs index 52e3b8a302a..d89f41d73e3 100644 --- a/ethexe/network/src/validator/topic.rs +++ b/ethexe/network/src/validator/topic.rs @@ -25,7 +25,7 @@ use crate::{ }; use ethexe_common::{ Address, HashOf, - injected::{InjectedTransaction, SignedCompactPromise}, + injected::{CompactPromise, InjectedTransaction, SignedTxReceipt}, network::VerifiedValidatorMessage, }; use lru::LruCache; @@ -95,7 +95,7 @@ enum VerifyMessageError { } #[derive(Debug, PartialEq, Eq, derive_more::Display)] -enum VerifyPromiseError { +enum VerifyTxReceiptError { #[display("unknown validator: address={address}, tx_hash={tx_hash}")] UnknownValidator { address: Address, @@ -290,27 +290,27 @@ impl ValidatorTopic { fn inner_verify_promise( &self, _source: PeerId, - compact_promise: SignedCompactPromise, - ) -> Result { - let address = compact_promise.address(); + receipt: SignedTxReceipt, + ) -> Result, VerifyTxReceiptError> { + let address = receipt.address(); if !self.snapshot.contains(address) { - return Err(VerifyPromiseError::UnknownValidator { + return Err(VerifyTxReceiptError::UnknownValidator { address, - tx_hash: compact_promise.data().tx_hash, + tx_hash: receipt.data().tx_hash(), }); } - Ok(compact_promise) + Ok(receipt) } // FIXME: messages from previous era validators are ignored - pub fn verify_promise( + pub fn verify_receipt( &self, source: PeerId, - compact_promise: SignedCompactPromise, - ) -> (MessageAcceptance, Option) { - match self.inner_verify_promise(source, compact_promise) { - Ok(compact_promise) => (MessageAcceptance::Accept, Some(compact_promise)), + receipt: SignedTxReceipt, + ) -> (MessageAcceptance, Option>) { + match self.inner_verify_promise(source, receipt) { + Ok(receipt) => (MessageAcceptance::Accept, Some(receipt)), Err(err) => { log::trace!("failed to verify compact promise: {err}"); (MessageAcceptance::Ignore, None) @@ -332,7 +332,7 @@ mod tests { use ethexe_common::{ Announce, HashOf, ecdsa::{PublicKey, SignedData}, - injected::{Promise, SignedCompactPromise, SignedPromise}, + injected::{Promise, SignedPromise, TxReceipt}, mock::Mock, network::{SignedValidatorMessage, ValidatorMessage}, }; @@ -389,13 +389,15 @@ mod tests { signer.signed_message(public_key, promise, None).unwrap() } - fn compact_signed_promise( + fn signed_promise_receipt( signer: &Signer, public_key: PublicKey, promise: Promise, - ) -> SignedCompactPromise { - let signed_promise = signer.signed_message(public_key, promise, None).unwrap(); - SignedCompactPromise::from_signed_promise(&signed_promise) + ) -> SignedTxReceipt { + signer + .signed_message(public_key, TxReceipt::Promise(promise.to_compact()), None) + .unwrap() + .into() } fn test_announce() -> Announce { @@ -762,38 +764,38 @@ mod tests { let (pubkey, signer) = signer_with_pubkey(); let promise = signed_promise(signer.clone(), pubkey); - let compact_promise = compact_signed_promise(&signer, pubkey, promise.clone().into_data()); + let receipt = signed_promise_receipt(&signer, pubkey, promise.clone().into_data()); let peer_id = PeerId::random(); let err = topic - .inner_verify_promise(peer_id, compact_promise.clone()) + .inner_verify_promise(peer_id, receipt.clone()) .unwrap_err(); - let VerifyPromiseError::UnknownValidator { address, tx_hash } = err; - assert_eq!(address, promise.address()); - assert_eq!(tx_hash, promise.data().tx_hash); + let VerifyTxReceiptError::UnknownValidator { address, tx_hash } = err; + assert_eq!(address, receipt.address()); + assert_eq!(tx_hash, receipt.data().tx_hash()); - let (acceptance, promise) = topic.verify_promise(peer_id, compact_promise); + let (acceptance, receipt) = topic.verify_receipt(peer_id, receipt); assert_matches!(acceptance, MessageAcceptance::Ignore); - assert_eq!(promise, None); + assert_eq!(receipt, None); } #[tokio::test] async fn verify_promise_ok() { let (pubkey, signer) = signer_with_pubkey(); let promise = signed_promise(signer.clone(), pubkey); - let compact_promise = compact_signed_promise(&signer, pubkey, promise.clone().into_data()); + let receipt = signed_promise_receipt(&signer, pubkey, promise.clone().into_data()); - let topic = new_topic(nonempty![promise.address()]); + let topic = new_topic(nonempty![receipt.address()]); let peer_id = PeerId::random(); topic - .inner_verify_promise(peer_id, compact_promise.clone()) + .inner_verify_promise(peer_id, receipt.clone()) .unwrap(); - let (acceptance, returned_promise) = topic.verify_promise(peer_id, compact_promise.clone()); + let (acceptance, returned_receipt) = topic.verify_receipt(peer_id, receipt.clone()); assert_matches!(acceptance, MessageAcceptance::Accept); - assert_eq!(returned_promise, Some(compact_promise)); + assert_eq!(returned_receipt, Some(receipt)); } } diff --git a/ethexe/rpc/src/apis/injected/promise_manager.rs b/ethexe/rpc/src/apis/injected/promise_manager.rs index 31355e0bc41..4ca5d9a0545 100644 --- a/ethexe/rpc/src/apis/injected/promise_manager.rs +++ b/ethexe/rpc/src/apis/injected/promise_manager.rs @@ -21,7 +21,7 @@ use dashmap::{DashMap, mapref::entry::Entry}; use ethexe_common::{ HashOf, db::{InjectedStorageRO, InjectedStorageRW}, - injected::{InjectedTransaction, Promise, SignedCompactPromise, SignedPromise}, + injected::{CompactPromise, InjectedTransaction, Promise, SignedTxReceipt}, }; use ethexe_db::Database; use std::sync::Arc; @@ -29,8 +29,10 @@ use tokio::sync::oneshot; use tracing::{trace, warn}; // TODO: Issues #5384 and #5385. -type PromiseSubscribers = Arc, oneshot::Sender>>; -type PromisesComputationWaiting = Arc, SignedCompactPromise>>; +type PromiseSubscribers = + Arc, oneshot::Sender>>; +type ReceiptsComputationWaiting = + Arc, SignedTxReceipt>>; /// The manager for promise subscribers. #[derive(Debug, Clone)] @@ -38,7 +40,7 @@ pub struct PromiseSubscriptionManager { db: Database, subscribers: PromiseSubscribers, - waiting_for_compute: PromisesComputationWaiting, + waiting_for_compute: ReceiptsComputationWaiting, } #[derive(Debug, Clone, thiserror::Error)] @@ -47,16 +49,16 @@ pub enum RegisterSubscriberError { AlreadyRegistered(HashOf), } -type TimeoutReceiver = tokio::time::Timeout>; +type TimeoutReceiver = tokio::time::Timeout>; -/// The pending [SignedPromise] subscriber. +/// The pending [SignedTxReceipt] subscriber. /// Subscriber will be spawned in separate tokio runtime task and will wait for promise. /// /// Important: to avoid infinite waiting we wrap [oneshot::Receiver] into [tokio::time::timeout]. pub struct PendingSubscriber { /// Tx hash waiting promise for. tx_hash: HashOf, - /// Wrapped promise [oneshot::Receiver]. + /// Wrapped tx receipt [oneshot::Receiver]. receiver: TimeoutReceiver, } @@ -64,7 +66,7 @@ impl PendingSubscriber { pub fn new( db: &Database, tx_hash: HashOf, - receiver: oneshot::Receiver, + receiver: oneshot::Receiver, ) -> Self { let timeout_duration = utils::promise_waiting_timeout(db); let receiver = tokio::time::timeout(timeout_duration, receiver); @@ -81,7 +83,7 @@ impl PromiseSubscriptionManager { Self { db, subscribers: PromiseSubscribers::default(), - waiting_for_compute: PromisesComputationWaiting::default(), + waiting_for_compute: ReceiptsComputationWaiting::default(), } } @@ -103,32 +105,36 @@ impl PromiseSubscriptionManager { pub fn cancel_registration( &self, tx_hash: HashOf, - ) -> Option> { + ) -> Option> { self.subscribers.remove(&tx_hash).map(|(_, v)| v) } // TODO: Issue #5403 - pub fn on_compact_promise(&self, compact: SignedCompactPromise) { - trace!(?compact, "received new compact promise"); - let tx_hash = compact.data().tx_hash; + pub fn on_tx_receipt(&self, receipt: SignedTxReceipt) { + trace!(?receipt, "received new compact promise"); + if receipt.data().is_error() { + self.dispatch_receipt(receipt.try_map_error().expect("infallible")); + return; + } + let tx_hash = receipt.data().tx_hash(); match self.db.promise(tx_hash) { - Some(promise) => match compact.restore(promise) { - Ok(signed_promise) => { - self.db.set_compact_promise(&compact); - self.dispatch_promise(signed_promise); + Some(promise) => match receipt.try_map_promise(promise) { + Ok(signed_receipt) => { + // self.db.set_compact_promise(&compact); + self.dispatch_receipt(signed_receipt); } Err(err) => { warn!( - ?compact, %tx_hash, error=?err, "failed to create signed promise from parts, producer send invalid signature: compact_promise={compact:?}" + ?receipt, %tx_hash, error=?err, "failed to create signed receipt from parts, producer send invalid signature" ); - self.waiting_for_compute.insert(tx_hash, compact); + self.waiting_for_compute.insert(tx_hash, receipt); } }, None => { trace!("not found promise in database, waiting for computation..."); - self.waiting_for_compute.insert(tx_hash, compact); + self.waiting_for_compute.insert(tx_hash, receipt); } } } @@ -137,24 +143,24 @@ impl PromiseSubscriptionManager { trace!(?promise, "received new computed promise"); self.db.set_promise(&promise); - if let Some((_, compact_promise)) = self.waiting_for_compute.remove(&promise.tx_hash) { - match compact_promise.restore(promise) { - Ok(signed_promise) => { - self.db.set_compact_promise(&compact_promise); - self.dispatch_promise(signed_promise); + if let Some((_, receipt)) = self.waiting_for_compute.remove(&promise.tx_hash) { + match receipt.try_map_promise(promise) { + Ok(signed_receipt) => { + // self.db.set_compact_promise(&compact_promise); + self.dispatch_receipt(signed_receipt); } Err(_err) => { - trace!(?compact_promise, tx_hash=?compact_promise.data().tx_hash, "failed to create signed promise from parts"); + trace!(?receipt, tx_hash=?receipt.data().tx_hash(), "failed to create signed promise from parts"); } } } } - fn dispatch_promise(&self, promise: SignedPromise) { - if let Some((_, sender)) = self.subscribers.remove(&promise.data().tx_hash) - && let Err(unsent_promise) = sender.send(promise) + fn dispatch_receipt(&self, receipt: SignedTxReceipt) { + if let Some((_, sender)) = self.subscribers.remove(&receipt.data().tx_hash()) + && let Err(unsent_receipt) = sender.send(receipt) { - trace!("failed to send promise to subscriber, promise={unsent_promise:?}"); + trace!("failed to send receipt to subscriber, receipt={unsent_receipt:?}"); } } diff --git a/ethexe/rpc/src/apis/injected/spawner.rs b/ethexe/rpc/src/apis/injected/spawner.rs index c0d2026647c..eea8b88770b 100644 --- a/ethexe/rpc/src/apis/injected/spawner.rs +++ b/ethexe/rpc/src/apis/injected/spawner.rs @@ -37,10 +37,10 @@ pub fn spawn_pending_subscriber( let _guard = scopeguard::guard(tx_hash, on_finish); // Waiting for the first one: promise, timeout_err, client disconnect error. - let promise = tokio::select! { + let receipt = tokio::select! { result = receiver => match result { - Ok(promise_result) => match promise_result { - Ok(promise) => promise, + Ok(receipt_result) => match receipt_result { + Ok(receipt) => receipt, Err(_err) => { unreachable!("promise sender is owned by the server; it cannot be dropped before this point"); } @@ -56,7 +56,7 @@ pub fn spawn_pending_subscriber( } }; - match SubscriptionMessage::from_json(&promise) { + match SubscriptionMessage::from_json(&receipt) { Ok(message) => { if let Err(err) = sink.send(message).await { trace!("failed to send promise, client disconnected: err={err}"); @@ -64,7 +64,7 @@ pub fn spawn_pending_subscriber( } Err(err) => { error!( - ?promise, + ?receipt, ?err, "serialization error: failed create `SubscriptionMessage` from promise; this must never happen" ); diff --git a/ethexe/rpc/src/apis/injected/trait.rs b/ethexe/rpc/src/apis/injected/trait.rs index 8d3ad600a61..7974d4ba59a 100644 --- a/ethexe/rpc/src/apis/injected/trait.rs +++ b/ethexe/rpc/src/apis/injected/trait.rs @@ -20,7 +20,7 @@ use ethexe_common::{ HashOf, injected::{ AddressedInjectedTransaction, InjectedTransaction, InjectedTransactionAcceptance, - SignedInjectedTransaction, SignedPromise, + SignedInjectedTransaction, SignedPromise, SignedTxReceipt, }, }; use jsonrpsee::{ @@ -42,7 +42,7 @@ pub trait Injected { #[subscription( name = "sendTransactionAndWatch", unsubscribe = "sendTransactionAndWatchUnsubscribe", - item = SignedPromise + item = SignedTxReceipt )] async fn send_transaction_and_watch( &self, diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 38c50678a70..b57ab9e247d 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -58,7 +58,8 @@ use apis::{ ProgramApi, ProgramServer, }; use ethexe_common::injected::{ - AddressedInjectedTransaction, InjectedTransactionAcceptance, Promise, SignedCompactPromise, + AddressedInjectedTransaction, CompactPromise, InjectedTransactionAcceptance, Promise, + SignedTxReceipt, }; use ethexe_db::Database; use ethexe_processor::{Processor, ProcessorConfig}; @@ -196,8 +197,8 @@ impl RpcService { self.injected_api.on_computed_promise(promise); } - pub fn receive_compact_promise(&self, compact_promise: SignedCompactPromise) { - self.injected_api.on_compact_promise(compact_promise); + pub fn receive_tx_receipt(&self, receipt: SignedTxReceipt) { + self.injected_api.on_tx_receipt(receipt); } } diff --git a/ethexe/rpc/src/tests.rs b/ethexe/rpc/src/tests.rs index 1bab4b43134..c595b4c83dc 100644 --- a/ethexe/rpc/src/tests.rs +++ b/ethexe/rpc/src/tests.rs @@ -21,10 +21,11 @@ use crate::{ RpcService, }; use ethexe_common::{ + SignedMessage, db::InjectedStorageRW, ecdsa::PrivateKey, gear::MAX_BLOCK_GAS_LIMIT, - injected::{AddressedInjectedTransaction, Promise, SignedCompactPromise}, + injected::{AddressedInjectedTransaction, CompactPromise, Promise, SignedTxReceipt, TxReceipt}, mock::Mock, }; use ethexe_db::Database; @@ -66,9 +67,9 @@ impl MockService { loop { tokio::select! { _ = tx_batch_interval.tick() => { - let promises = self.promises_bundle(tx_batch.drain(..)); - promises.into_iter().for_each(|promise| { - self.rpc.receive_compact_promise(promise); + let receipts = self.receipts_bundle(tx_batch.drain(..)); + receipts.into_iter().for_each(|receipt| { + self.rpc.receive_tx_receipt(receipt); }); }, _ = self.handle.clone().stopped() => { @@ -85,16 +86,17 @@ impl MockService { }) } - fn promises_bundle( + fn receipts_bundle( &self, txs: impl IntoIterator, - ) -> Vec { + ) -> Vec> { let pk = PrivateKey::random(); txs.into_iter() .map(|tx| { let promise = Promise::mock(tx.tx.data().to_hash()); + let receipt = TxReceipt::Promise(promise.to_compact()); self.db.set_promise(&promise); - SignedCompactPromise::create_from_promise(pk.clone(), &promise).unwrap() + SignedMessage::create(pk.clone(), receipt).unwrap().into() }) .collect() } @@ -149,14 +151,15 @@ async fn test_cleanup_promise_subscribers() { .expect("Subscription will be created"); subscribers.spawn(async move { - let promise = sub + let receipt = sub .next() .await .expect("Promise will be received") .expect("No error in subscription result"); + let promise = receipt.data().clone().unwrap_promise(); assert_eq!( - promise.data().reply.code, + promise.reply.code, ReplyCode::Success(SuccessReplyReason::Manual) ); @@ -177,14 +180,15 @@ async fn test_cleanup_promise_subscribers() { .expect("Subscription will be created"); subscribers.spawn(async move { - let promise = subscription + let receipt = subscription .next() .await .expect("Promise will be received") .expect("No error in subscription result"); + let promise = receipt.data().clone().unwrap_promise(); assert_eq!( - promise.data().reply.code, + promise.reply.code, ReplyCode::Success(SuccessReplyReason::Manual) ); }); @@ -240,14 +244,15 @@ async fn test_concurrent_multiple_clients() { .await .expect("Subscription will be created"); - let promise = subscription + let receipt = subscription .next() .await .expect("Promise will be received") .expect("No error in subscription result"); + let promise = receipt.data().clone().unwrap_promise(); assert_eq!( - promise.data().reply.code, + promise.reply.code, ReplyCode::Success(SuccessReplyReason::Manual) ); diff --git a/ethexe/sdk/src/mirror.rs b/ethexe/sdk/src/mirror.rs index e9fbe339c20..8b455457b13 100644 --- a/ethexe/sdk/src/mirror.rs +++ b/ethexe/sdk/src/mirror.rs @@ -24,6 +24,7 @@ use ethexe_common::{ gear_core::rpc::ReplyInfo, injected::{ AddressedInjectedTransaction, InjectedTransaction, InjectedTransactionAcceptance, Promise, + TxReceipt, }, }; use ethexe_ethereum::{ @@ -261,12 +262,19 @@ impl<'a> Mirror<'a> { .await .with_context(|| "failed to send injected transaction and subscribe to it's promise")?; - let promise = subscription + let receipt = subscription .next() .await .ok_or_else(|| anyhow!("no promise received from subscription"))? .with_context(|| "failed to receive transaction promise")? - .into_data(); + .data() + .clone(); + let promise = match receipt { + TxReceipt::Promise(promise) => promise, + TxReceipt::Error(err) => { + return Err(anyhow!("injected transaction failed: {err:?}")); + } + }; Ok((message_id, promise)) } diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index 459809b9ba1..8260308e281 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -699,9 +699,9 @@ impl Service { let _res = response_sender.send(acceptance); } }, - NetworkEvent::PromiseMessage(compact_promise) => { + NetworkEvent::TxReceiptMessage(receipt) => { if let Some(rpc) = &rpc { - rpc.receive_compact_promise(compact_promise); + rpc.receive_tx_receipt(receipt); } } NetworkEvent::ValidatorIdentityUpdated(_) @@ -749,17 +749,17 @@ impl Service { ConsensusEvent::ComputeAnnounce(announce, promise_policy) => { compute.compute_announce(announce, promise_policy) } - ConsensusEvent::PublishPromise(compact_promise) => { + ConsensusEvent::PublishTxReceipt(receipt) => { if rpc.is_none() && network.is_none() { panic!("Promise without network or rpc"); } if let Some(rpc) = &rpc { - rpc.receive_compact_promise(compact_promise.clone()); + rpc.receive_tx_receipt(receipt.clone()); } if let Some(network) = &mut network { - network.publish_promise(compact_promise); + network.publish_tx_receipt(receipt); } } ConsensusEvent::PublishMessage(message) => { diff --git a/ethexe/service/src/tests/mod.rs b/ethexe/service/src/tests/mod.rs index 80c73020197..84cb6232cf1 100644 --- a/ethexe/service/src/tests/mod.rs +++ b/ethexe/service/src/tests/mod.rs @@ -2768,19 +2768,25 @@ async fn injected_tx_fungible_token() { .await; tracing::info!("✅ Tokens mint successfully"); - let subscription_promise = subscription + let subscription_receipt = subscription .next() .await .expect("subscription produce value") .expect("no errors for correct injected transaction"); - assert_eq!(subscription_promise.data().tx_hash, mint_tx.to_hash()); - assert_eq!(subscription_promise.data().reply.value, 0); + assert_eq!(subscription_receipt.data().tx_hash(), mint_tx.to_hash()); + let subscription_promise = subscription_receipt.data().clone().unwrap_promise(); + assert_eq!(subscription_promise.reply.value, 0); assert_eq!( - subscription_promise.data().reply.code, + subscription_promise.reply.code, ReplyCode::Success(SuccessReplyReason::Manual) ); assert_eq!( - subscription_promise.into_data().reply.payload, + subscription_receipt + .data() + .clone() + .unwrap_promise() + .reply + .payload, expected_event.encode() ); @@ -2851,7 +2857,9 @@ async fn injected_tx_fungible_token() { .await .expect("promise from subscription") .expect("transaction promise") - .into_data(); + .data() + .clone() + .unwrap_promise(); assert_eq!(promise.tx_hash, transfer_tx.to_hash()); @@ -3009,7 +3017,9 @@ async fn injected_tx_fungible_token_over_network() { .await .expect("promise from subscription") .expect("transaction promise") - .into_data(); + .data() + .clone() + .unwrap_promise(); let expected_event = demo_fungible_token::FTEvent::Transfer { from: ActorId::new([0u8; 32]), diff --git a/ethexe/service/src/tests/utils/events.rs b/ethexe/service/src/tests/utils/events.rs index 8a0d6d0cf51..90ef00a51ee 100644 --- a/ethexe/service/src/tests/utils/events.rs +++ b/ethexe/service/src/tests/utils/events.rs @@ -26,8 +26,8 @@ use ethexe_common::{ db::*, events::BlockEvent, injected::{ - AddressedInjectedTransaction, InjectedTransaction, InjectedTransactionAcceptance, - SignedCompactPromise, SignedInjectedTransaction, + AddressedInjectedTransaction, CompactPromise, InjectedTransaction, + InjectedTransactionAcceptance, SignedInjectedTransaction, SignedTxReceipt, }, network::VerifiedValidatorMessage, }; @@ -85,7 +85,7 @@ impl TestingNetworkInjectedEvent { #[derive(Debug, Clone, Eq, PartialEq)] pub enum TestingNetworkEvent { ValidatorMessage(VerifiedValidatorMessage), - PromiseMessage(SignedCompactPromise), + TxReceiptMessage(SignedTxReceipt), ValidatorIdentityUpdated(Address), InjectedTransaction(TestingNetworkInjectedEvent), PeerBlocked(PeerId), @@ -96,7 +96,7 @@ impl TestingNetworkEvent { fn new(event: &NetworkEvent) -> Self { match event { NetworkEvent::ValidatorMessage(message) => Self::ValidatorMessage(message.clone()), - NetworkEvent::PromiseMessage(message) => Self::PromiseMessage(message.clone()), + NetworkEvent::TxReceiptMessage(message) => Self::TxReceiptMessage(message.clone()), NetworkEvent::ValidatorIdentityUpdated(address) => { Self::ValidatorIdentityUpdated(*address) }