Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ futures.workspace = true
tokio-stream.workspace = true
tokio.workspace = true
tracing.workspace = true
metrics.workspace = true
metrics-derive.workspace = true

[dev-dependencies]
alloy-consensus.workspace = true
Expand Down
18 changes: 16 additions & 2 deletions crates/manager/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use alloy_primitives::{Address, Signature};
use metrics::Counter;
use metrics_derive::Metrics;
use reth_primitives_traits::GotExpected;
use reth_scroll_primitives::ScrollBlock;
use rollup_node_primitives::{sig_encode_hash, ConsensusUpdate};
Expand Down Expand Up @@ -40,17 +42,28 @@ impl Consensus for NoopConsensus {
}
}

/// The metrics for the [`SystemContractConsensus`].
#[derive(Metrics, Clone)]
#[metrics(scope = "consensus")]
pub(crate) struct SystemContractConsensusMetrics {
/// System contract validate new block failed counter.
pub validate_new_block_failed: Counter,
}

/// The system contract consensus.
#[derive(Debug)]
pub struct SystemContractConsensus {
authorized_signer: Address,

/// The metrics for the [`SystemContractConsensus`].
metrics: SystemContractConsensusMetrics,
}

impl SystemContractConsensus {
/// Creates a new [`SystemContractConsensus`] consensus instance with the given authorized
/// signers.
pub const fn new(authorized_signer: Address) -> Self {
Self { authorized_signer }
pub fn new(authorized_signer: Address) -> Self {
Self { authorized_signer, metrics: SystemContractConsensusMetrics::default() }
}
}

Expand All @@ -70,6 +83,7 @@ impl Consensus for SystemContractConsensus {
let signer = reth_primitives_traits::crypto::secp256k1::recover_signer(signature, hash)?;

if self.authorized_signer != signer {
self.metrics.validate_new_block_failed.increment(1);
return Err(ConsensusError::IncorrectSigner(GotExpected {
got: signer,
expected: self.authorized_signer,
Expand Down
12 changes: 9 additions & 3 deletions crates/manager/src/manager/handle.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
use super::{RollupManagerCommand, RollupManagerEvent};
use crate::manager::metrics::HandleMetrics;
use reth_network_api::FullNetwork;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tokio_util::EventStream;
use rollup_node_primitives::BlockInfo;
use scroll_network::ScrollNetworkHandle;
use tokio::sync::{mpsc, oneshot};
use tracing::error;

/// The handle used to send commands to the rollup manager.
#[derive(Debug, Clone)]
pub struct RollupManagerHandle<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> {
/// The channel used to send commands to the rollup manager.
to_manager_tx: mpsc::Sender<RollupManagerCommand<N>>,
handle_metrics: HandleMetrics,
}

impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> RollupManagerHandle<N> {
/// Create a new rollup manager handle.
pub const fn new(to_manager_tx: mpsc::Sender<RollupManagerCommand<N>>) -> Self {
Self { to_manager_tx }
pub fn new(to_manager_tx: mpsc::Sender<RollupManagerCommand<N>>) -> Self {
Self { to_manager_tx, handle_metrics: HandleMetrics::default() }
}

/// Sends a command to the rollup manager.
pub async fn send_command(&self, command: RollupManagerCommand<N>) {
let _ = self.to_manager_tx.send(command).await;
if let Err(err) = self.to_manager_tx.send(command).await {
self.handle_metrics.handle_send_command_failed.increment(1);
error!("Failed to send command to rollup manager: {}", err);
Comment thread
georgehao marked this conversation as resolved.
Outdated
}
}

/// Sends a command to the rollup manager to build a block.
Expand Down
48 changes: 48 additions & 0 deletions crates/manager/src/manager/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use metrics::{Counter, Gauge};
use metrics_derive::Metrics;

/// The metrics for the [`super::RollupManagerHandle`].
#[derive(Metrics, Clone)]
#[metrics(scope = "NodeManager")]
pub(crate) struct HandleMetrics {
/// Failed to send command to rollup manager from handle counter.
pub handle_send_command_failed: Counter,
}

/// The metrics for the [`super::RollupNodeManager`].
#[derive(Metrics, Clone)]
#[metrics(scope = "NodeManager")]
pub(crate) struct RollupNodeManagerMetrics {
/// Manager received and handle rollup manager command counter.
pub handle_rollup_manager_command: Counter,
/// Manager received and handle engine driver event counter.
pub handle_engine_driver_event: Counter,
/// Manager received and handle new block produced counter.
pub handle_new_block_produced: Counter,
/// Manager received and handle l1 notification counter.
pub handle_l1_notification: Counter,
/// Manager received and handle chain orchestrator event counter.
pub handle_chain_orchestrator_event: Counter,
/// Manager received and handle signer event counter.
pub handle_signer_event: Counter,
/// Manager received and handle build new payload counter.
pub handle_build_new_payload: Counter,
/// Manager received and handle l1 consolidation counter.
pub handle_l1_consolidation: Counter,
/// Manager received and handle network manager event counter.
pub handle_network_manager_event: Counter,
/// Manager finalized batch index gauge.
pub handle_finalized_batch_index: Gauge,
/// Manager l1 finalized block number gauge.
pub handle_l1_finalized_block_number: Gauge,
/// Manager L1 reorg L1 block number gauge.
pub handle_l1_reorg_l1_block_number: Gauge,
/// Manager L1 reorg L2 head block number gauge.
pub handle_l1_reorg_l2_head_block_number: Gauge,
/// Manager L1 reorg L2 safe block number gauge.
pub handle_l1_reorg_l2_safe_block_number: Gauge,
/// Manager chain import block number gauge.
pub handle_chain_import_block_number: Gauge,
/// Manager optimistic syncing block number gauge.
pub handle_optimistic_syncing_block_number: Gauge,
}
46 changes: 46 additions & 0 deletions crates/manager/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use super::Consensus;
use crate::poll_nested_stream_with_budget;
use ::metrics::counter;
use alloy_provider::Provider;
use futures::StreamExt;
use reth_chainspec::EthChainSpec;
Expand Down Expand Up @@ -54,6 +55,9 @@ mod event;
pub use event::RollupManagerEvent;

mod handle;
mod metrics;

use crate::manager::metrics::RollupNodeManagerMetrics;
pub use handle::RollupManagerHandle;

/// The size of the event channel.
Expand Down Expand Up @@ -114,6 +118,8 @@ pub struct RollupNodeManager<
block_building_trigger: Option<Interval>,
/// The original block time configuration for restoring automatic sequencing.
block_time_config: Option<u64>,
// metrics for the rollup node manager.
metrics: RollupNodeManagerMetrics,
}

/// The current status of the rollup manager.
Expand Down Expand Up @@ -200,6 +206,7 @@ where
None
},
block_time_config: block_time,
metrics: RollupNodeManagerMetrics::default(),
};
(rnm, RollupManagerHandle::new(handle_tx))
}
Expand Down Expand Up @@ -284,10 +291,12 @@ where
// Update the derivation pipeline on new finalized batch.
#[allow(clippy::collapsible_match)]
if let Some(batch_info) = batch_info {
self.metrics.handle_finalized_batch_index.set(batch_info.inner.index as f64);
self.derivation_pipeline.push_batch(batch_info.inner, batch_info.number);
}
}
ChainOrchestratorEvent::L1BlockFinalized(l1_block_number, finalized_batches, ..) => {
self.metrics.handle_l1_finalized_block_number.set(l1_block_number as f64);
// update the sequencer's l1 finalized block number.
if let Some(sequencer) = self.sequencer.as_mut() {
sequencer.set_l1_finalized_block_number(l1_block_number);
Expand All @@ -309,6 +318,14 @@ where
l2_head_block_info,
l2_safe_block_info,
} => {
self.metrics.handle_l1_reorg_l1_block_number.set(l1_block_number as f64);
self.metrics
.handle_l1_reorg_l2_head_block_number
.set(l2_head_block_info.as_ref().map_or(0, |info| info.number) as f64);
self.metrics
.handle_l1_reorg_l2_safe_block_number
.set(l2_safe_block_info.as_ref().map_or(0, |info| info.number) as f64);

// Handle the reorg in the engine driver.
self.engine.handle_l1_reorg(
l1_block_number,
Expand All @@ -329,11 +346,17 @@ where
}
}
ChainOrchestratorEvent::ChainExtended(chain_import) => {
self.metrics
.handle_chain_import_block_number
.set(chain_import.chain.last().unwrap().number as f64);
trace!(target: "scroll::node::manager", head = ?chain_import.chain.last().unwrap().header.clone(), peer_id = ?chain_import.peer_id.clone(), "Received chain extension from peer");
// Issue the new chain to the engine driver for processing.
self.engine.handle_chain_import(chain_import)
}
ChainOrchestratorEvent::ChainReorged(chain_import) => {
self.metrics
.handle_chain_import_block_number
.set(chain_import.chain.last().unwrap().number as f64);
trace!(target: "scroll::node::manager", head = ?chain_import.chain.last().unwrap().header, ?chain_import.peer_id, "Received chain reorg from peer");

// Issue the new chain to the engine driver for processing.
Expand All @@ -343,6 +366,8 @@ where
let block_info: BlockInfo = (&block).into();
trace!(target: "scroll::node::manager", ?block_info, "Received optimistic sync from peer");

self.metrics.handle_optimistic_syncing_block_number.set(block_info.number as f64);

// Issue the new block info to the engine driver for processing.
self.engine.handle_optimistic_sync(block_info)
}
Expand All @@ -361,6 +386,12 @@ where

match err {
ChainOrchestratorError::L1MessageMismatch { expected, actual } => {
counter!(
"manager_handle_chain_orchestrator_event_failed",
"type" => "l1_message_mismatch",
)
.increment(1);

if let Some(event_sender) = self.event_sender.as_ref() {
event_sender.notify(RollupManagerEvent::L1MessageConsolidationError {
expected: *expected,
Expand All @@ -369,6 +400,12 @@ where
}
}
ChainOrchestratorError::DatabaseError(DatabaseError::L1MessageNotFound(start)) => {
counter!(
"manager_handle_chain_orchestrator_event_failed",
"type" => "l1_message_not_found",
)
.increment(1);

if let Some(event_sender) = self.event_sender.as_ref() {
event_sender.notify(RollupManagerEvent::L1MessageMissingInDatabase {
start: start.clone(),
Expand Down Expand Up @@ -429,6 +466,7 @@ where

/// Handles an [`L1Notification`] from the L1 watcher.
fn handle_l1_notification(&mut self, notification: L1Notification) {
self.metrics.handle_l1_notification.increment(1);
if let Some(event_sender) = self.event_sender.as_ref() {
event_sender.notify(RollupManagerEvent::L1NotificationEvent(notification.clone()));
}
Expand Down Expand Up @@ -500,6 +538,7 @@ where

// Poll the handle receiver for commands.
while let Poll::Ready(Some(command)) = this.handle_rx.poll_recv(cx) {
this.metrics.handle_rollup_manager_command.increment(1);
match command {
RollupManagerCommand::BuildBlock => {
proceed_if!(
Expand Down Expand Up @@ -551,6 +590,7 @@ where

// Drain all EngineDriver events.
while let Poll::Ready(Some(event)) = this.engine.poll_next_unpin(cx) {
this.metrics.handle_engine_driver_event.increment(1);
this.handle_engine_driver_event(event);
}

Expand All @@ -560,6 +600,7 @@ where
if let Some(Poll::Ready(Some(attributes))) =
this.sequencer.as_mut().map(|x| x.poll_next_unpin(cx))
{
this.metrics.handle_new_block_produced.increment(1);
this.engine.handle_build_new_payload(attributes);
}
);
Expand All @@ -581,6 +622,7 @@ where

// Drain all chain orchestrator events.
while let Poll::Ready(Some(result)) = this.chain.poll_next_unpin(cx) {
this.metrics.handle_chain_orchestrator_event.increment(1);
match result {
Ok(event) => this.handle_chain_orchestrator_event(event),
Err(err) => {
Expand All @@ -593,6 +635,7 @@ where
while let Some(Poll::Ready(Some(event))) =
this.signer.as_mut().map(|s| s.poll_next_unpin(cx))
{
this.metrics.handle_signer_event.increment(1);
match event {
SignerEvent::SignedBlock { block, signature } => {
trace!(target: "scroll::node::manager", ?block, ?signature, "Received signed block from signer, announcing to the network");
Expand Down Expand Up @@ -620,6 +663,7 @@ where
this.block_building_trigger.as_mut().map(|trigger| trigger.poll_tick(cx)),
this.sequencer.as_mut()
) {
this.metrics.handle_build_new_payload.increment(1);
if !this.consensus.should_sequence_block(
this.signer
.as_ref()
Expand All @@ -637,11 +681,13 @@ where

// Poll Derivation Pipeline and push attribute in queue if any.
while let Poll::Ready(Some(attributes)) = this.derivation_pipeline.poll_next_unpin(cx) {
this.metrics.handle_l1_consolidation.increment(1);
this.engine.handle_l1_consolidation(attributes)
}

// Handle network manager events.
while let Poll::Ready(Some(event)) = this.network.poll_next_unpin(cx) {
this.metrics.handle_network_manager_event.increment(1);
this.handle_network_manager_event(event);
}

Expand Down
Loading