diff --git a/.changes/changed/3255.md b/.changes/changed/3255.md new file mode 100644 index 00000000000..b2d62ccf0ce --- /dev/null +++ b/.changes/changed/3255.md @@ -0,0 +1 @@ +Adds client & server support for the existing protobuf HTTP-based remote RPC block accessor. diff --git a/bin/fuel-core/src/cli/run/rpc.rs b/bin/fuel-core/src/cli/run/rpc.rs index 9c7fb80b04b..e9243a6b83e 100644 --- a/bin/fuel-core/src/cli/run/rpc.rs +++ b/bin/fuel-core/src/cli/run/rpc.rs @@ -3,7 +3,20 @@ use clap::{ ValueEnum, }; use fuel_core_types::fuel_types::BlockHeight; -use std::net; +use std::{ + collections::HashMap, + net, +}; + +fn parse_public_http_headers(entries: &[String]) -> HashMap { + entries + .iter() + .filter_map(|e| { + e.split_once('=') + .map(|(k, v)| (k.to_string(), v.to_string())) + }) + .collect() +} #[derive(Debug, Clone, Args)] pub struct RpcArgs { @@ -29,6 +42,15 @@ pub struct RpcArgs { #[clap(long = "rpc-s3-requester-pays", env, default_value = "false")] pub rpc_s3_requester_pays: bool, + /// Public HTTP(S) base URL for block objects (e.g. CDN in front of the S3 bucket). When set, + /// gRPC returns `RemoteHttpEndpoint` URLs (`{base}/{s3-key}`) instead of `RemoteS3Bucket`; uploads still use S3. + #[clap(long = "rpc-public-block-http-url", env)] + pub rpc_public_block_http_url: Option, + + /// Optional extra HTTP headers for clients fetching blocks from `--rpc-public-block-http-url` (format `NAME=value`, repeatable). + #[clap(long = "rpc-public-block-http-header", env, value_name = "NAME=value")] + pub rpc_public_block_http_headers: Vec, + #[clap(long = "rpc-api_buffer_size", default_value = "1000", env)] pub rpc_api_buffer_size: usize, } @@ -43,6 +65,8 @@ pub enum StorageMethod { impl RpcArgs { pub fn into_config(self) -> fuel_core_block_aggregator_api::service::Config { + let public_headers = + parse_public_http_headers(&self.rpc_public_block_http_headers); let storage_method = match self.rpc_storage_method { StorageMethod::Local => { fuel_core_block_aggregator_api::service::StorageMethod::Local @@ -54,6 +78,8 @@ impl RpcArgs { .expect("storage_method=s3 requires --bucket"), endpoint_url: self.rpc_endpoint_url, requester_pays: self.rpc_s3_requester_pays, + public_block_http_url: self.rpc_public_block_http_url, + public_block_http_headers: public_headers.clone(), } } StorageMethod::S3NoPublish => { @@ -63,6 +89,8 @@ impl RpcArgs { .expect("storage_method=s3-no-publish requires --bucket"), endpoint_url: self.rpc_endpoint_url, requester_pays: self.rpc_s3_requester_pays, + public_block_http_url: self.rpc_public_block_http_url, + public_block_http_headers: public_headers, } } }; diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index c0bd3fec16d..08f70aac914 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -180,7 +180,10 @@ mod rpc_deps { default_provider::credentials::DefaultCredentialsChain, }; pub use aws_sdk_s3::Client as AWSClient; - pub use flate2::read::GzDecoder; + pub use flate2::read::{ + GzDecoder, + ZlibDecoder, + }; pub use fuel_core_block_aggregator_api::{ blocks::old_block_source::convertor_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf, protobuf_types::{ @@ -190,6 +193,7 @@ mod rpc_deps { BlockResponse, NewBlockSubscriptionRequest as ProtoNewBlockSubscriptionRequest, RemoteBlockResponse, + RemoteHttpEndpoint, RemoteS3Bucket, block_aggregator_client::BlockAggregatorClient as ProtoBlockAggregatorClient, block_response::Payload, @@ -199,7 +203,10 @@ mod rpc_deps { pub use prost::Message; pub use std::{ collections::HashMap, - io::Read, + io::{ + Cursor, + Read, + }, }; pub use tokio::sync::RwLock; pub use tonic::transport::Channel; @@ -207,6 +214,20 @@ mod rpc_deps { #[cfg(feature = "rpc")] use rpc_deps::*; +/// `reqwest::Client` used only for fetching block bytes over HTTP(S). Automatic response +/// decompression is disabled so `Content-Encoding` matches the raw body and manual decoding in +/// `FuelClient` stays consistent. +#[cfg(feature = "rpc")] +fn remote_block_object_http_client() -> reqwest::Client { + reqwest::Client::builder() + .no_gzip() + .no_brotli() + .no_deflate() + .no_zstd() + .build() + .expect("reqwest ClientBuilder for remote block fetches should succeed") +} + pub mod pagination; pub mod schema; pub mod types; @@ -287,6 +308,9 @@ pub struct FuelClient { rpc_client: Option>, #[cfg(feature = "rpc")] aws_client: AWSClientManager, + /// Shared HTTP client for remote block fetches (connection pool reuse). + #[cfg(feature = "rpc")] + http_client: reqwest::Client, } #[cfg(feature = "rpc")] @@ -355,6 +379,8 @@ impl FromStr for FuelClient { rpc_client: None, #[cfg(feature = "rpc")] aws_client: AWSClientManager::new(), + #[cfg(feature = "rpc")] + http_client: remote_block_object_http_client(), }) } } @@ -424,6 +450,8 @@ impl FuelClient { rpc_client: None, #[cfg(feature = "rpc")] aws_client: AWSClientManager::new(), + #[cfg(feature = "rpc")] + http_client: remote_block_object_http_client(), }) } @@ -1775,11 +1803,13 @@ impl FuelClient { .into_inner() .then(|res| { let maybe_aws_client = self.aws_client.clone(); + let http_client = self.http_client.clone(); async move { let maybe_aws_client = maybe_aws_client.clone(); let resp = res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?; - Self::convert_block_response(resp, maybe_aws_client).await + Self::convert_block_response(resp, maybe_aws_client, http_client) + .await } }); Ok(stream) @@ -1788,6 +1818,7 @@ impl FuelClient { async fn convert_block_response( resp: BlockResponse, s3_client: AWSClientManager, + http_client: reqwest::Client, ) -> io::Result<(fuel_core_types::blockchain::block::Block, Vec>)> { let payload = resp .payload @@ -1816,7 +1847,7 @@ impl FuelClient { endpoint, requester_pays, } = s3; - let zipped_bytes = Self::get_block_from_s3_bucket( + let (body, content_encoding) = Self::get_block_from_s3_bucket( s3_client, &endpoint, &bucket, @@ -1825,31 +1856,138 @@ impl FuelClient { ) .await?; - let block_bytes = Self::unzip_bytes(&zipped_bytes)?; - let block = - ProtoBlock::decode(block_bytes.as_slice()).map_err(|e| { - io::Error::other(format!("Failed to decode block: {e}")) - })?; - let (block, receipts) = - fuel_block_from_protobuf(block).map_err(|e| { - io::Error::other(format!( - "Failed to convert RPC block to internal block: {e:?}" - )) - })?; - Ok((block, receipts)) + let block_bytes = Self::decode_body_by_content_encoding( + body.as_ref(), + content_encoding.as_deref(), + )?; + Self::decode_remote_protobuf_block(&block_bytes) + } + Some(Location::Http(http)) => { + let (body, content_encoding) = + Self::get_block_from_http(&http_client, &http).await?; + let block_bytes = Self::decode_body_by_content_encoding( + body.as_ref(), + content_encoding.as_deref(), + )?; + Self::decode_remote_protobuf_block(&block_bytes) + } + None => { + Err(io::Error::other("Remote block response missing location")) } - _ => Err(io::Error::other("Remote blocks are not supported yet")), } } } } + + /// Decodes the object body using a `Content-Encoding` value from HTTP headers or S3 object + /// metadata. Missing or empty means identity (raw protobuf bytes). Codings are listed in + /// application order; this reverses to decode from the outermost coding inward (RFC 9110). + fn decode_body_by_content_encoding( + body: &[u8], + content_encoding: Option<&str>, + ) -> io::Result> { + let Some(header) = content_encoding.map(str::trim).filter(|s| !s.is_empty()) + else { + return Ok(body.to_vec()); + }; + let tokens: Vec<&str> = header + .split(',') + .map(str::trim) + .filter(|t| !t.is_empty()) + .collect(); + if tokens.is_empty() { + return Ok(body.to_vec()); + } + let mut data = body.to_vec(); + for &token in tokens.iter().rev() { + let lower = token.trim().to_ascii_lowercase(); + if lower.is_empty() || lower == "identity" { + continue; + } + data = Self::decode_one_content_encoding_layer(&data, &lower)?; + } + Ok(data) + } + + fn decode_one_content_encoding_layer( + data: &[u8], + token_lower: &str, + ) -> io::Result> { + match token_lower { + "gzip" | "x-gzip" => Self::gunzip_remote_block_bytes(data), + "deflate" => Self::inflate_deflate_remote_block_bytes(data), + _ => Err(io::Error::other(format!( + "unsupported Content-Encoding: {token_lower}" + ))), + } + } + + fn gunzip_remote_block_bytes(data: &[u8]) -> io::Result> { + let mut decoder = GzDecoder::new(data); + let mut output = Vec::new(); + decoder + .read_to_end(&mut output) + .map_err(|e| io::Error::other(e.to_string()))?; + Ok(output) + } + + fn inflate_deflate_remote_block_bytes(data: &[u8]) -> io::Result> { + let mut decoder = ZlibDecoder::new(Cursor::new(data)); + let mut output = Vec::new(); + decoder + .read_to_end(&mut output) + .map_err(|e| io::Error::other(e.to_string()))?; + Ok(output) + } + + fn decode_remote_protobuf_block( + block_bytes: &[u8], + ) -> io::Result<(fuel_core_types::blockchain::block::Block, Vec>)> { + let block = ProtoBlock::decode(block_bytes) + .map_err(|e| io::Error::other(format!("Failed to decode block: {e}")))?; + fuel_block_from_protobuf(block).map_err(|e| { + io::Error::other(format!( + "Failed to convert RPC block to internal block: {e:?}" + )) + }) + } + + async fn get_block_from_http( + client: &reqwest::Client, + http: &RemoteHttpEndpoint, + ) -> io::Result<(prost::bytes::Bytes, Option)> { + let mut req = client.get(http.endpoint.clone()); + for (k, v) in &http.headers { + req = req.header(k, v); + } + let res = req.send().await.map_err(|e| { + io::Error::other(format!("HTTP fetch for block object failed: {e}")) + })?; + if !res.status().is_success() { + return Err(io::Error::other(format!( + "HTTP {} when fetching block object", + res.status() + ))); + } + let content_encoding = res + .headers() + .get(reqwest::header::CONTENT_ENCODING) + .and_then(|v| v.to_str().ok()) + .map(std::string::ToString::to_string); + let bytes = res + .bytes() + .await + .map_err(|e| io::Error::other(format!("Reading block object body: {e}")))?; + Ok((bytes, content_encoding)) + } + async fn get_block_from_s3_bucket( s3_client: AWSClientManager, url: &Option, bucket: &str, key: &str, requester_pays: bool, - ) -> io::Result { + ) -> io::Result<(prost::bytes::Bytes, Option)> { use aws_sdk_s3::types::RequestPayer; tracing::debug!("getting block from bucket: {} with key {}", bucket, key); let mut req = s3_client @@ -1864,6 +2002,8 @@ impl FuelClient { let obj = req.send().await.map_err(|e| { io::Error::other(format!("Failed to get object from S3: {e:?}")) })?; + let content_encoding = + obj.content_encoding().map(std::string::ToString::to_string); let bytes = obj .body .collect() @@ -1872,7 +2012,7 @@ impl FuelClient { io::Error::other(format!("Failed to get object from S3: {e:?}")) })? .into_bytes(); - Ok(bytes) + Ok((bytes, content_encoding)) } async fn new_aws_client(url: &Option) -> AWSClient { @@ -1888,13 +2028,6 @@ impl FuelClient { AWSClient::from_conf(config) } - fn unzip_bytes(bytes: &[u8]) -> io::Result> { - let mut decoder = GzDecoder::new(bytes); - let mut output = Vec::new(); - decoder.read_to_end(&mut output).map_err(io::Error::other)?; - Ok(output) - } - /// Used to get the synced height of the block aggregator, /// as it doesn't always match the latest block height pub async fn get_aggregated_height(&self) -> io::Result { @@ -1929,11 +2062,13 @@ impl FuelClient { .into_inner() .then(|res| { let maybe_aws_client = self.aws_client.clone(); + let http_client = self.http_client.clone(); async move { let maybe_aws_client = maybe_aws_client.clone(); let resp = res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?; - Self::convert_block_response(resp, maybe_aws_client).await + Self::convert_block_response(resp, maybe_aws_client, http_client) + .await } }); Ok(stream) @@ -2005,4 +2140,40 @@ mod tests { client_with_urls.get_default_url().as_str() ); } + + #[cfg(feature = "rpc")] + mod remote_block_object_decode_tests { + use super::FuelClient; + use flate2::{ + Compression, + write::GzEncoder, + }; + use std::io::Write; + + #[test] + fn no_header_is_identity() { + let raw = b"payload"; + let out = FuelClient::decode_body_by_content_encoding(raw, None).unwrap(); + assert_eq!(out, raw); + } + + #[test] + fn identity_header_passes_through() { + let raw = b"payload"; + let out = FuelClient::decode_body_by_content_encoding(raw, Some("identity")) + .unwrap(); + assert_eq!(out, raw); + } + + #[test] + fn gzip_header_decompresses() { + let raw = b"protobuf-bytes"; + let mut enc = GzEncoder::new(Vec::new(), Compression::default()); + enc.write_all(raw).unwrap(); + let gz = enc.finish().unwrap(); + let out = + FuelClient::decode_body_by_content_encoding(&gz, Some("gzip")).unwrap(); + assert_eq!(out, raw); + } + } } diff --git a/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs b/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs index 649a922a589..5a573881f02 100644 --- a/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs +++ b/crates/services/block_aggregator_api/src/api/protobuf_adapter.rs @@ -2,6 +2,7 @@ use crate::{ block_range_response::{ BlockRangeResponse, BoxStream, + RemoteBlockPayload, }, protobuf_types::{ BlockHeightRequest as ProtoBlockHeightRequest, @@ -10,6 +11,7 @@ use crate::{ BlockResponse as ProtoBlockResponse, NewBlockSubscriptionRequest as ProtoNewBlockSubscriptionRequest, RemoteBlockResponse as ProtoRemoteBlockResponse, + RemoteHttpEndpoint as ProtoRemoteHttpEndpoint, RemoteS3Bucket as ProtoRemoteS3Bucket, block_aggregator_server::{ BlockAggregator, @@ -129,16 +131,25 @@ where .boxed(); Ok(tonic::Response::new(stream)) } - BlockRangeResponse::S3(inner) => { + BlockRangeResponse::Remote(inner) => { let stream = inner .map(|(height, res)| { - let s3 = ProtoRemoteS3Bucket { - bucket: res.bucket, - key: res.key, - requester_pays: res.requester_pays, - endpoint: res.aws_endpoint, + let location = match res { + RemoteBlockPayload::S3(s3) => { + ProtoRemoteLocation::S3(ProtoRemoteS3Bucket { + bucket: s3.bucket, + key: s3.key, + requester_pays: s3.requester_pays, + endpoint: s3.aws_endpoint, + }) + } + RemoteBlockPayload::Http(http) => { + ProtoRemoteLocation::Http(ProtoRemoteHttpEndpoint { + endpoint: http.url, + headers: http.headers, + }) + } }; - let location = ProtoRemoteLocation::S3(s3); let proto_response = ProtoRemoteBlockResponse { location: Some(location), }; diff --git a/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs b/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs index a486ae4eb7a..d57ab50e47b 100644 --- a/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs +++ b/crates/services/block_aggregator_api/src/api/protobuf_adapter/tests.rs @@ -7,6 +7,7 @@ use crate::{ }, block_range_response::{ BlockRangeResponse, + RemoteBlockPayload, RemoteS3Response, }, blocks::old_block_source::{ @@ -139,25 +140,29 @@ async fn await_query__get_block_range__client_receives_expected_value__remote() let mut api = MockBlocksAggregatorApi::default(); // Given - let list: Vec<_> = [(BlockHeight::new(1), "1"), (BlockHeight::new(2), "2")] + let expected: Vec<(BlockHeight, RemoteS3Response)> = + [(BlockHeight::new(1), "1"), (BlockHeight::new(2), "2")] + .iter() + .map(|(height, key)| { + let bucket = "test-bucket".to_string(); + let key = key.to_string(); + let res = RemoteS3Response { + bucket, + key, + requester_pays: false, + aws_endpoint: None, + }; + (*height, res) + }) + .collect(); + let list: Vec<_> = expected .iter() - .map(|(height, key)| { - let bucket = "test-bucket".to_string(); - let key = key.to_string(); - let res = RemoteS3Response { - bucket, - key, - requester_pays: false, - aws_endpoint: None, - }; - (*height, res) - }) + .map(|(h, s)| (*h, RemoteBlockPayload::S3(s.clone()))) .collect(); - let expected = list.clone(); api.expect_get_block_range() .times(1) .returning(move |_: u32, _: u32| { - let response = BlockRangeResponse::S3( + let response = BlockRangeResponse::Remote( futures::stream::iter(list.clone().into_iter()).boxed(), ); Ok(response) diff --git a/crates/services/block_aggregator_api/src/block_range_response.rs b/crates/services/block_aggregator_api/src/block_range_response.rs index 3c185c6d8fc..120492da12f 100644 --- a/crates/services/block_aggregator_api/src/block_range_response.rs +++ b/crates/services/block_aggregator_api/src/block_range_response.rs @@ -1,7 +1,10 @@ use crate::protobuf_types::Block as ProtoBlock; use fuel_core_services::stream::Stream; use fuel_core_types::fuel_types::BlockHeight; -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::Arc, +}; pub type BoxStream = core::pin::Pin + Send + 'static>>; @@ -11,8 +14,14 @@ pub enum BlockRangeResponse { Literal(BoxStream<(BlockHeight, ProtoBlock)>), /// Bytes of blocks Bytes(BoxStream<(BlockHeight, Arc<[u8]>)>), - /// A remote URL where the blocks can be fetched - S3(BoxStream<(BlockHeight, RemoteS3Response)>), + /// Remote references: either direct S3 bucket/key metadata or HTTP URLs (e.g. CDN) over the same object keys + Remote(BoxStream<(BlockHeight, RemoteBlockPayload)>), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RemoteBlockPayload { + S3(RemoteS3Response), + Http(RemoteHttpResponse), } #[derive(Debug, Clone, PartialEq, Eq)] @@ -23,12 +32,18 @@ pub struct RemoteS3Response { pub aws_endpoint: Option, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteHttpResponse { + pub url: String, + pub headers: HashMap, +} + #[cfg(test)] impl std::fmt::Debug for BlockRangeResponse { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { BlockRangeResponse::Literal(_) => f.debug_struct("Literal").finish(), - BlockRangeResponse::S3(_) => f.debug_struct("Remote").finish(), + BlockRangeResponse::Remote(_) => f.debug_struct("Remote").finish(), BlockRangeResponse::Bytes(_) => f.debug_struct("Bytes").finish(), } } diff --git a/crates/services/block_aggregator_api/src/db/remote_cache.rs b/crates/services/block_aggregator_api/src/db/remote_cache.rs index 58dc905a485..62feb0975e3 100644 --- a/crates/services/block_aggregator_api/src/db/remote_cache.rs +++ b/crates/services/block_aggregator_api/src/db/remote_cache.rs @@ -1,5 +1,10 @@ use crate::{ - block_range_response::BlockRangeResponse, + block_range_response::{ + BlockRangeResponse, + RemoteBlockPayload, + RemoteHttpResponse, + RemoteS3Response, + }, db::{ BlocksProvider, BlocksStorage, @@ -33,10 +38,25 @@ use fuel_core_storage::{ }; use fuel_core_types::fuel_types::BlockHeight; use std::{ + collections::HashMap, io::Write, sync::Arc, }; +/// Configuration for returning [`RemoteHttpResponse`] URLs to RPC clients while objects remain stored in S3. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PublicHttpConfig { + pub base_url: String, + pub headers: HashMap, +} + +/// Joins a public HTTP(S) base (e.g. CDN origin) with the same S3 object key path used for uploads. +pub fn public_http_object_url(base: &str, key: &str) -> String { + let base = base.trim_end_matches('/'); + let key = key.trim_start_matches('/'); + format!("{base}/{key}") +} + #[allow(non_snake_case)] #[cfg(test)] mod tests; @@ -72,6 +92,7 @@ pub struct RemoteBlocksProvider { aws_bucket: String, requester_pays: bool, aws_endpoint: Option, + public_http: Option, // track consistency between runs local_persisted: S, @@ -82,12 +103,14 @@ impl RemoteBlocksProvider { aws_bucket: String, requester_pays: bool, aws_endpoint: Option, + public_http: Option, local_persisted: S, ) -> Self { RemoteBlocksProvider { aws_bucket, requester_pays, aws_endpoint, + public_http, local_persisted, } } @@ -100,18 +123,26 @@ impl RemoteBlocksProvider { let bucket = self.aws_bucket.clone(); let requester_pays = self.requester_pays; let aws_endpoint = self.aws_endpoint.clone(); + let public_http = self.public_http.clone(); let stream = futures::stream::iter((*first..=*last).map(move |height| { let block_height = BlockHeight::new(height); let key = block_height_to_key(&block_height); - let res = crate::block_range_response::RemoteS3Response { - bucket: bucket.clone(), - key: key.clone(), - requester_pays, - aws_endpoint: aws_endpoint.clone(), + let payload = if let Some(ref cfg) = public_http { + RemoteBlockPayload::Http(RemoteHttpResponse { + url: public_http_object_url(&cfg.base_url, &key), + headers: cfg.headers.clone(), + }) + } else { + RemoteBlockPayload::S3(RemoteS3Response { + bucket: bucket.clone(), + key: key.clone(), + requester_pays, + aws_endpoint: aws_endpoint.clone(), + }) }; - (block_height, res) + (block_height, payload) })); - Ok(BlockRangeResponse::S3(Box::pin(stream))) + Ok(BlockRangeResponse::Remote(Box::pin(stream))) } } diff --git a/crates/services/block_aggregator_api/src/db/remote_cache/tests.rs b/crates/services/block_aggregator_api/src/db/remote_cache/tests.rs index 327215caac1..b939ef129f1 100644 --- a/crates/services/block_aggregator_api/src/db/remote_cache/tests.rs +++ b/crates/services/block_aggregator_api/src/db/remote_cache/tests.rs @@ -1,6 +1,9 @@ use super::*; use crate::{ - block_range_response::RemoteS3Response, + block_range_response::{ + RemoteBlockPayload, + RemoteS3Response, + }, blocks::old_block_source::{ BlockConverter, convertor_adapter::ProtobufBlockConverter, @@ -73,14 +76,15 @@ async fn get_block_range__happy_path() { .insert(&(), &Mode::new_s3(end)) .unwrap(); - let adapter = RemoteBlocksProvider::new(aws_bucket.clone(), false, None, storage); + let adapter = + RemoteBlocksProvider::new(aws_bucket.clone(), false, None, None, storage); // when let addresses = adapter.get_block_range(start, end).unwrap(); // then let actual = match addresses { - BlockRangeResponse::S3(stream) => stream.collect::>().await, + BlockRangeResponse::Remote(stream) => stream.collect::>().await, _ => { panic!("Expected remote response, got literal"); } @@ -94,12 +98,24 @@ async fn get_block_range__happy_path() { requester_pays: false, aws_endpoint: None, }; - (BlockHeight::new(height), res) + (BlockHeight::new(height), RemoteBlockPayload::S3(res)) }) .collect::>(); assert_eq!(actual, expected); } +#[test] +fn public_http_object_url__joins_base_and_key() { + assert_eq!( + public_http_object_url("https://cdn.example/blocks", "aa/bb/cc/dd"), + "https://cdn.example/blocks/aa/bb/cc/dd" + ); + assert_eq!( + public_http_object_url("https://cdn.example/blocks/", "/aa/bb/cc/dd"), + "https://cdn.example/blocks/aa/bb/cc/dd" + ); +} + #[tokio::test] async fn get_current_height__returns_highest_continuous_block() { // given diff --git a/crates/services/block_aggregator_api/src/db/storage_or_remote_db.rs b/crates/services/block_aggregator_api/src/db/storage_or_remote_db.rs index 82e6443dcc1..9d4411cd45b 100644 --- a/crates/services/block_aggregator_api/src/db/storage_or_remote_db.rs +++ b/crates/services/block_aggregator_api/src/db/storage_or_remote_db.rs @@ -4,6 +4,7 @@ use crate::{ BlocksProvider, BlocksStorage, remote_cache::{ + PublicHttpConfig, RemoteBlocksProvider, RemoteCache, }, @@ -46,11 +47,13 @@ impl StorageOrRemoteBlocksProvider { aws_bucket: String, requester_pays: bool, aws_endpoint_url: Option, + public_http: Option, ) -> Self { let remote_cache = RemoteBlocksProvider::new( aws_bucket, requester_pays, aws_endpoint_url, + public_http, storage, ); StorageOrRemoteBlocksProvider::Remote(remote_cache) diff --git a/crates/services/block_aggregator_api/src/service.rs b/crates/services/block_aggregator_api/src/service.rs index 140613fc4cf..a105bc97dad 100644 --- a/crates/services/block_aggregator_api/src/service.rs +++ b/crates/services/block_aggregator_api/src/service.rs @@ -17,6 +17,7 @@ use crate::{ }, db::{ BlocksProvider, + remote_cache::PublicHttpConfig, storage_or_remote_db::{ StorageOrRemoteBlocksProvider, StorageOrRemoteDB, @@ -63,6 +64,7 @@ use fuel_core_types::{ }; use futures::Stream; use std::{ + collections::HashMap, fmt::Debug, net::SocketAddr, sync::Arc, @@ -88,12 +90,17 @@ pub enum StorageMethod { bucket: String, endpoint_url: Option, requester_pays: bool, + /// When set, gRPC returns HTTP endpoint URLs (public base URL + same object key path as S3) instead of S3 bucket metadata. + public_block_http_url: Option, + public_block_http_headers: HashMap, }, // Assumes another node is publishing blocks to S3 bucket, but relaying details S3NoPublish { bucket: String, endpoint_url: Option, requester_pays: bool, + public_block_http_url: Option, + public_block_http_headers: HashMap, }, } @@ -326,17 +333,32 @@ where bucket, endpoint_url, requester_pays, + public_block_http_url, + public_block_http_headers, } | StorageMethod::S3NoPublish { bucket, endpoint_url, requester_pays, - } => StorageOrRemoteBlocksProvider::new_s3( - db.clone(), - bucket.clone(), - *requester_pays, - endpoint_url.clone(), - ), + public_block_http_url, + public_block_http_headers, + } => { + let public_http = public_block_http_url + .as_ref() + .map(String::as_str) + .filter(|s| !s.is_empty()) + .map(|base| PublicHttpConfig { + base_url: base.to_string(), + headers: public_block_http_headers.clone(), + }); + StorageOrRemoteBlocksProvider::new_s3( + db.clone(), + bucket.clone(), + *requester_pays, + endpoint_url.clone(), + public_http, + ) + } }; let convertor = Arc::new(convertor); diff --git a/tests/tests/rpc_s3.rs b/tests/tests/rpc_s3.rs index 568b5684513..fade3fd3db4 100644 --- a/tests/tests/rpc_s3.rs +++ b/tests/tests/rpc_s3.rs @@ -47,6 +47,8 @@ async fn get_block_height__can_get_value_from_rpc() { bucket: "test-bucket".to_string(), endpoint_url: Some(endpoint_url), requester_pays: false, + public_block_http_url: None, + public_block_http_headers: Default::default(), }; let config = Config::local_node_with_rpc_and_storage_method(storage_method); @@ -136,6 +138,8 @@ async fn get_block_range__can_get_from_remote_s3_bucket() { bucket: "test-bucket".to_string(), endpoint_url: Some(endpoint_url), requester_pays: false, + public_block_http_url: None, + public_block_http_headers: Default::default(), }; let config = Config::local_node_with_rpc_and_storage_method(storage_method); let srv = FuelService::from_database(Database::default(), config.clone()) @@ -199,6 +203,8 @@ async fn get_block_height__no_publish__can_get_value_from_rpc() { bucket: "test-bucket".to_string(), endpoint_url: Some(endpoint_url), requester_pays: false, + public_block_http_url: None, + public_block_http_headers: Default::default(), }; let config = Config::local_node_with_rpc_and_storage_method(storage_method); @@ -242,6 +248,8 @@ async fn submit_and_await_commit__no_publish__does_not_publish_to_s3_bucket() { bucket: "test-bucket".to_string(), endpoint_url: Some(endpoint_url), requester_pays: false, + public_block_http_url: None, + public_block_http_headers: Default::default(), }; let config = Config::local_node_with_rpc_and_storage_method(storage_method); let srv = FuelService::from_database(Database::default(), config.clone())