diff --git a/Cargo.lock b/Cargo.lock index df014206..3e3e5c87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10815,6 +10815,8 @@ dependencies = [ "alloy-provider", "alloy-rpc-types-engine 1.0.24", "futures", + "metrics", + "metrics-derive", "reth-chainspec", "reth-network", "reth-network-api", diff --git a/crates/manager/Cargo.toml b/crates/manager/Cargo.toml index 4341fd0e..f14e2e97 100644 --- a/crates/manager/Cargo.toml +++ b/crates/manager/Cargo.toml @@ -56,6 +56,8 @@ futures.workspace = true tokio-stream.workspace = true tokio.workspace = true tracing.workspace = true +metrics.workspace = true +metrics-derive.workspace = true [dev-dependencies] alloy-consensus.workspace = true diff --git a/crates/manager/src/consensus.rs b/crates/manager/src/consensus.rs index d07f9292..5487dabf 100644 --- a/crates/manager/src/consensus.rs +++ b/crates/manager/src/consensus.rs @@ -1,4 +1,6 @@ use alloy_primitives::{Address, Signature}; +use metrics::Counter; +use metrics_derive::Metrics; use reth_primitives_traits::GotExpected; use reth_scroll_primitives::ScrollBlock; use rollup_node_primitives::{sig_encode_hash, ConsensusUpdate}; @@ -40,17 +42,28 @@ impl Consensus for NoopConsensus { } } +/// The metrics for the [`SystemContractConsensus`]. +#[derive(Metrics, Clone)] +#[metrics(scope = "consensus")] +pub(crate) struct SystemContractConsensusMetrics { + /// System contract validate new block failed counter. + pub validate_new_block_failed: Counter, +} + /// The system contract consensus. #[derive(Debug)] pub struct SystemContractConsensus { authorized_signer: Address, + + /// The metrics for the [`SystemContractConsensus`]. + metrics: SystemContractConsensusMetrics, } impl SystemContractConsensus { /// Creates a new [`SystemContractConsensus`] consensus instance with the given authorized /// signers. - pub const fn new(authorized_signer: Address) -> Self { - Self { authorized_signer } + pub fn new(authorized_signer: Address) -> Self { + Self { authorized_signer, metrics: SystemContractConsensusMetrics::default() } } } @@ -70,6 +83,7 @@ impl Consensus for SystemContractConsensus { let signer = reth_primitives_traits::crypto::secp256k1::recover_signer(signature, hash)?; if self.authorized_signer != signer { + self.metrics.validate_new_block_failed.increment(1); return Err(ConsensusError::IncorrectSigner(GotExpected { got: signer, expected: self.authorized_signer, diff --git a/crates/manager/src/manager/handle.rs b/crates/manager/src/manager/handle.rs index 651ce436..66ff876f 100644 --- a/crates/manager/src/manager/handle.rs +++ b/crates/manager/src/manager/handle.rs @@ -1,27 +1,33 @@ use super::{RollupManagerCommand, RollupManagerEvent}; +use crate::manager::metrics::HandleMetrics; use reth_network_api::FullNetwork; use reth_scroll_node::ScrollNetworkPrimitives; use reth_tokio_util::EventStream; use rollup_node_primitives::BlockInfo; use scroll_network::ScrollNetworkHandle; use tokio::sync::{mpsc, oneshot}; +use tracing::error; /// The handle used to send commands to the rollup manager. #[derive(Debug, Clone)] pub struct RollupManagerHandle> { /// The channel used to send commands to the rollup manager. to_manager_tx: mpsc::Sender>, + handle_metrics: HandleMetrics, } impl> RollupManagerHandle { /// Create a new rollup manager handle. - pub const fn new(to_manager_tx: mpsc::Sender>) -> Self { - Self { to_manager_tx } + pub fn new(to_manager_tx: mpsc::Sender>) -> Self { + Self { to_manager_tx, handle_metrics: HandleMetrics::default() } } /// Sends a command to the rollup manager. pub async fn send_command(&self, command: RollupManagerCommand) { - let _ = self.to_manager_tx.send(command).await; + if let Err(err) = self.to_manager_tx.send(command).await { + self.handle_metrics.handle_send_command_failed.increment(1); + error!(target: "rollup::manager::handle", "Failed to send command to rollup manager: {}", err); + } } /// Sends a command to the rollup manager to build a block. diff --git a/crates/manager/src/manager/metrics.rs b/crates/manager/src/manager/metrics.rs new file mode 100644 index 00000000..1e465caf --- /dev/null +++ b/crates/manager/src/manager/metrics.rs @@ -0,0 +1,48 @@ +use metrics::{Counter, Gauge}; +use metrics_derive::Metrics; + +/// The metrics for the [`super::RollupManagerHandle`]. +#[derive(Metrics, Clone)] +#[metrics(scope = "NodeManager")] +pub(crate) struct HandleMetrics { + /// Failed to send command to rollup manager from handle counter. + pub handle_send_command_failed: Counter, +} + +/// The metrics for the [`super::RollupNodeManager`]. +#[derive(Metrics, Clone)] +#[metrics(scope = "NodeManager")] +pub(crate) struct RollupNodeManagerMetrics { + /// Manager received and handle rollup manager command counter. + pub handle_rollup_manager_command: Counter, + /// Manager received and handle engine driver event counter. + pub handle_engine_driver_event: Counter, + /// Manager received and handle new block produced counter. + pub handle_new_block_produced: Counter, + /// Manager received and handle l1 notification counter. + pub handle_l1_notification: Counter, + /// Manager received and handle chain orchestrator event counter. + pub handle_chain_orchestrator_event: Counter, + /// Manager received and handle signer event counter. + pub handle_signer_event: Counter, + /// Manager received and handle build new payload counter. + pub handle_build_new_payload: Counter, + /// Manager received and handle l1 consolidation counter. + pub handle_l1_consolidation: Counter, + /// Manager received and handle network manager event counter. + pub handle_network_manager_event: Counter, + /// Manager finalized batch index gauge. + pub handle_finalized_batch_index: Gauge, + /// Manager l1 finalized block number gauge. + pub handle_l1_finalized_block_number: Gauge, + /// Manager L1 reorg L1 block number gauge. + pub handle_l1_reorg_l1_block_number: Gauge, + /// Manager L1 reorg L2 head block number gauge. + pub handle_l1_reorg_l2_head_block_number: Gauge, + /// Manager L1 reorg L2 safe block number gauge. + pub handle_l1_reorg_l2_safe_block_number: Gauge, + /// Manager chain import block number gauge. + pub handle_chain_import_block_number: Gauge, + /// Manager optimistic syncing block number gauge. + pub handle_optimistic_syncing_block_number: Gauge, +} diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index 6f16d5aa..9dd9108d 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -4,6 +4,7 @@ use super::Consensus; use crate::poll_nested_stream_with_budget; +use ::metrics::counter; use alloy_provider::Provider; use futures::StreamExt; use reth_chainspec::EthChainSpec; @@ -54,6 +55,9 @@ mod event; pub use event::RollupManagerEvent; mod handle; +mod metrics; + +use crate::manager::metrics::RollupNodeManagerMetrics; pub use handle::RollupManagerHandle; /// The size of the event channel. @@ -114,6 +118,8 @@ pub struct RollupNodeManager< block_building_trigger: Option, /// The original block time configuration for restoring automatic sequencing. block_time_config: Option, + // metrics for the rollup node manager. + metrics: RollupNodeManagerMetrics, } /// The current status of the rollup manager. @@ -200,6 +206,7 @@ where None }, block_time_config: block_time, + metrics: RollupNodeManagerMetrics::default(), }; (rnm, RollupManagerHandle::new(handle_tx)) } @@ -283,10 +290,12 @@ where // Remove once we implement issue #273. // Update the derivation pipeline on new finalized batch. for batch_info in finalized_batches { + self.metrics.handle_finalized_batch_index.set(batch_info.index as f64); self.derivation_pipeline.push_batch(batch_info, block_number); } } ChainOrchestratorEvent::L1BlockFinalized(l1_block_number, finalized_batches, ..) => { + self.metrics.handle_l1_finalized_block_number.set(l1_block_number as f64); // update the sequencer's l1 finalized block number. if let Some(sequencer) = self.sequencer.as_mut() { sequencer.set_l1_finalized_block_number(l1_block_number); @@ -308,6 +317,14 @@ where l2_head_block_info, l2_safe_block_info, } => { + self.metrics.handle_l1_reorg_l1_block_number.set(l1_block_number as f64); + self.metrics + .handle_l1_reorg_l2_head_block_number + .set(l2_head_block_info.as_ref().map_or(0, |info| info.number) as f64); + self.metrics + .handle_l1_reorg_l2_safe_block_number + .set(l2_safe_block_info.as_ref().map_or(0, |info| info.number) as f64); + // Handle the reorg in the engine driver. self.engine.handle_l1_reorg( l1_block_number, @@ -328,11 +345,17 @@ where } } ChainOrchestratorEvent::ChainExtended(chain_import) => { + self.metrics + .handle_chain_import_block_number + .set(chain_import.chain.last().unwrap().number as f64); trace!(target: "scroll::node::manager", head = ?chain_import.chain.last().unwrap().header.clone(), peer_id = ?chain_import.peer_id.clone(), "Received chain extension from peer"); // Issue the new chain to the engine driver for processing. self.engine.handle_chain_import(chain_import) } ChainOrchestratorEvent::ChainReorged(chain_import) => { + self.metrics + .handle_chain_import_block_number + .set(chain_import.chain.last().unwrap().number as f64); trace!(target: "scroll::node::manager", head = ?chain_import.chain.last().unwrap().header, ?chain_import.peer_id, "Received chain reorg from peer"); // Issue the new chain to the engine driver for processing. @@ -342,6 +365,8 @@ where let block_info: BlockInfo = (&block).into(); trace!(target: "scroll::node::manager", ?block_info, "Received optimistic sync from peer"); + self.metrics.handle_optimistic_syncing_block_number.set(block_info.number as f64); + // Issue the new block info to the engine driver for processing. self.engine.handle_optimistic_sync(block_info) } @@ -360,6 +385,12 @@ where match err { ChainOrchestratorError::L1MessageMismatch { expected, actual } => { + counter!( + "manager_handle_chain_orchestrator_event_failed", + "type" => "l1_message_mismatch", + ) + .increment(1); + if let Some(event_sender) = self.event_sender.as_ref() { event_sender.notify(RollupManagerEvent::L1MessageConsolidationError { expected: *expected, @@ -368,6 +399,12 @@ where } } ChainOrchestratorError::DatabaseError(DatabaseError::L1MessageNotFound(start)) => { + counter!( + "manager_handle_chain_orchestrator_event_failed", + "type" => "l1_message_not_found", + ) + .increment(1); + if let Some(event_sender) = self.event_sender.as_ref() { event_sender.notify(RollupManagerEvent::L1MessageMissingInDatabase { start: start.clone(), @@ -428,6 +465,7 @@ where /// Handles an [`L1Notification`] from the L1 watcher. fn handle_l1_notification(&mut self, notification: L1Notification) { + self.metrics.handle_l1_notification.increment(1); if let Some(event_sender) = self.event_sender.as_ref() { event_sender.notify(RollupManagerEvent::L1NotificationEvent(notification.clone())); } @@ -499,6 +537,7 @@ where // Poll the handle receiver for commands. while let Poll::Ready(Some(command)) = this.handle_rx.poll_recv(cx) { + this.metrics.handle_rollup_manager_command.increment(1); match command { RollupManagerCommand::BuildBlock => { proceed_if!( @@ -550,6 +589,7 @@ where // Drain all EngineDriver events. while let Poll::Ready(Some(event)) = this.engine.poll_next_unpin(cx) { + this.metrics.handle_engine_driver_event.increment(1); this.handle_engine_driver_event(event); } @@ -559,6 +599,7 @@ where if let Some(Poll::Ready(Some(attributes))) = this.sequencer.as_mut().map(|x| x.poll_next_unpin(cx)) { + this.metrics.handle_new_block_produced.increment(1); this.engine.handle_build_new_payload(attributes); } ); @@ -580,6 +621,7 @@ where // Drain all chain orchestrator events. while let Poll::Ready(Some(result)) = this.chain.poll_next_unpin(cx) { + this.metrics.handle_chain_orchestrator_event.increment(1); match result { Ok(event) => this.handle_chain_orchestrator_event(event), Err(err) => { @@ -592,6 +634,7 @@ where while let Some(Poll::Ready(Some(event))) = this.signer.as_mut().map(|s| s.poll_next_unpin(cx)) { + this.metrics.handle_signer_event.increment(1); match event { SignerEvent::SignedBlock { block, signature } => { trace!(target: "scroll::node::manager", ?block, ?signature, "Received signed block from signer, announcing to the network"); @@ -619,6 +662,7 @@ where this.block_building_trigger.as_mut().map(|trigger| trigger.poll_tick(cx)), this.sequencer.as_mut() ) { + this.metrics.handle_build_new_payload.increment(1); if !this.consensus.should_sequence_block( this.signer .as_ref() @@ -636,11 +680,13 @@ where // Poll Derivation Pipeline and push attribute in queue if any. while let Poll::Ready(Some(attributes)) = this.derivation_pipeline.poll_next_unpin(cx) { + this.metrics.handle_l1_consolidation.increment(1); this.engine.handle_l1_consolidation(attributes) } // Handle network manager events. while let Poll::Ready(Some(event)) = this.network.poll_next_unpin(cx) { + this.metrics.handle_network_manager_event.increment(1); this.handle_network_manager_event(event); }