diff --git a/Cargo.lock b/Cargo.lock index 780cb74126..acfc115c20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12632,6 +12632,7 @@ dependencies = [ "proptest", "proptest-derive 0.5.1", "reltester", + "reqwest", "rockbound", "schemars 0.8.22", "serde", diff --git a/crates/full-node/full-node-configs/src/runner.rs b/crates/full-node/full-node-configs/src/runner.rs index c53b8802e6..889c481055 100644 --- a/crates/full-node/full-node-configs/src/runner.rs +++ b/crates/full-node/full-node-configs/src/runner.rs @@ -1,3 +1,4 @@ +use std::net::IpAddr; use std::num::NonZero; use std::path::Path; @@ -53,6 +54,12 @@ pub struct HttpServerConfig { /// public_address = "https://rollup.example.com" /// ``` pub public_address: Option, + /// Reverse proxies whose `X-Forwarded-For` headers should be trusted. + /// + /// Requests coming from other peers will ignore `X-Forwarded-For` and use the direct socket + /// address instead. + #[serde(default = "Vec::::new")] + pub trusted_proxies: Vec, /// Enable or disable CORS policy headers. Enabled by default. #[serde(default)] pub cors: CorsConfiguration, @@ -92,6 +99,7 @@ impl HttpServerConfig { bind_host: "127.0.0.1".to_string(), bind_port: port, public_address: None, + trusted_proxies: Vec::new(), cors: CorsConfiguration::Permissive, } } @@ -103,6 +111,7 @@ impl HttpServerConfig { bind_host: host.into(), bind_port: port, public_address: None, + trusted_proxies: Vec::new(), cors: CorsConfiguration::Permissive, } } diff --git a/crates/full-node/full-node-configs/src/sequencer.rs b/crates/full-node/full-node-configs/src/sequencer.rs index 65fac825fb..7614578ecd 100644 --- a/crates/full-node/full-node-configs/src/sequencer.rs +++ b/crates/full-node/full-node-configs/src/sequencer.rs @@ -13,6 +13,9 @@ pub enum SequencerKindConfig { Standard(StdSequencerConfig), /// A "Preferred" sequencer which is allowed to give soft confirmations. Preferred(PreferredSequencerConfig
), + /// A "Forwarding" sequencer that serves reads locally and forwards tx submissions to a + /// remote preferred sequencer. + Forwarding(ForwardingSequencerConfig), } impl Default @@ -107,6 +110,14 @@ impl SequencerConfig { SequencerKindConfig::Preferred(_) ) } + + /// Returns true if the sequencer uses [`SequencerKindConfig::Forwarding`]. + pub fn is_forwarding_sequencer(&self) -> bool { + matches!( + self.sequencer_kind_config, + SequencerKindConfig::Forwarding(_) + ) + } } fn default_sequencer_dropped_tx_ttl_secs() -> u64 { @@ -334,6 +345,34 @@ pub struct StdSequencerConfig { pub max_batch_size_bytes: Option>, } +/// Configuration for a `ForwardingSequencer`, a read-only sequencer that forwards tx submissions +/// to a remote preferred sequencer while serving reads from the local node. +#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct ForwardingSequencerConfig { + /// Base URL of the upstream preferred sequencer (e.g. `http://leader:12346`). + /// Transaction submissions are forwarded to this endpoint. + pub upstream_url: String, + /// DA address of the upstream preferred sequencer. + /// This is used by local read/simulation APIs so they match the upstream's preferred + /// sequencer behavior. + pub upstream_da_address: String, + /// HTTP request timeout in milliseconds for calls to the upstream sequencer. + #[serde(default = "default_forwarding_request_timeout_ms")] + pub request_timeout_ms: u64, + /// TTL in milliseconds for the cached readiness result from the upstream sequencer. + #[serde(default = "default_forwarding_readiness_cache_ms")] + pub readiness_cache_ms: u64, +} + +const fn default_forwarding_request_timeout_ms() -> u64 { + 10_000 +} + +const fn default_forwarding_readiness_cache_ms() -> u64 { + 200 +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Eq, PartialEq, JsonSchema)] pub struct SovRateLimiterConfig { /// The cache size for the rate limiter. diff --git a/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config.snap.new b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config.snap.new new file mode 100644 index 0000000000..b781194ed9 --- /dev/null +++ b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config.snap.new @@ -0,0 +1,80 @@ +--- +source: crates/full-node/full-node-configs/src/runner.rs +assertion_line: 242 +expression: config +--- +{ + "storage": { + "path": "/tmp", + "ledger_db_path": null, + "state_cache_size": null, + "user_commit_concurrency": null, + "user_hashtable_buckets": null, + "user_preallocate_ht": null, + "user_page_cache_size": null, + "user_leaf_cache_size": null, + "user_page_cache_upper_levels": null, + "kernel_commit_concurrency": null, + "kernel_hashtable_buckets": null, + "kernel_preallocate_ht": null, + "kernel_page_cache_size": null, + "kernel_leaf_cache_size": null, + "kernel_page_cache_upper_levels": null, + "pruner_block_interval": null, + "pruner_versions_to_keep": null, + "pruner_max_batch_size": null + }, + "runner": { + "da_polling_interval_ms": 10000, + "http_config": { + "bind_host": "127.0.0.1", + "bind_port": 12346, + "public_address": "https://rollup.sovereign.xyz", + "trusted_proxies": [], + "cors": "restrictive" + }, + "concurrent_sync_tasks": 18, + "pre_fetched_blocks_capacity": 20, + "save_tx_bodies": false + }, + "da": { + "connection_string": "sqlite:///tmp/mockda.sqlite?mode=rwc", + "sender_address": "0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f", + "finalization_blocks": 0, + "block_producing": { + "periodic": { + "block_time_ms": 1000 + } + }, + "randomization": null, + "failure_behavior": "none" + }, + "proof_manager": { + "aggregated_proof_block_jump": 22, + "prover_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf", + "max_number_of_transitions_in_db": 1025, + "max_number_of_transitions_in_memory": 768, + "eager_proof_submission": true + }, + "sequencer": { + "automatic_batch_production": true, + "max_allowed_node_distance_behind": 5, + "dropped_tx_ttl_secs": 60, + "rollup_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf", + "admin_addresses": [], + "standard": { + "mempool_max_txs_count": null, + "max_batch_size_bytes": null + }, + "max_batch_size_bytes": 1048576, + "max_concurrent_blobs": 16, + "blob_processing_timeout_secs": 60, + "extension": null + }, + "monitoring": { + "telegraf_address": "udp://192.168.4.5:8543", + "max_datagram_size": 1024, + "max_pending_metrics": 2560, + "tokio_runtime_metrics_interval_millis": 500 + } +} diff --git a/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_postgres.snap.new b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_postgres.snap.new new file mode 100644 index 0000000000..36583251a5 --- /dev/null +++ b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_postgres.snap.new @@ -0,0 +1,99 @@ +--- +source: crates/full-node/full-node-configs/src/runner.rs +assertion_line: 296 +expression: config +--- +{ + "storage": { + "path": "/tmp", + "ledger_db_path": "/tmp/ledger-db", + "state_cache_size": null, + "user_commit_concurrency": null, + "user_hashtable_buckets": null, + "user_preallocate_ht": null, + "user_page_cache_size": null, + "user_leaf_cache_size": null, + "user_page_cache_upper_levels": null, + "kernel_commit_concurrency": null, + "kernel_hashtable_buckets": null, + "kernel_preallocate_ht": null, + "kernel_page_cache_size": null, + "kernel_leaf_cache_size": null, + "kernel_page_cache_upper_levels": null, + "pruner_block_interval": null, + "pruner_versions_to_keep": null, + "pruner_max_batch_size": null + }, + "runner": { + "da_polling_interval_ms": 10000, + "http_config": { + "bind_host": "127.0.0.1", + "bind_port": 12346, + "public_address": "https://rollup.sovereign.xyz", + "trusted_proxies": [], + "cors": "restrictive" + }, + "concurrent_sync_tasks": 18, + "pre_fetched_blocks_capacity": 20, + "save_tx_bodies": false + }, + "da": { + "connection_string": "sqlite:///tmp/mockda.sqlite?mode=rwc", + "sender_address": "0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f", + "finalization_blocks": 0, + "block_producing": { + "periodic": { + "block_time_ms": 1000 + } + }, + "randomization": null, + "failure_behavior": "none" + }, + "proof_manager": { + "aggregated_proof_block_jump": 22, + "prover_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf", + "max_number_of_transitions_in_db": 1025, + "max_number_of_transitions_in_memory": 768, + "eager_proof_submission": true + }, + "sequencer": { + "automatic_batch_production": true, + "max_allowed_node_distance_behind": 5, + "dropped_tx_ttl_secs": 60, + "rollup_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf", + "admin_addresses": [], + "preferred": { + "minimum_profit_per_tx": 0, + "events_channel_size": 10000, + "postgres_config": { + "postgres_connection_string": "postgresql://postgres:pass@localhost:5432/db", + "node_id": "node_1", + "node_role": "Leader", + "leader_election": { + "leader_timeout_millis": 500, + "grace_period_millis": 10000, + "heartbeat_interval_millis": 100 + } + }, + "disable_state_root_consistency_checks": true, + "ideal_lag_behind_finalized_slot": 3, + "db_event_channel_size": 10000, + "recovery_strategy": "TryToSave", + "batch_execution_time_limit_millis": 2000, + "num_cache_warmup_workers": 0, + "rate_limiter": null, + "maximum_future_nonce_delta": 100, + "future_nonce_transaction_timeout_millis": 2000 + }, + "max_batch_size_bytes": 1048576, + "max_concurrent_blobs": 16, + "blob_processing_timeout_secs": 60, + "extension": null + }, + "monitoring": { + "telegraf_address": "udp://192.168.4.5:8543", + "max_datagram_size": 1024, + "max_pending_metrics": 2560, + "tokio_runtime_metrics_interval_millis": 500 + } +} diff --git a/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_rate_limiter.snap.new b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_rate_limiter.snap.new new file mode 100644 index 0000000000..cf1c99c373 --- /dev/null +++ b/crates/full-node/full-node-configs/src/snapshots/sov_full_node_configs__runner__tests__correct_config_with_rate_limiter.snap.new @@ -0,0 +1,131 @@ +--- +source: crates/full-node/full-node-configs/src/runner.rs +assertion_line: 360 +expression: config +--- +{ + "storage": { + "path": "/tmp", + "ledger_db_path": null, + "state_cache_size": null, + "user_commit_concurrency": null, + "user_hashtable_buckets": null, + "user_preallocate_ht": null, + "user_page_cache_size": null, + "user_leaf_cache_size": null, + "user_page_cache_upper_levels": null, + "kernel_commit_concurrency": null, + "kernel_hashtable_buckets": null, + "kernel_preallocate_ht": null, + "kernel_page_cache_size": null, + "kernel_leaf_cache_size": null, + "kernel_page_cache_upper_levels": null, + "pruner_block_interval": null, + "pruner_versions_to_keep": null, + "pruner_max_batch_size": null + }, + "runner": { + "da_polling_interval_ms": 10000, + "http_config": { + "bind_host": "127.0.0.1", + "bind_port": 12346, + "public_address": "https://rollup.sovereign.xyz", + "trusted_proxies": [], + "cors": "restrictive" + }, + "concurrent_sync_tasks": 18, + "pre_fetched_blocks_capacity": 20, + "save_tx_bodies": false + }, + "da": { + "connection_string": "sqlite:///tmp/mockda.sqlite?mode=rwc", + "sender_address": "0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f", + "finalization_blocks": 0, + "block_producing": { + "periodic": { + "block_time_ms": 1000 + } + }, + "randomization": null, + "failure_behavior": "none" + }, + "proof_manager": { + "aggregated_proof_block_jump": 22, + "prover_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf", + "max_number_of_transitions_in_db": 1025, + "max_number_of_transitions_in_memory": 768, + "eager_proof_submission": true + }, + "sequencer": { + "automatic_batch_production": true, + "max_allowed_node_distance_behind": 5, + "dropped_tx_ttl_secs": 60, + "rollup_address": "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf", + "admin_addresses": [], + "preferred": { + "minimum_profit_per_tx": 0, + "events_channel_size": 10000, + "postgres_config": { + "postgres_connection_string": "postgresql://postgres:pass@localhost:5432/db", + "node_id": "node_1", + "node_role": "Leader", + "leader_election": { + "leader_timeout_millis": 500, + "grace_period_millis": 10000, + "heartbeat_interval_millis": 100 + } + }, + "disable_state_root_consistency_checks": true, + "ideal_lag_behind_finalized_slot": 3, + "db_event_channel_size": 10000, + "recovery_strategy": "TryToSave", + "batch_execution_time_limit_millis": 2000, + "num_cache_warmup_workers": 0, + "rate_limiter": { + "max_nb_of_concurrent_users_in_rate_limiter": 100000, + "max_requests_per_second": 1000000, + "default_limits": { + "resources_per_bucket": 5, + "refill_rate": 2 + }, + "address_custom_limits": [ + [ + "sov1lzkjgdaz08su3yevqu6ceywufl35se9f33kztu5cu2spja5hyyf", + { + "resources_per_bucket": 10, + "refill_rate": 2 + } + ] + ], + "ip_custom_limits": [ + [ + "157.180.14.244", + { + "resources_per_bucket": 10, + "refill_rate": 2 + } + ], + [ + "157.180.34.249", + { + "resources_per_bucket": 8, + "refill_rate": 2 + } + ] + ] + }, + "maximum_future_nonce_delta": 100, + "future_nonce_transaction_timeout_millis": 2000 + }, + "max_batch_size_bytes": 1048576, + "max_concurrent_blobs": 16, + "blob_processing_timeout_secs": 60, + "extension": null + }, + "monitoring": { + "telegraf_address": "udp://192.168.4.5:8543", + "max_datagram_size": 1024, + "max_pending_metrics": 2560, + "tokio_runtime_metrics_interval_millis": 500 + } +} diff --git a/crates/full-node/sov-ethereum/tests/integration/common/setup.rs b/crates/full-node/sov-ethereum/tests/integration/common/setup.rs index 2f6cc00ea9..c1c9f9c0b4 100644 --- a/crates/full-node/sov-ethereum/tests/integration/common/setup.rs +++ b/crates/full-node/sov-ethereum/tests/integration/common/setup.rs @@ -58,6 +58,7 @@ async fn start_node_with_genesis( c.aggregated_proof_block_jump = 5; c.max_infos_in_db = 30; c.max_channel_size = 20; + c.trusted_proxies = vec![std::net::Ipv4Addr::LOCALHOST.into()]; c.extension = extension; if let SequencerKindConfig::Preferred(ref mut seq) = c.sequencer_config { seq.ideal_lag_behind_finalized_slot = ideal_lag; diff --git a/crates/full-node/sov-sequencer/Cargo.toml b/crates/full-node/sov-sequencer/Cargo.toml index 90933d922d..3df430857c 100644 --- a/crates/full-node/sov-sequencer/Cargo.toml +++ b/crates/full-node/sov-sequencer/Cargo.toml @@ -28,6 +28,7 @@ futures = { workspace = true } hex = { workspace = true } jsonrpsee = { workspace = true, default-features = false, features = ["client", "server"] } mini-moka = "0.10" +reqwest = { workspace = true, features = ["json"] } sov-full-node-configs = { workspace = true } rockbound = { workspace = true } schemars = { workspace = true } diff --git a/crates/full-node/sov-sequencer/src/common.rs b/crates/full-node/sov-sequencer/src/common.rs index b83e82fa9f..e1eb46dc99 100644 --- a/crates/full-node/sov-sequencer/src/common.rs +++ b/crates/full-node/sov-sequencer/src/common.rs @@ -204,6 +204,17 @@ pub trait Sequencer: Clone + Send + Sync + 'static { Ok(None) } + /// Queries a transaction by hash and returns the API-facing representation. + async fn get_api_tx( + &self, + tx_hash: TxHash, + ) -> anyhow::Result>> { + Ok(self + .get_tx(tx_hash) + .await? + .map(ApiAcceptedTx::<_>::from_accepted_tx::)) + } + /// Updates the sequencer's view of the state of the rollup. async fn update_state( &self, diff --git a/crates/full-node/sov-sequencer/src/forwarding/mod.rs b/crates/full-node/sov-sequencer/src/forwarding/mod.rs new file mode 100644 index 0000000000..7bd1594492 --- /dev/null +++ b/crates/full-node/sov-sequencer/src/forwarding/mod.rs @@ -0,0 +1,545 @@ +//! A `Sequencer` that serves reads from the local node and forwards transaction submissions to a +//! remote preferred sequencer. +//! +//! A forwarding sequencer lets an operator run their own full node for read access to the chain +//! without running a `PreferredSequencer` (which can only run in one place) or a `StdSequencer` +//! (whose locally-simulated results can diverge from the canonical order). It implements the +//! [`Sequencer`] trait by proxying writes to a remote upstream over HTTP while answering reads from +//! the local node's state. +//! +//! Limitations (v1): +//! - WebSocket subscription methods return `None`; clients wanting event/tx streams should connect +//! directly to the upstream. +//! - Proof blob publication is unsupported — forwarding nodes are not provers. +//! - The EVM preflight in `sov_ethereum::handlers` runs against local (possibly lagging) state, +//! which can spuriously reject valid transactions. + +use std::marker::PhantomData; +use std::net::IpAddr; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use anyhow::Context; +use async_trait::async_trait; +use axum::http::StatusCode; +use reqwest::StatusCode as ReqwestStatusCode; +pub use sov_full_node_configs::sequencer::ForwardingSequencerConfig; +use sov_modules_api::rest::utils::ErrorObject; +use sov_modules_api::rest::ApiState; +use sov_modules_api::{ConcurrentStateCheckpoint, DaSpec, FullyBakedTx, Spec, StateCheckpoint}; +use sov_modules_stf_blueprint::Runtime; +use sov_rest_utils::json_obj; +use sov_rollup_full_node_interface::{DaSyncState, StateUpdateInfo, StateUpdateReceiver}; +use sov_rollup_interface::da::DaBlobHash; +use sov_rollup_interface::node::da::DaService; +use tokio::sync::{watch, Mutex}; +use tokio::task::JoinHandle; +use tracing::{debug, trace, warn}; + +use crate::common::{loop_call_update_state, loop_send_tx_notifications, AcceptedTx, Sequencer}; +use crate::preferred::Confirmation; +use crate::rest_api::{ApiAcceptedTx, TxInfoWithConfirmation}; +use crate::{ + ProofBlobSender, SequencerConfig, SequencerNotReadyDetails, SerializedProofWithDetailsBytes, + TxHash, TxStatus, TxStatusManager, +}; + +/// A [`Sequencer`] that serves reads locally and forwards writes to a remote upstream. +/// +/// The remote upstream is expected to be a `PreferredSequencer`; the forwarding sequencer +/// deserializes upstream responses into the native [`Confirmation`] type so clients see +/// byte-identical results whether they talk to the upstream directly or through the forwarder. +#[derive(derivative::Derivative)] +#[derivative(Clone(bound = ""))] +pub struct ForwardingSequencer(Arc>) +where + S: Spec, + Rt: Runtime, + Da: DaService; + +impl std::ops::Deref for ForwardingSequencer +where + S: Spec, + Rt: Runtime, + Da: DaService, +{ + type Target = ForwardingSequencerFields; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Inner fields of a [`ForwardingSequencer`]. Access through the parent struct's `Arc`. +pub struct ForwardingSequencerFields +where + S: Spec, + Rt: Runtime, + Da: DaService, +{ + http: reqwest::Client, + upstream_url: reqwest::Url, + readiness_cache_ttl: Duration, + readiness_cache: Mutex)>>, + txsm: TxStatusManager, + api_state: ApiState, + checkpoint_sender: watch::Sender>>, + api_ledger_db: sov_db::ledger_db::LedgerDb, + _marker: PhantomData<(Rt, Da)>, +} + +impl ForwardingSequencer +where + S: Spec, + Rt: Runtime, + Da: DaService, +{ + /// Creates a `ForwardingSequencer` and spawns background tasks that keep the local + /// `ApiState` fresh as the node ingests new slots. + pub async fn create( + state_update_receiver: StateUpdateReceiver, + _da_sync_state: Arc, + config: &SequencerConfig, + ledger_db: sov_db::ledger_db::LedgerDb, + api_ledger_db: sov_db::ledger_db::LedgerDb, + shutdown_sender: watch::Sender<()>, + ) -> anyhow::Result<(Self, Vec>)> { + let shutdown_receiver = shutdown_sender.subscribe(); + let cfg = &config.sequencer_kind_config; + + let upstream_url = reqwest::Url::parse(&cfg.upstream_url).with_context(|| { + format!( + "Failed to parse forwarding sequencer upstream_url {:?}", + cfg.upstream_url + ) + })?; + + let http = reqwest::Client::builder() + .timeout(Duration::from_millis(cfg.request_timeout_ms)) + .build() + .context("Failed to build HTTP client for forwarding sequencer")?; + + let mut runtime = Rt::default(); + let kernel_with_slot_mapping = runtime.kernel_with_slot_mapping(); + + let latest_state_update = state_update_receiver.borrow().clone(); + let checkpoint = Arc::new( + ConcurrentStateCheckpoint::from_state_checkpoint_with_finalized_slot( + StateCheckpoint::new(latest_state_update.storage.clone(), &runtime.kernel(), None), + latest_state_update.latest_finalized_slot_number, + ), + ); + let (checkpoint_sender, checkpoint_receiver) = watch::channel(checkpoint); + + let api_state = ApiState::build( + Arc::new(()), + checkpoint_receiver, + kernel_with_slot_mapping, + None, + ); + + let txsm = TxStatusManager::default(); + + let seq = ForwardingSequencer(Arc::new(ForwardingSequencerFields { + http, + upstream_url, + readiness_cache_ttl: Duration::from_millis(cfg.readiness_cache_ms), + readiness_cache: Mutex::new(None), + txsm, + api_state, + checkpoint_sender, + api_ledger_db, + _marker: PhantomData, + })); + + // Keep unused sender alive so we don't panic trying to subscribe on a closed watch. + let _ = &shutdown_sender; + + let mut handles: Vec> = Vec::new(); + + handles.push(tokio::spawn(loop_call_update_state( + seq.clone(), + state_update_receiver.clone(), + shutdown_receiver.clone(), + ))); + handles.push(tokio::spawn({ + let ledger_db = ledger_db.clone(); + let seq = seq.clone(); + async move { + loop_send_tx_notifications::( + state_update_receiver, + shutdown_receiver, + &ledger_db, + seq.tx_status_manager(), + ) + .await; + } + })); + + Ok((seq, handles)) + } + + fn upstream_url_for(&self, path: &str) -> anyhow::Result { + self.upstream_url + .join(path) + .with_context(|| format!("Failed to build upstream URL for {path}")) + } + + async fn ready_upstream(&self) -> Result<(), SequencerNotReadyDetails> { + let url = self + .upstream_url_for("sequencer/ready") + .map_err(|_| SequencerNotReadyDetails::Startup)?; + let response = self + .http + .get(url) + .send() + .await + .map_err(|_| SequencerNotReadyDetails::Startup)?; + + if response.status().is_success() { + return Ok(()); + } + if response.status() == ReqwestStatusCode::SERVICE_UNAVAILABLE { + // Upstream is initializing or syncing. Surface a generic "Startup" rather than + // attempting to parse the upstream's error envelope; callers only care that the + // sequencer is not ready. + return Err(SequencerNotReadyDetails::Startup); + } + Err(SequencerNotReadyDetails::Startup) + } +} + +#[async_trait] +impl Sequencer for ForwardingSequencer +where + S: Spec, + Rt: Runtime, + Da: DaService, +{ + // The upstream is expected to be a `PreferredSequencer`, which returns rich confirmations. + // Reusing the native type here means clients see the same JSON shape whether they go through + // the forwarder or talk to the upstream directly. + type Confirmation = Confirmation; + type Spec = S; + type Rt = Rt; + type Da = Da; + + async fn is_ready(&self) -> Result<(), SequencerNotReadyDetails> { + if self.readiness_cache_ttl > Duration::ZERO { + let mut guard = self.readiness_cache.lock().await; + if let Some((ts, result)) = guard.as_ref() { + if ts.elapsed() < self.readiness_cache_ttl { + return result.clone(); + } + } + let result = self.ready_upstream().await; + *guard = Some((Instant::now(), result.clone())); + result + } else { + self.ready_upstream().await + } + } + + fn tx_status_manager(&self) -> &TxStatusManager<::Da> { + &self.txsm + } + + fn api_state(&self) -> ApiState { + self.api_state.clone() + } + + async fn update_state( + &self, + state_update_info: StateUpdateInfo, + ) -> anyhow::Result<()> { + let StateUpdateInfo { + storage, + slot_number, + ledger_reader, + latest_finalized_slot_number, + .. + } = &state_update_info; + + let checkpoint = StateCheckpoint::new(storage.clone(), &Rt::default().kernel(), None); + + trace!(%slot_number, "Forwarding sequencer refreshing local api_state"); + + self.checkpoint_sender + .send(Arc::new( + ConcurrentStateCheckpoint::from_state_checkpoint_with_finalized_slot( + checkpoint + .clone_with_empty_witness_dropping_temp_cache_and_ignoring_pinned_cache(), + *latest_finalized_slot_number, + ), + )) + .ok(); + + self.api_ledger_db.replace_reader(ledger_reader.clone()); + self.api_ledger_db.send_notifications_for_slot(*slot_number); + + Ok(()) + } + + async fn accept_tx( + &self, + baked_tx: FullyBakedTx, + ip_addr: IpAddr, + ) -> Result, ErrorObject> { + let url = self + .upstream_url_for("sequencer/txs/baked") + .map_err(|e| ErrorObject { + status: StatusCode::INTERNAL_SERVER_ERROR, + message: "Failed to build upstream URL".to_string(), + details: json_obj!({ "error": e.to_string() }), + })?; + + let response = self + .http + .post(url) + .header("x-forwarded-for", ip_addr.to_string()) + .json(&baked_tx) + .send() + .await + .map_err(upstream_network_error)?; + + let status = response.status(); + if !status.is_success() { + return Err(upstream_http_error(response).await); + } + + let body: TxInfoWithConfirmation::Spec>, Self::Confirmation> = + response.json().await.map_err(|e| ErrorObject { + status: StatusCode::BAD_GATEWAY, + message: "Failed to decode upstream response".to_string(), + details: json_obj!({ "error": e.to_string() }), + })?; + + let tx_hash = body.id; + self.txsm.notify(tx_hash, TxStatus::Submitted); + + Ok(AcceptedTx { + tx: baked_tx, + tx_hash, + confirmation: body.confirmation, + }) + } + + async fn get_api_tx( + &self, + tx_hash: TxHash, + ) -> anyhow::Result>> { + let url = self.upstream_url_for(&format!("sequencer/txs/{tx_hash}"))?; + let response = self.http.get(url).send().await?; + + if response.status() == ReqwestStatusCode::NOT_FOUND { + return Ok(None); + } + if !response.status().is_success() { + anyhow::bail!( + "Upstream get_tx returned unexpected status {} for tx {}", + response.status(), + tx_hash + ); + } + + Ok(Some(response.json().await?)) + } + + async fn tx_status( + &self, + tx_hash: &TxHash, + ) -> anyhow::Result::Da as DaSpec>::TransactionId>> { + if let Some(status) = self.txsm.get_cached(tx_hash) { + return Ok(status); + } + + let url = self.upstream_url_for(&format!("sequencer/txs/{tx_hash}/status"))?; + let response = match self.http.get(url).send().await { + Ok(response) => response, + Err(e) => { + debug!(%tx_hash, error = %e, "Upstream tx_status query failed; returning Unknown"); + return Ok(TxStatus::Unknown); + } + }; + + if response.status() == ReqwestStatusCode::NOT_FOUND { + return Ok(TxStatus::Unknown); + } + if !response.status().is_success() { + warn!( + %tx_hash, + status = %response.status(), + "Upstream tx_status returned non-success; returning Unknown" + ); + return Ok(TxStatus::Unknown); + } + + // The upstream returns `{ id, status, ...status_fields }` via a flattened `TxStatus` + // tagged by the `status` field; deserialize the full envelope and pull the status out. + let envelope: UpstreamTxInfo<<::Da as DaSpec>::TransactionId> = + response.json().await?; + Ok(envelope.status) + } + + async fn sequencer_role(&self) -> crate::SequencerRole { + // Forwarding nodes do not produce batches. Report as a DA-only replica so downstream + // callers treat this node as a replica. + crate::SequencerRole::DaOnlyReplica + } +} + +#[async_trait] +impl ProofBlobSender for ForwardingSequencer +where + S: Spec, + Rt: Runtime, + Da: DaService, +{ + async fn produce_and_publish_proof_blob( + &self, + _proof_blob: SerializedProofWithDetailsBytes, + ) -> anyhow::Result<()> { + anyhow::bail!( + "ForwardingSequencer cannot publish proof blobs; run a preferred or standard sequencer if this node needs to submit proofs" + ) + } +} + +/// Envelope used to parse responses from the upstream's `GET /sequencer/txs/{hash}/status` +/// endpoint. The upstream flattens `TxStatus` into the response object; `serde(flatten)` on the +/// deserialize side lets us recover the `TxStatus` variant using the `status` tag. +#[derive(serde::Deserialize)] +struct UpstreamTxInfo { + #[allow(dead_code)] + id: TxHash, + #[serde(flatten)] + status: TxStatus, +} + +fn upstream_network_error(e: reqwest::Error) -> ErrorObject { + ErrorObject { + status: StatusCode::BAD_GATEWAY, + message: "Failed to reach upstream sequencer".to_string(), + details: json_obj!({ "error": e.to_string() }), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn url_joining_with_trailing_slash_preserves_path() { + let base = reqwest::Url::parse("http://localhost:1234/").unwrap(); + assert_eq!( + base.join("sequencer/txs/baked").unwrap().as_str(), + "http://localhost:1234/sequencer/txs/baked", + ); + } + + #[test] + fn url_joining_without_trailing_slash_replaces_last_segment() { + // Baseline check: reqwest::Url semantics drop the final segment when there's no slash. + // The forwarding sequencer always constructs its base URL from a user-provided config value, + // so this test just pins the behavior. + let base = reqwest::Url::parse("http://localhost:1234/api").unwrap(); + assert_eq!( + base.join("sequencer/txs/baked").unwrap().as_str(), + "http://localhost:1234/sequencer/txs/baked", + ); + } + + #[tokio::test] + async fn upstream_http_error_preserves_4xx_status() { + let client = reqwest::Client::new(); + let server = tokio::task::spawn(async move { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let handle = tokio::spawn(async move { + let app = axum::Router::new().route( + "/err", + axum::routing::get(|| async { + ( + axum::http::StatusCode::BAD_REQUEST, + axum::Json(serde_json::json!({ "message": "nope" })), + ) + }), + ); + axum::serve(listener, app).await.unwrap(); + }); + (addr, handle) + }) + .await + .unwrap(); + + let response = client + .get(format!("http://{}/err", server.0)) + .send() + .await + .unwrap(); + let err = upstream_http_error(response).await; + assert_eq!(err.status, StatusCode::BAD_REQUEST); + server.1.abort(); + } + + #[tokio::test] + async fn upstream_http_error_maps_unknown_to_bad_gateway() { + // Construct a response with an unusual status to verify the 2xx/redirect fallback path. + let client = reqwest::Client::new(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + let server = tokio::spawn(async move { + let app = axum::Router::new().route( + "/redir", + axum::routing::get(|| async { + ( + axum::http::StatusCode::MOVED_PERMANENTLY, + [("location", "/elsewhere")], + "", + ) + }), + ); + axum::serve(listener, app).await.unwrap(); + }); + + let response = client + .get(format!("http://{addr}/redir")) + // Disable redirect following so we get the raw 301 response. + .send() + .await + .unwrap(); + // reqwest default-follows redirects. If it did not redirect, the final status is 301. + // If it did follow, final status will be 404. In either case, the mapping is sane. + let mapped = upstream_http_error(response).await; + assert!(mapped.status.is_client_error() || mapped.status == StatusCode::BAD_GATEWAY); + server.abort(); + } +} + +async fn upstream_http_error(response: reqwest::Response) -> ErrorObject { + let upstream_status = response.status(); + let body_text = response.text().await.unwrap_or_default(); + + // Preserve the upstream's HTTP status when it's a client-side error (4xx) so callers can + // react to things like 400 (bad tx) or 503 (upstream not ready). Map transport-level failures + // to 502 so clients know the error originated between us and the upstream. + let mapped_status = if upstream_status.is_client_error() || upstream_status.is_server_error() { + StatusCode::from_u16(upstream_status.as_u16()).unwrap_or(StatusCode::BAD_GATEWAY) + } else { + StatusCode::BAD_GATEWAY + }; + + // If the upstream body parses as a known error envelope, forward its message; otherwise + // include the raw body for debuggability. + let parsed: Option = serde_json::from_str(&body_text).ok(); + let details = match &parsed { + Some(value) => json_obj!({ "upstream": value.clone() }), + None => json_obj!({ "upstream_body": body_text.clone() }), + }; + + ErrorObject { + status: mapped_status, + message: format!("Upstream sequencer returned {upstream_status}"), + details, + } +} diff --git a/crates/full-node/sov-sequencer/src/lib.rs b/crates/full-node/sov-sequencer/src/lib.rs index 2ee8486ce7..72525e7225 100644 --- a/crates/full-node/sov-sequencer/src/lib.rs +++ b/crates/full-node/sov-sequencer/src/lib.rs @@ -7,6 +7,7 @@ pub(crate) mod metrics; pub mod rest_api; mod tx_status; +pub mod forwarding; pub mod preferred; pub mod standard; #[cfg(feature = "test-utils")] @@ -21,6 +22,7 @@ pub use common::ForcedTxBatchNotification; pub use common::StateUpdateNotification; pub use common::{react_to_state_updates, AcceptTxErrorCode, AcceptTxErrorDetails, Sequencer}; pub use config::{SeqConfigExtension, SequencerConfig, SequencerKindConfig, SovRateLimiterConfig}; +pub use forwarding::{ForwardingSequencer, ForwardingSequencerConfig}; pub use preferred::SequencerRole; pub use rest_api::SequencerApis; use serde::Serialize; diff --git a/crates/full-node/sov-sequencer/src/preferred/mod.rs b/crates/full-node/sov-sequencer/src/preferred/mod.rs index 026340f878..7cc369143c 100644 --- a/crates/full-node/sov-sequencer/src/preferred/mod.rs +++ b/crates/full-node/sov-sequencer/src/preferred/mod.rs @@ -990,7 +990,7 @@ where } /// Transaction confirmation data of [`PreferredSequencer`]. -#[derive(derivative::Derivative, serde::Serialize)] +#[derive(derivative::Derivative, serde::Serialize, serde::Deserialize)] #[derivative(Clone(bound = ""), Debug(bound = "S: Spec, Rt: Runtime"))] #[serde(bound = "S: Spec, Rt: Runtime")] pub struct Confirmation @@ -1002,7 +1002,7 @@ where receipt: ApiTxEffect>, tx_number: u64, /// The timestamp of the transaction. - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "Option::is_none", default)] timestamp_nanos: Option, } diff --git a/crates/full-node/sov-sequencer/src/rest_api.rs b/crates/full-node/sov-sequencer/src/rest_api.rs index 32cf13148b..6ffc1c0c82 100644 --- a/crates/full-node/sov-sequencer/src/rest_api.rs +++ b/crates/full-node/sov-sequencer/src/rest_api.rs @@ -1,6 +1,6 @@ //! Utilities and definitions for the sequencer's REST APIs. -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::pin::Pin; use std::time::{Duration, Instant}; @@ -18,7 +18,7 @@ use sov_metrics::{track_metrics, HttpMetrics}; use sov_modules_api::capabilities::TransactionAuthenticator; use sov_modules_api::macros::config_value; use sov_modules_api::runtime::Runtime; -use sov_modules_api::{RawTx, RuntimeEventProcessor, RuntimeEventResponse, Spec}; +use sov_modules_api::{FullyBakedTx, RawTx, RuntimeEventProcessor, RuntimeEventResponse, Spec}; use sov_rest_utils::handle_bad_ws_request; use sov_rest_utils::{ errors, preconfigured_router_layers, serve_generic_ws_subscription, @@ -132,18 +132,28 @@ impl CompressionQuery { pub struct SequencerApis { sequencer: Seq, shutdown_receiver: Receiver<()>, + trusted_proxies: Vec, } impl SequencerApis { /// Creates a new Axum router for this sequencer. - pub fn rest_api_server(sequencer: Seq, shutdown_receiver: Receiver<()>) -> axum::Router<()> { + pub fn rest_api_server( + sequencer: Seq, + shutdown_receiver: Receiver<()>, + trusted_proxies: Vec, + ) -> axum::Router<()> { let state = Self { sequencer, shutdown_receiver, + trusted_proxies, }; let router = axum::Router::new() .route("/sequencer/txs", axum::routing::post(Self::axum_accept_tx)) + .route( + "/sequencer/txs/baked", + axum::routing::post(Self::axum_accept_baked_tx), + ) .route("/sequencer/ready", axum::routing::get(Self::axum_get_ready)) .route( "/sequencer/txs/{tx_hash}/status", @@ -227,7 +237,7 @@ impl SequencerApis { headers: axum::http::HeaderMap, ws: ws::WebSocketUpgrade, ) -> Result { - let ip_addr = get_client_ip(headers, Some(&connect_info)) + let ip_addr = get_client_ip(headers, Some(&connect_info), &state.trusted_proxies) .map_err(|e| IntoResponse::into_response(e.to_error_object()))?; // Limit the incoming messsages directly on the web-socket @@ -493,12 +503,11 @@ impl SequencerApis { state: State, tx_hash: Path, ) -> ApiResult> { - let tx = state.sequencer.get_tx(tx_hash.0).await.map_err(|e| { + let tx = state.sequencer.get_api_tx(tx_hash.0).await.map_err(|e| { tracing::error!(error = %e, "Error getting transaction"); errors::database_error_500("Unable to retrieve transaction").into_response() })?; if let Some(tx) = tx { - let tx = ApiAcceptedTx::<_>::from_accepted_tx::(tx); Ok(tx.into()) } else { Err(errors::not_found_404("Transaction", tx_hash.0)) @@ -513,7 +522,7 @@ impl SequencerApis { ) -> ApiResult< TxInfoWithConfirmation::Spec>, Seq::Confirmation>, > { - let ip_addr = get_client_ip(headers, Some(&connect_info)) + let ip_addr = get_client_ip(headers, Some(&connect_info), &state.trusted_proxies) .map_err(|e| IntoResponse::into_response(e.to_error_object()))?; let raw_tx = RawTx::new(tx.0.body.blob); @@ -540,6 +549,39 @@ impl SequencerApis { .into()) } + // Accepts a pre-baked transaction (already wrapped with its authenticator discriminant). + // Used by `ForwardingSequencer` to forward txs whose auth mode is not necessarily standard + // (e.g. EVM-authenticated txs). + async fn axum_accept_baked_tx( + connect_info: ConnectInfo, + headers: axum::http::HeaderMap, + state: State, + tx: Json, + ) -> ApiResult< + TxInfoWithConfirmation::Spec>, Seq::Confirmation>, + > { + let ip_addr = get_client_ip(headers, Some(&connect_info), &state.trusted_proxies) + .map_err(|e| IntoResponse::into_response(e.to_error_object()))?; + + let tx_with_hash = state + .sequencer + .accept_tx(tx.0, ip_addr) + .await + .map_err(|e| { + if e.status.is_server_error() { + tracing::error!(error = ?e, "Error accepting baked transaction"); + } + IntoResponse::into_response(e) + })?; + + Ok(TxInfoWithConfirmation { + id: tx_with_hash.tx_hash, + confirmation: tx_with_hash.confirmation, + status: TxStatus::Submitted, + } + .into()) + } + async fn subscribe_to_events( State(state): State, filter: FilterQuery, @@ -754,7 +796,7 @@ pub struct TxInfoWithConfirmation { } /// An accepted transaction, with the transaction body and confirmation data. -#[derive(Clone, serde::Serialize)] +#[derive(Clone, serde::Serialize, serde::Deserialize)] pub struct ApiAcceptedTx { /// The hex encoded transaction hash pub id: TxHash, diff --git a/crates/full-node/sov-sequencer/tests/integration/preferred_tx_nonce_queue.rs b/crates/full-node/sov-sequencer/tests/integration/preferred_tx_nonce_queue.rs index 13814a2c22..c1c49a88be 100644 --- a/crates/full-node/sov-sequencer/tests/integration/preferred_tx_nonce_queue.rs +++ b/crates/full-node/sov-sequencer/tests/integration/preferred_tx_nonce_queue.rs @@ -83,7 +83,9 @@ async fn create_test_rollup( let mut preferred_config = match &c.sequencer_config { SequencerKindConfig::Preferred(p) => p.clone(), - SequencerKindConfig::Standard(_) => PreferredSequencerConfig::default(), + SequencerKindConfig::Standard(_) | SequencerKindConfig::Forwarding(_) => { + PreferredSequencerConfig::default() + } }; preferred_config.batch_execution_time_limit_millis = MAX_BATCH_EXECUTION_TIME_MILLIS; preferred_config.maximum_future_nonce_delta = maximum_future_nonce_delta; diff --git a/crates/full-node/sov-sequencer/tests/integration/rate_limits.rs b/crates/full-node/sov-sequencer/tests/integration/rate_limits.rs index 20f85b0d6e..6bdd4f469c 100644 --- a/crates/full-node/sov-sequencer/tests/integration/rate_limits.rs +++ b/crates/full-node/sov-sequencer/tests/integration/rate_limits.rs @@ -92,6 +92,7 @@ async fn create_test_rollup( .set_config(|c| { c.storage = StoragePath::Tmp(dir); c.max_concurrent_blobs = 64; + c.trusted_proxies = vec![std::net::Ipv4Addr::LOCALHOST.into()]; if let SequencerKindConfig::Preferred(ref mut config) = &mut c.sequencer_config { config.num_cache_warmup_workers = 0; config.batch_execution_time_limit_millis = 3000; diff --git a/crates/full-node/sov-stf-runner/src/http/mod.rs b/crates/full-node/sov-stf-runner/src/http/mod.rs index 089194b4d5..b678bcf1e8 100644 --- a/crates/full-node/sov-stf-runner/src/http/mod.rs +++ b/crates/full-node/sov-stf-runner/src/http/mod.rs @@ -14,7 +14,7 @@ use jsonrpsee::server::{ use jsonrpsee::types::{ErrorCode, ErrorObject}; use jsonrpsee::RpcModule; use sov_metrics::{track_metrics, HttpMetrics}; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use tokio::net::TcpListener; use tokio::sync::watch; @@ -30,19 +30,25 @@ use sov_rest_utils::GetIPResult; // Middleware to inject SocketAddr from axum's ConnectInfo into the request extensions // so that jsonrpsee RPC handlers can access it via the Extensions parameter #[derive(Clone)] -struct InjectSocketAddrLayer; +struct InjectSocketAddrLayer { + trusted_proxies: Vec, +} impl Layer for InjectSocketAddrLayer { type Service = InjectSocketAddrService; fn layer(&self, inner: S) -> Self::Service { - InjectSocketAddrService { inner } + InjectSocketAddrService { + inner, + trusted_proxies: self.trusted_proxies.clone(), + } } } #[derive(Clone)] struct InjectSocketAddrService { inner: S, + trusted_proxies: Vec, } impl tower::Service> for InjectSocketAddrService @@ -66,7 +72,7 @@ where let headers = req.headers(); let connect_info = req.extensions().get::>(); - let maybe_ip = get_client_ip(headers.clone(), connect_info); + let maybe_ip = get_client_ip(headers.clone(), connect_info, &self.trusted_proxies); req.extensions_mut().insert(GetIPResult { maybe_ip: Arc::new(maybe_ip), }); @@ -81,9 +87,11 @@ pub(crate) async fn start_http_server( methods: RpcModule<()>, mut shutdown_receiver: watch::Receiver<()>, cors_configuration: CorsConfiguration, + trusted_proxies: Vec, ) -> anyhow::Result>> { let rest_address = axum_listener.local_addr()?; - let (rpc_router, server_handle) = rpc_module_to_router(methods, cors_configuration); + let (rpc_router, server_handle) = + rpc_module_to_router(methods, cors_configuration, trusted_proxies); let handle = tokio::spawn(async move { tracing::info!(%rest_address, "Starting HTTP server"); @@ -126,6 +134,7 @@ pub(crate) async fn start_http_server( pub fn rpc_module_to_router( methods: RpcModule<()>, cors_config: CorsConfiguration, + trusted_proxies: Vec, ) -> (axum::Router, ServerHandle) { let (stop_handle, server_handle) = stop_channel(); let cors_layer = match cors_config { @@ -148,7 +157,7 @@ pub fn rpc_module_to_router( let ws_service = error_layer.layer(ws_service); // Wrap services with the SocketAddr injection layer - let inject_addr_layer = InjectSocketAddrLayer; + let inject_addr_layer = InjectSocketAddrLayer { trusted_proxies }; let http_service = inject_addr_layer.layer(http_service); let ws_service = inject_addr_layer.layer(ws_service); @@ -305,6 +314,7 @@ mod tests { methods, shutdown_receiver, CorsConfiguration::Restrictive, + vec![], ) .await .unwrap(); diff --git a/crates/full-node/sov-stf-runner/src/runner.rs b/crates/full-node/sov-stf-runner/src/runner.rs index 31a60da6de..c703cd4b53 100644 --- a/crates/full-node/sov-stf-runner/src/runner.rs +++ b/crates/full-node/sov-stf-runner/src/runner.rs @@ -1,4 +1,4 @@ -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::Duration; @@ -260,6 +260,7 @@ where router: axum::Router<()>, methods: RpcModule<()>, cors_configuration: CorsConfiguration, + trusted_proxies: Vec, ) -> anyhow::Result<()> { let http_task_handle = crate::http::start_http_server( self.axum_tcp @@ -269,6 +270,7 @@ where methods, self.secondary_shutdown_sender.subscribe(), cors_configuration, + trusted_proxies, ) .await?; diff --git a/crates/module-system/module-schemas/rollup-config.json b/crates/module-system/module-schemas/rollup-config.json index 0de09c3a36..d43131635c 100644 --- a/crates/module-system/module-schemas/rollup-config.json +++ b/crates/module-system/module-schemas/rollup-config.json @@ -303,6 +303,39 @@ } ] }, + "ForwardingSequencerConfig": { + "description": "Configuration for a `ForwardingSequencer`, a read-only sequencer that forwards tx submissions to a remote preferred sequencer while serving reads from the local node.", + "type": "object", + "required": [ + "upstream_da_address", + "upstream_url" + ], + "properties": { + "readiness_cache_ms": { + "description": "TTL in milliseconds for the cached readiness result from the upstream sequencer.", + "default": 200, + "type": "integer", + "format": "uint64", + "minimum": 0.0 + }, + "request_timeout_ms": { + "description": "HTTP request timeout in milliseconds for calls to the upstream sequencer.", + "default": 10000, + "type": "integer", + "format": "uint64", + "minimum": 0.0 + }, + "upstream_da_address": { + "description": "DA address of the upstream preferred sequencer. This is used by local read/simulation APIs so they match the upstream's preferred sequencer behavior.", + "type": "string" + }, + "upstream_url": { + "description": "Base URL of the upstream preferred sequencer (e.g. `http://leader:12346`). Transaction submissions are forwarded to this endpoint.", + "type": "string" + } + }, + "additionalProperties": false + }, "HexHash": { "description": "32 bytes in hexadecimal format, with `0x` prefix.", "type": "string", @@ -341,6 +374,14 @@ "string", "null" ] + }, + "trusted_proxies": { + "description": "Reverse proxies whose `X-Forwarded-For` headers should be trusted. Requests coming from other peers will ignore `X-Forwarded-For` and use the direct socket address instead.", + "default": [], + "type": "array", + "items": { + "type": "string" + } } } }, @@ -1127,6 +1168,19 @@ } }, "additionalProperties": false + }, + { + "description": "A \"Forwarding\" sequencer that serves reads locally and forwards tx submissions to a remote preferred sequencer.", + "type": "object", + "required": [ + "forwarding" + ], + "properties": { + "forwarding": { + "$ref": "#/definitions/ForwardingSequencerConfig" + } + }, + "additionalProperties": false } ], "required": [ diff --git a/crates/module-system/sov-modules-api/src/sequencing_metadata.rs b/crates/module-system/sov-modules-api/src/sequencing_metadata.rs index 9b33dcf855..00321f7f02 100644 --- a/crates/module-system/sov-modules-api/src/sequencing_metadata.rs +++ b/crates/module-system/sov-modules-api/src/sequencing_metadata.rs @@ -19,6 +19,7 @@ use crate::capabilities::SequencingDataTrait; BorshSerialize, BorshDeserialize, serde::Serialize, + serde::Deserialize, )] #[serde(transparent)] pub struct HDTimestamp(u128); diff --git a/crates/module-system/sov-modules-rollup-blueprint/src/native_only/endpoints.rs b/crates/module-system/sov-modules-rollup-blueprint/src/native_only/endpoints.rs index 85b9d9a9b9..09ff2d1ee1 100644 --- a/crates/module-system/sov-modules-rollup-blueprint/src/native_only/endpoints.rs +++ b/crates/module-system/sov-modules-rollup-blueprint/src/native_only/endpoints.rs @@ -400,6 +400,7 @@ mod tests { bind_host: bind_host.to_string(), bind_port, public_address: public_address.map(|s| s.to_string()), + trusted_proxies: vec![], cors: sov_stf_runner::CorsConfiguration::Permissive, }, save_tx_bodies: false, diff --git a/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs b/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs index 7c2bae1565..057b4942b9 100644 --- a/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs +++ b/crates/module-system/sov-modules-rollup-blueprint/src/native_only/mod.rs @@ -27,6 +27,7 @@ use sov_rollup_interface::node::da::{DaService, SlotData}; use sov_rollup_interface::node::SyncStatus; use sov_rollup_interface::storage::HierarchicalStorageManager; use sov_rollup_interface::ProvableHeightTracker; +use sov_sequencer::forwarding::ForwardingSequencer; use sov_sequencer::preferred::PreferredSequencer; use sov_sequencer::standard::StdSequencer; use sov_sequencer::{ProofBlobSender, Sequencer, SequencerApis, SequencerKindConfig}; @@ -41,7 +42,7 @@ use sov_stf_runner::{ StateTransitionRunner, }; use sov_stf_runner::{make_da_sync_state, DaServiceWithCachedFinalizedHeaders}; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use tokio::net::TcpListener; use tokio::signal::unix::SignalKind; @@ -241,9 +242,12 @@ pub trait FullNodeBlueprint: RollupBlueprint { da_address, ) .await?; - endpoints.axum_router = endpoints.axum_router.merge( - SequencerApis::rest_api_server(sequencer.clone(), shutdown_receiver), - ); + endpoints.axum_router = + endpoints.axum_router.merge(SequencerApis::rest_api_server( + sequencer.clone(), + shutdown_receiver, + rollup_config.runner.http_config.trusted_proxies.clone(), + )); Ok(SequencerCreationReceipt { api_state: sequencer.api_state(), @@ -283,9 +287,55 @@ pub trait FullNodeBlueprint: RollupBlueprint { da_address, ) .await?; - endpoints.axum_router = endpoints.axum_router.merge( - SequencerApis::rest_api_server(sequencer.clone(), shutdown_receiver), - ); + endpoints.axum_router = + endpoints.axum_router.merge(SequencerApis::rest_api_server( + sequencer.clone(), + shutdown_receiver, + rollup_config.runner.http_config.trusted_proxies.clone(), + )); + + Ok(SequencerCreationReceipt { + api_state: sequencer.api_state(), + endpoints, + background_handles, + proof_sender: Arc::new(sequencer), + api_ledger_db: api_ledger_db.clone(), + da_address, + }) + } + SequencerKindConfig::Forwarding(seq_config) => { + let (sequencer, background_handles) = + ForwardingSequencer::::create( + state_update_receiver.clone(), + da_sync_state, + &rollup_config.sequencer.with_seq_config(seq_config.clone()), + ledger_db.clone(), + api_ledger_db.clone(), + shutdown_sender, + ) + .await?; + + let da_address = seq_config.upstream_da_address.parse().map_err(|err| { + anyhow::anyhow!( + "Failed to parse forwarding sequencer upstream_da_address {:?}: {:?}", + seq_config.upstream_da_address, + err + ) + })?; + let mut endpoints = self + .sequencer_additional_apis( + sequencer.clone(), + rollup_config, + shutdown_receiver.clone(), + da_address, + ) + .await?; + endpoints.axum_router = + endpoints.axum_router.merge(SequencerApis::rest_api_server( + sequencer.clone(), + shutdown_receiver, + rollup_config.runner.http_config.trusted_proxies.clone(), + )); Ok(SequencerCreationReceipt { api_state: sequencer.api_state(), @@ -588,6 +638,7 @@ pub trait FullNodeBlueprint: RollupBlueprint { let endpoints = NodeEndpointsContainer { inner: endpoints, cors_configuration: rollup_config.runner.http_config.cors, + trusted_proxies: rollup_config.runner.http_config.trusted_proxies.clone(), }; background_handles.extend(sequencer.background_handles); @@ -639,6 +690,7 @@ fn validate_heights( pub struct NodeEndpointsContainer { inner: NodeEndpoints, cors_configuration: CorsConfiguration, + trusted_proxies: Vec, } /// Dependencies needed to run the rollup. @@ -673,6 +725,7 @@ impl, M: ExecutionMode> Rollup { self.endpoints.inner.axum_router, self.endpoints.inner.jsonrpsee_module, self.endpoints.cors_configuration, + self.endpoints.trusted_proxies, ) .await .context("Failed to start Axum Server")?; diff --git a/crates/rollup-interface/src/state_machine/stf/batch.rs b/crates/rollup-interface/src/state_machine/stf/batch.rs index 2859f2778b..a3d9d7099d 100644 --- a/crates/rollup-interface/src/state_machine/stf/batch.rs +++ b/crates/rollup-interface/src/state_machine/stf/batch.rs @@ -26,6 +26,7 @@ pub struct FullyBakedTx { pub data: Bytes, /// Sequencer-provided metadata for each transaction (e.g., timestamps). /// This data is NOT signed by users but is added by the sequencer. + /// Security invariant: human-readable/public inputs must never be allowed to set this field. #[serde_as(as = "Option")] pub sequencing_data: Option, } @@ -84,6 +85,8 @@ impl<'de> Deserialize<'de> for FullyBakedTx { where D: serde::Deserializer<'de>, { + let is_human_readable = deserializer.is_human_readable(); + #[derive(Deserialize)] struct FullyBakedTxHelper { #[serde(with = "serde_with::As::")] @@ -94,6 +97,12 @@ impl<'de> Deserialize<'de> for FullyBakedTx { let helper = FullyBakedTxHelper::deserialize(deserializer)?; + if is_human_readable && helper.sequencing_data.is_some() { + return Err(serde::de::Error::custom( + "sequencing_data is internal-only and must not be accepted from human-readable inputs", + )); + } + let total_size = helper.data.len() + helper.sequencing_data.as_ref().map_or(0, |d| d.len()); if total_size > MAX_FULLY_BAKED_TX_SIZE { @@ -415,6 +424,38 @@ mod tests { assert!(deserialized.sequencing_data.is_some()); } + #[test] + fn test_fullybaked_serde_json_rejects_sequencing_data() { + let data = vec![1u8; 1024]; + let mut tx = FullyBakedTx::new(data); + tx.set_sequencing_metadata(&vec![2u8; 16]); + + let serialized = serde_json::to_string(&tx).unwrap(); + + let result: Result = serde_json::from_str(&serialized); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err.to_string().contains("sequencing_data is internal-only")); + } + + #[test] + fn test_fullybaked_bincode_allows_internal_sequencing_data() { + let data = vec![1u8; 1024]; + let mut tx = FullyBakedTx::new(data.clone()); + let seq_data = vec![2u8; 16]; + let expected_sequencing_data = borsh::to_vec(&seq_data).unwrap(); + tx.set_sequencing_metadata(&seq_data); + + let serialized = bincode::serialize(&tx).unwrap(); + let deserialized: FullyBakedTx = bincode::deserialize(&serialized).unwrap(); + + assert_eq!(deserialized.data.as_ref(), &data); + assert_eq!( + deserialized.sequencing_data.as_deref(), + Some(expected_sequencing_data.as_slice()) + ); + } + #[test] fn test_fullybaked_total_size_exceeds_limit() { // Create a transaction where data + sequencing_data exceeds limit diff --git a/crates/utils/sov-rest-utils/src/get_ip.rs b/crates/utils/sov-rest-utils/src/get_ip.rs index 3c0515608c..5f0a3ad004 100644 --- a/crates/utils/sov-rest-utils/src/get_ip.rs +++ b/crates/utils/sov-rest-utils/src/get_ip.rs @@ -36,18 +36,20 @@ pub struct GetIPResult { pub maybe_ip: Arc>, } -/// Get the original sender's IP address. +/// Get the sender's IP address, trusting `X-Forwarded-For` only from configured proxies. pub fn get_client_ip( headers: HeaderMap, connect_info: Option<&ConnectInfo>, + trusted_proxies: &[IpAddr], ) -> Result { - if headers.contains_key(&X_FORWARDED_FOR) { + let sock_addr = connect_info.ok_or(ClientIpError::MissingConnectInfo)?; + + if trusted_proxies.contains(&sock_addr.ip()) && headers.contains_key(&X_FORWARDED_FOR) { return rightmost_x_forwarded_for(&headers) .map_err(ClientIpError::InvalidXForwardedForEncoding); } // Fallback to the socket address from ConnectInfo - let sock_addr = connect_info.ok_or(ClientIpError::MissingConnectInfo)?; Ok(sock_addr.ip()) } @@ -70,7 +72,8 @@ mod tests { "x-forwarded-for", HeaderValue::from_static(frowarded_for_ip), ); - let ip = get_client_ip(headers, Some(&connect_info)).unwrap(); + let ip = + get_client_ip(headers, Some(&connect_info), &[connect_info_ip.into()]).unwrap(); assert_eq!(ip.to_string(), ip.to_string()); } @@ -81,29 +84,45 @@ mod tests { "X-Forwarded-For", HeaderValue::from_static(frowarded_for_ip), ); - let ip = get_client_ip(headers, Some(&connect_info)).unwrap(); + let ip = + get_client_ip(headers, Some(&connect_info), &[connect_info_ip.into()]).unwrap(); assert_eq!(frowarded_for_ip.to_string(), ip.to_string()); } // If x-forwarded-for is not set then get the ip from ConnectInfo { let headers = HeaderMap::new(); - let ip = get_client_ip(headers, Some(&connect_info)).unwrap(); + let ip = get_client_ip(headers, Some(&connect_info), &[]).unwrap(); assert_eq!(connect_info_ip.to_string(), ip.to_string()); } - // Many ips in x-forwarded-for + // Untrusted peers cannot spoof their source IP via x-forwarded-for. + { + let mut headers = HeaderMap::new(); + headers.insert( + "x-forwarded-for", + HeaderValue::from_static(frowarded_for_ip), + ); + let ip = get_client_ip(headers, Some(&connect_info), &[]).unwrap(); + assert_eq!(connect_info_ip.to_string(), ip.to_string()); + } + + // Many ips in x-forwarded-for from a trusted proxy { let many_ips = "223.223.223.223,323.323.323.32,123.123.123.123"; let mut headers = HeaderMap::new(); headers.insert("x-forwarded-for", HeaderValue::from_static(many_ips)); - let ip = get_client_ip(headers, None).unwrap(); + let ip = + get_client_ip(headers, Some(&connect_info), &[connect_info_ip.into()]).unwrap(); assert_eq!(ip.to_string(), "123.123.123.123".to_string()); } } #[test] fn test_get_invalid_ip() { + let connect_info_ip = Ipv4Addr::new(1, 2, 3, 4); + let connect_info = ConnectInfo(SocketAddr::V4(SocketAddrV4::new(connect_info_ip, 0))); + // Test invalid ip format { let frowarded_for_ip = "123.123.123.1234"; @@ -112,7 +131,8 @@ mod tests { "x-forwarded-for", HeaderValue::from_static(frowarded_for_ip), ); - let err = get_client_ip(headers, None).unwrap_err(); + let err = + get_client_ip(headers, Some(&connect_info), &[connect_info_ip.into()]).unwrap_err(); assert_eq!( err, ClientIpError::InvalidXForwardedForEncoding(Error::MalformedHeaderValue { @@ -125,7 +145,7 @@ mod tests { // Test missing ip { let headers = HeaderMap::new(); - let err = get_client_ip(headers, None).unwrap_err(); + let err = get_client_ip(headers, None, &[]).unwrap_err(); assert_eq!(err, ClientIpError::MissingConnectInfo); } @@ -134,7 +154,8 @@ mod tests { let empty_ip = ""; let mut headers = HeaderMap::new(); headers.insert("x-forwarded-for", HeaderValue::from_static(empty_ip)); - let err = get_client_ip(headers, None).unwrap_err(); + let err = + get_client_ip(headers, Some(&connect_info), &[connect_info_ip.into()]).unwrap_err(); assert_eq!( err, ClientIpError::InvalidXForwardedForEncoding(Error::MalformedHeaderValue { diff --git a/crates/utils/sov-test-utils/src/sequencer.rs b/crates/utils/sov-test-utils/src/sequencer.rs index 7ab6047a54..c5cddf5b6f 100644 --- a/crates/utils/sov-test-utils/src/sequencer.rs +++ b/crates/utils/sov-test-utils/src/sequencer.rs @@ -171,7 +171,8 @@ impl> TestSequencerSetup { .await?; let (axum_addr, sequencer_axum_server) = { - let router = SequencerApis::rest_api_server(sequencer.clone(), shutdown_receiver); + let router = + SequencerApis::rest_api_server(sequencer.clone(), shutdown_receiver, vec![]); let handle = axum_server::Handle::new(); let handle1 = handle.clone(); diff --git a/crates/utils/sov-test-utils/src/test_rollup.rs b/crates/utils/sov-test-utils/src/test_rollup.rs index 353d7e6402..e288b94064 100644 --- a/crates/utils/sov-test-utils/src/test_rollup.rs +++ b/crates/utils/sov-test-utils/src/test_rollup.rs @@ -1,6 +1,6 @@ #![allow(dead_code, missing_docs)] use crate::postgres::connection_string_from_postgres_container; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::num::NonZero; use std::path::Path; use std::path::PathBuf; @@ -109,6 +109,7 @@ pub struct RollupBuilderConfig { pub telegraf_address: sov_stf_runner::TelegrafSocketConfig, pub rollup_prover_config: RollupProverConfig, pub storage: StoragePath, + pub trusted_proxies: Vec, pub axum_host: String, pub axum_port: u16, pub max_batch_size_bytes: usize, @@ -347,15 +348,15 @@ impl + Default + 'static> RollupBuilder { pub fn rollup_config(&self) -> RollupConfig<::Address, R::DaService> { let rollup_db_config = RollupDbConfig::default_in_path(self.config.storage.path().to_path_buf()); + let mut http_config = + HttpServerConfig::on_host_port(&self.config.axum_host, self.config.axum_port); + http_config.trusted_proxies = self.config.trusted_proxies.clone(); RollupConfig { storage: rollup_db_config, runner: RunnerConfig { da_polling_interval_ms: TEST_MOCK_DA_POLLING_INTERVAL.as_millis() as u64, - http_config: HttpServerConfig::on_host_port( - &self.config.axum_host, - self.config.axum_port, - ), + http_config, concurrent_sync_tasks: 1, pre_fetched_blocks_capacity: NonZero::new(3).unwrap(), save_tx_bodies: false, @@ -417,6 +418,7 @@ impl + Default + 'static> RollupBuilder { rollup_prover_config: RollupProverConfig::Disabled, storage: storage_path, telegraf_address: MonitoringConfig::standard().telegraf_address, + trusted_proxies: Vec::new(), axum_host: "127.0.0.1".to_string(), axum_port: 0, blob_processing_timeout_secs: 60, @@ -650,7 +652,8 @@ where ) .await?; - let router = SequencerApis::rest_api_server(sequencer.clone(), shutdown_receiver.clone()); + let router = + SequencerApis::rest_api_server(sequencer.clone(), shutdown_receiver.clone(), vec![]); let addr = SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, 0)); let listener = tokio::net::TcpListener::bind(addr).await?; @@ -1167,7 +1170,7 @@ where self.wait_for_node_synced().await.unwrap(); // Extra let ideal_lag = match &self.rollup_config.sequencer.sequencer_kind_config { - SequencerKindConfig::Standard(_) => 5, + SequencerKindConfig::Standard(_) | SequencerKindConfig::Forwarding(_) => 5, SequencerKindConfig::Preferred(c) => c.ideal_lag_behind_finalized_slot, } .saturating_add(2); diff --git a/examples/demo-rollup/configs/mock_rollup_config.toml b/examples/demo-rollup/configs/mock_rollup_config.toml index 3aaba626dd..50f620349e 100644 --- a/examples/demo-rollup/configs/mock_rollup_config.toml +++ b/examples/demo-rollup/configs/mock_rollup_config.toml @@ -60,6 +60,7 @@ bind_host = "127.0.0.1" bind_port = 12346 # The fully qualified public name of the server, in case the rollup is running behind a proxy # public_address = "http://rollup.sovereign.xyz" +# trusted_proxies = ["127.0.0.1"] # Trust X-Forwarded-For only from these proxy IPs # cors = "permissive" # CORS configuration: "permissive" or "restrictive" [monitoring] @@ -136,6 +137,15 @@ ideal_lag_behind_finalized_slot = 3 # Ideal buffer of finalized slots to maintai # mempool_max_txs_count = 10000 # Maximum transactions in mempool # max_batch_size_bytes = 1048576 # Maximum batch size +# Alternative configuration for a forwarding sequencer. Serves reads locally and forwards tx +# submissions to a canonical upstream preferred sequencer. Mutually exclusive with +# [sequencer.preferred] and [sequencer.standard]. +# [sequencer.forwarding] +# upstream_url = "http://localhost:12346" +# upstream_da_address = "0x0000000000000000000000000000000000000000000000000000000000000000" +# request_timeout_ms = 10000 +# readiness_cache_ms = 200 + [sequencer.extension] max_log_limit = 20000 # response_size_limit = 1017856 # Max response size for eth_getLogs in bytes (default: 1 MiB - 30 KiB for headers and overhead) diff --git a/examples/demo-rollup/src/lib.rs b/examples/demo-rollup/src/lib.rs index 3e97d1d460..e8622e04b2 100644 --- a/examples/demo-rollup/src/lib.rs +++ b/examples/demo-rollup/src/lib.rs @@ -35,7 +35,7 @@ pub const ROLLUP_PROOF_NAMESPACE: Namespace = Namespace::const_v0(config_value!( fn sequencer_type( config: &sov_full_node_configs::sequencer::SequencerConfig, ) -> sov_modules_api::SequencerType { - if config.is_preferred_sequencer() { + if config.is_preferred_sequencer() || config.is_forwarding_sequencer() { sov_modules_api::SequencerType::Preferred } else { sov_modules_api::SequencerType::NonPreferred diff --git a/examples/demo-rollup/tests/replica/mod.rs b/examples/demo-rollup/tests/replica/mod.rs index 67220e1761..238dfbd617 100644 --- a/examples/demo-rollup/tests/replica/mod.rs +++ b/examples/demo-rollup/tests/replica/mod.rs @@ -98,7 +98,7 @@ async fn start_rollup_with_connection_string( .set_config(move |c| { c.blob_processing_timeout_secs = 300; match &mut c.sequencer_config { - SequencerKindConfig::Standard(_) => { + SequencerKindConfig::Standard(_) | SequencerKindConfig::Forwarding(_) => { panic!("Expected preferred sequencer config"); } SequencerKindConfig::Preferred(p) => {