Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ futures.workspace = true
tokio-stream.workspace = true
tokio.workspace = true
tracing.workspace = true
metrics.workspace = true
metrics-derive.workspace = true

[dev-dependencies]
alloy-consensus.workspace = true
Expand Down
18 changes: 16 additions & 2 deletions crates/manager/src/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use alloy_primitives::{Address, Signature};
use metrics::Counter;
use metrics_derive::Metrics;
use reth_primitives_traits::GotExpected;
use reth_scroll_primitives::ScrollBlock;
use rollup_node_primitives::{sig_encode_hash, ConsensusUpdate};
Expand Down Expand Up @@ -40,17 +42,28 @@ impl Consensus for NoopConsensus {
}
}

/// The metrics for the [`SystemContractConsensus`].
#[derive(Metrics, Clone)]
#[metrics(scope = "consensus")]
pub(crate) struct SystemContractConsensusMetrics {
/// System contract validate new block failed counter.
pub validate_new_block_failed: Counter,
}

/// The system contract consensus.
#[derive(Debug)]
pub struct SystemContractConsensus {
authorized_signer: Address,

/// The metrics for the [`SystemContractConsensus`].
metrics: SystemContractConsensusMetrics,
}

impl SystemContractConsensus {
/// Creates a new [`SystemContractConsensus`] consensus instance with the given authorized
/// signers.
pub const fn new(authorized_signer: Address) -> Self {
Self { authorized_signer }
pub fn new(authorized_signer: Address) -> Self {
Self { authorized_signer, metrics: SystemContractConsensusMetrics::default() }
}
}

Expand All @@ -70,6 +83,7 @@ impl Consensus for SystemContractConsensus {
let signer = reth_primitives_traits::crypto::secp256k1::recover_signer(signature, hash)?;

if self.authorized_signer != signer {
self.metrics.validate_new_block_failed.increment(1);
return Err(ConsensusError::IncorrectSigner(GotExpected {
got: signer,
expected: self.authorized_signer,
Expand Down
12 changes: 9 additions & 3 deletions crates/manager/src/manager/handle.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
use super::{RollupManagerCommand, RollupManagerEvent};
use crate::manager::metrics::HandleMetrics;
use reth_network_api::FullNetwork;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tokio_util::EventStream;
use rollup_node_primitives::BlockInfo;
use scroll_network::ScrollNetworkHandle;
use tokio::sync::{mpsc, oneshot};
use tracing::error;

/// The handle used to send commands to the rollup manager.
#[derive(Debug, Clone)]
pub struct RollupManagerHandle<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> {
/// The channel used to send commands to the rollup manager.
to_manager_tx: mpsc::Sender<RollupManagerCommand<N>>,
handle_metrics: HandleMetrics,
}

impl<N: FullNetwork<Primitives = ScrollNetworkPrimitives>> RollupManagerHandle<N> {
/// Create a new rollup manager handle.
pub const fn new(to_manager_tx: mpsc::Sender<RollupManagerCommand<N>>) -> Self {
Self { to_manager_tx }
pub fn new(to_manager_tx: mpsc::Sender<RollupManagerCommand<N>>) -> Self {
Self { to_manager_tx, handle_metrics: HandleMetrics::default() }
}

/// Sends a command to the rollup manager.
pub async fn send_command(&self, command: RollupManagerCommand<N>) {
let _ = self.to_manager_tx.send(command).await;
if let Err(err) = self.to_manager_tx.send(command).await {
self.handle_metrics.handle_send_command_failed.increment(1);
error!("Failed to send command to rollup manager: {}", err);
Comment thread
georgehao marked this conversation as resolved.
Outdated
}
}

/// Sends a command to the rollup manager to build a block.
Expand Down
48 changes: 48 additions & 0 deletions crates/manager/src/manager/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use metrics::{Counter, Gauge};
use metrics_derive::Metrics;

/// The metrics for the [`super::RollupManagerHandle`].
#[derive(Metrics, Clone)]
#[metrics(scope = "NodeManager")]
pub(crate) struct HandleMetrics {
/// Failed to send command to rollup manager from handle counter.
pub handle_send_command_failed: Counter,
}

/// The metrics for the [`super::RollupNodeManager`].
#[derive(Metrics, Clone)]
#[metrics(scope = "NodeManager")]
pub(crate) struct RollupNodeManagerMetrics {
/// Manager received and handle rollup manager command counter.
pub handle_rollup_manager_command: Counter,
/// Manager received and handle engine driver event counter.
pub handle_engine_driver_event: Counter,
/// Manager received and handle new block produced counter.
pub handle_new_block_produced: Counter,
/// Manager received and handle l1 notification counter.
pub handle_l1_notification: Counter,
/// Manager received and handle chain orchestrator event counter.
pub handle_chain_orchestrator_event: Counter,
/// Manager received and handle signer event counter.
pub handle_signer_event: Counter,
/// Manager received and handle build new payload counter.
pub handle_build_new_payload: Counter,
/// Manager received and handle l1 consolidation counter.
pub handle_l1_consolidation: Counter,
/// Manager received and handle network manager event counter.
pub handle_network_manager_event: Counter,
/// Manager finalized batch index gauge.
pub handle_finalized_batch_index: Gauge,
/// Manager l1 finalized block number gauge.
pub handle_l1_finalized_block_number: Gauge,
/// Manager L1 reorg L1 block number gauge.
pub handle_l1_reorg_l1_block_number: Gauge,
/// Manager L1 reorg L2 head block number gauge.
pub handle_l1_reorg_l2_head_block_number: Gauge,
/// Manager L1 reorg L2 safe block number gauge.
pub handle_l1_reorg_l2_safe_block_number: Gauge,
/// Manager chain import block number gauge.
pub handle_chain_import_block_number: Gauge,
/// Manager optimistic syncing block number gauge.
pub handle_optimistic_syncing_block_number: Gauge,
}
46 changes: 46 additions & 0 deletions crates/manager/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use super::Consensus;
use crate::poll_nested_stream_with_budget;
use ::metrics::counter;
use alloy_provider::Provider;
use futures::StreamExt;
use reth_chainspec::EthChainSpec;
Expand Down Expand Up @@ -54,6 +55,9 @@ mod event;
pub use event::RollupManagerEvent;

mod handle;
mod metrics;

use crate::manager::metrics::RollupNodeManagerMetrics;
pub use handle::RollupManagerHandle;

/// The size of the event channel.
Expand Down Expand Up @@ -114,6 +118,8 @@ pub struct RollupNodeManager<
block_building_trigger: Option<Interval>,
/// The original block time configuration for restoring automatic sequencing.
block_time_config: Option<u64>,
// metrics for the rollup node manager.
metrics: RollupNodeManagerMetrics,
}

/// The current status of the rollup manager.
Expand Down Expand Up @@ -200,6 +206,7 @@ where
None
},
block_time_config: block_time,
metrics: RollupNodeManagerMetrics::default(),
};
(rnm, RollupManagerHandle::new(handle_tx))
}
Expand Down Expand Up @@ -283,10 +290,12 @@ where
// Remove once we implement issue #273.
// Update the derivation pipeline on new finalized batch.
for batch_info in finalized_batches {
self.metrics.handle_finalized_batch_index.set(batch_info.index as f64);
self.derivation_pipeline.push_batch(batch_info, block_number);
}
}
ChainOrchestratorEvent::L1BlockFinalized(l1_block_number, finalized_batches, ..) => {
self.metrics.handle_l1_finalized_block_number.set(l1_block_number as f64);
// update the sequencer's l1 finalized block number.
if let Some(sequencer) = self.sequencer.as_mut() {
sequencer.set_l1_finalized_block_number(l1_block_number);
Expand All @@ -308,6 +317,14 @@ where
l2_head_block_info,
l2_safe_block_info,
} => {
self.metrics.handle_l1_reorg_l1_block_number.set(l1_block_number as f64);
self.metrics
.handle_l1_reorg_l2_head_block_number
.set(l2_head_block_info.as_ref().map_or(0, |info| info.number) as f64);
self.metrics
.handle_l1_reorg_l2_safe_block_number
.set(l2_safe_block_info.as_ref().map_or(0, |info| info.number) as f64);

// Handle the reorg in the engine driver.
self.engine.handle_l1_reorg(
l1_block_number,
Expand All @@ -328,11 +345,17 @@ where
}
}
ChainOrchestratorEvent::ChainExtended(chain_import) => {
self.metrics
.handle_chain_import_block_number
.set(chain_import.chain.last().unwrap().number as f64);
trace!(target: "scroll::node::manager", head = ?chain_import.chain.last().unwrap().header.clone(), peer_id = ?chain_import.peer_id.clone(), "Received chain extension from peer");
// Issue the new chain to the engine driver for processing.
self.engine.handle_chain_import(chain_import)
}
ChainOrchestratorEvent::ChainReorged(chain_import) => {
self.metrics
.handle_chain_import_block_number
.set(chain_import.chain.last().unwrap().number as f64);
trace!(target: "scroll::node::manager", head = ?chain_import.chain.last().unwrap().header, ?chain_import.peer_id, "Received chain reorg from peer");

// Issue the new chain to the engine driver for processing.
Expand All @@ -342,6 +365,8 @@ where
let block_info: BlockInfo = (&block).into();
trace!(target: "scroll::node::manager", ?block_info, "Received optimistic sync from peer");

self.metrics.handle_optimistic_syncing_block_number.set(block_info.number as f64);

// Issue the new block info to the engine driver for processing.
self.engine.handle_optimistic_sync(block_info)
}
Expand All @@ -360,6 +385,12 @@ where

match err {
ChainOrchestratorError::L1MessageMismatch { expected, actual } => {
counter!(
"manager_handle_chain_orchestrator_event_failed",
"type" => "l1_message_mismatch",
)
.increment(1);

if let Some(event_sender) = self.event_sender.as_ref() {
event_sender.notify(RollupManagerEvent::L1MessageConsolidationError {
expected: *expected,
Expand All @@ -368,6 +399,12 @@ where
}
}
ChainOrchestratorError::DatabaseError(DatabaseError::L1MessageNotFound(start)) => {
counter!(
"manager_handle_chain_orchestrator_event_failed",
"type" => "l1_message_not_found",
)
.increment(1);

if let Some(event_sender) = self.event_sender.as_ref() {
event_sender.notify(RollupManagerEvent::L1MessageMissingInDatabase {
start: start.clone(),
Expand Down Expand Up @@ -428,6 +465,7 @@ where

/// Handles an [`L1Notification`] from the L1 watcher.
fn handle_l1_notification(&mut self, notification: L1Notification) {
self.metrics.handle_l1_notification.increment(1);
if let Some(event_sender) = self.event_sender.as_ref() {
event_sender.notify(RollupManagerEvent::L1NotificationEvent(notification.clone()));
}
Expand Down Expand Up @@ -499,6 +537,7 @@ where

// Poll the handle receiver for commands.
while let Poll::Ready(Some(command)) = this.handle_rx.poll_recv(cx) {
this.metrics.handle_rollup_manager_command.increment(1);
match command {
RollupManagerCommand::BuildBlock => {
proceed_if!(
Expand Down Expand Up @@ -550,6 +589,7 @@ where

// Drain all EngineDriver events.
while let Poll::Ready(Some(event)) = this.engine.poll_next_unpin(cx) {
this.metrics.handle_engine_driver_event.increment(1);
this.handle_engine_driver_event(event);
}

Expand All @@ -559,6 +599,7 @@ where
if let Some(Poll::Ready(Some(attributes))) =
this.sequencer.as_mut().map(|x| x.poll_next_unpin(cx))
{
this.metrics.handle_new_block_produced.increment(1);
this.engine.handle_build_new_payload(attributes);
}
);
Expand All @@ -580,6 +621,7 @@ where

// Drain all chain orchestrator events.
while let Poll::Ready(Some(result)) = this.chain.poll_next_unpin(cx) {
this.metrics.handle_chain_orchestrator_event.increment(1);
match result {
Ok(event) => this.handle_chain_orchestrator_event(event),
Err(err) => {
Expand All @@ -592,6 +634,7 @@ where
while let Some(Poll::Ready(Some(event))) =
this.signer.as_mut().map(|s| s.poll_next_unpin(cx))
{
this.metrics.handle_signer_event.increment(1);
match event {
SignerEvent::SignedBlock { block, signature } => {
trace!(target: "scroll::node::manager", ?block, ?signature, "Received signed block from signer, announcing to the network");
Expand Down Expand Up @@ -619,6 +662,7 @@ where
this.block_building_trigger.as_mut().map(|trigger| trigger.poll_tick(cx)),
this.sequencer.as_mut()
) {
this.metrics.handle_build_new_payload.increment(1);
if !this.consensus.should_sequence_block(
this.signer
.as_ref()
Expand All @@ -636,11 +680,13 @@ where

// Poll Derivation Pipeline and push attribute in queue if any.
while let Poll::Ready(Some(attributes)) = this.derivation_pipeline.poll_next_unpin(cx) {
this.metrics.handle_l1_consolidation.increment(1);
this.engine.handle_l1_consolidation(attributes)
}

// Handle network manager events.
while let Poll::Ready(Some(event)) = this.network.poll_next_unpin(cx) {
this.metrics.handle_network_manager_event.increment(1);
this.handle_network_manager_event(event);
}

Expand Down
Loading