Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
608d9a6
add s3 connection with billing to ic-gateway
shilingwang Apr 17, 2026
f134ffe
remove the whoami part
shilingwang Apr 17, 2026
9130d35
use cors middleware
shilingwang Apr 17, 2026
0073d2c
move header constant to router
shilingwang Apr 17, 2026
7e4873c
refactor storage/handlers.rs
shilingwang Apr 17, 2026
bb89666
refactor storage to be similar like ic
shilingwang Apr 17, 2026
589733e
refactor to put all storage related code under routing/storage
shilingwang Apr 17, 2026
049e553
address comments
shilingwang Apr 21, 2026
0dfc412
change the new endpoint name with prefix of storage/v1
shilingwang Apr 21, 2026
47ddf5d
remove the fake ingress to prevent production leak
shilingwang Apr 22, 2026
a0a65d3
remove unreachable second BUDGET_REFRESH_DELAY
shilingwang Apr 22, 2026
0206638
use existing dns resolver
shilingwang Apr 22, 2026
eef096c
merge bucket config into bucket
shilingwang Apr 22, 2026
24bc0ab
rename type into wire
shilingwang Apr 22, 2026
31d5f30
use axum for typed header
shilingwang Apr 22, 2026
da44896
use ic-certificate instead of agent
shilingwang Apr 22, 2026
e500812
remove unnecessary enum
shilingwang Apr 22, 2026
ae53726
addressing comments
shilingwang Apr 23, 2026
a9b0a7d
add comments for IngressAuthImpl
shilingwang Apr 23, 2026
47011eb
remove intelligent_tiering probe but just use CLI
shilingwang Apr 24, 2026
163ce6a
remove &self
shilingwang Apr 24, 2026
c538716
addressing comments
shilingwang Apr 27, 2026
96acd36
make price_component safe
shilingwang Apr 27, 2026
32a87ab
make download in parallel
shilingwang Apr 28, 2026
77d80ea
let the certificate last only 30 minutes
shilingwang Apr 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
968 changes: 868 additions & 100 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ ahash = "0.8.11"
anyhow = "1.0.93"
arc-swap = "1.7.1"
async-trait = "0.1.83"
aws-config = "1.8"
aws-sdk-s3 = "1.120"
aws-smithy-http-client = { version = "1.1", features = ["rustls-ring"] }
aws-smithy-runtime-api = { version = "1.10", features = ["client"] }
axum = { version = "0.8.1", features = ["macros"] }
axum-extra = "0.10.0"
bytes = "1.10.0"
Expand Down Expand Up @@ -100,6 +104,8 @@ url = "2.5.3"
# Read https://github.com/uuid-rs/uuid/releases/tag/1.13.0
uuid = { version = "=1.12.1", features = ["v7"] }
woothee = "0.13.0"
async-stream = "0.3.6"
ic-certification = "3"

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports", "async_tokio"] }
Expand Down
58 changes: 58 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use ic_bn_lib_common::{
};
use reqwest::Url;

use candid::Principal;

use crate::{
core::{AUTHOR_NAME, SERVICE_NAME},
routing::{RequestType, domain::CanisterAlias},
Expand Down Expand Up @@ -91,6 +93,12 @@ pub struct Cli {
#[command(flatten, next_help_heading = "Cache")]
pub cache: CacheConfig,

#[command(flatten, next_help_heading = "Cashier")]
pub cashier: CashierConfig,

#[command(flatten, next_help_heading = "S3 Storage")]
pub s3: S3Storage,

Comment thread
shilingwang marked this conversation as resolved.
Outdated
#[command(flatten, next_help_heading = "Shedding System")]
pub shed_system: ShedSystemCli,

Expand Down Expand Up @@ -531,6 +539,56 @@ pub struct CacheConfig {
pub cache_xfetch_beta: f64,
}

// CLI / environment options for the cashier (billing) integration.
// Flattened into `Cli` under the "Cashier" help heading.
// NOTE: with clap's derive, the `///` comments on the fields below double as the
// user-facing `--help` text, so maintainer-only notes here use plain `//` comments.
#[derive(Args)]
pub struct CashierConfig {
/// Canister ID of the cashier backend.
/// When set, the gateway will connect to this canister for billing and budget checks.
// When this is `None`, startup code in core.rs builds no cashier client or connector.
#[clap(env, long)]
pub cashier_canister_id: Option<Principal>,

/// How frequently to report accumulated usage counters to the cashier canister
// Used in core.rs as the interval of the "cashier_usage_reporter" task.
#[clap(env, long, default_value = "10s", value_parser = parse_duration)]
pub cashier_usage_report_interval: Duration,

/// Skip IC egress certificate verification on PUT /blob-tree.
/// Use only for local development / testing.
// When true, core.rs installs `IngressAuthStub` (which accepts every request) instead of
// `IngressAuthImpl` and logs a warning at startup.
#[clap(env, long)]
pub fake_ingress_auth: bool,

/// Comma-separated list of hosts allowed to call DELETE /owner.
/// If unset, owner deletion is unrestricted.
// Handed to the router as the raw, unparsed string; any splitting on commas happens
// downstream of this struct.
#[clap(env = "ALLOW_DELETE_OWNER_FROM_HOST", long)]
pub allow_delete_owner_from_host: Option<String>,
}

// CLI / environment options for the S3-compatible storage backend.
// Flattened into `Cli` under the "S3 Storage" help heading. The backend is only
// initialised when `s3_endpoint` is set; the remaining fields then supply credentials,
// bucket and region.
// NOTE: with clap's derive, the `///` comments on the fields double as `--help` text,
// so they are kept short and user-facing; maintainer notes use plain `//` comments.
#[derive(Args)]
pub struct S3Storage {
    /// S3-compatible endpoint URL (e.g. http://localhost:9000 for MinIO).
    /// When set together with other S3_* options, enables S3 storage backend.
    #[clap(env, long)]
    pub s3_endpoint: Option<String>,

    /// S3 access key
    // The default is a local-development placeholder, not a usable production credential.
    #[clap(env, long, default_value = "root-user")]
    pub s3_access_key: String,

    /// S3 secret key
    // `hide_env_values`: by default clap prints the *current* value of the backing
    // environment variable in `--help`; for a secret that would leak it to terminals and
    // logs. The variable name is still shown, only its value is suppressed.
    #[clap(env, long, default_value = "password", hide_env_values = true)]
    pub s3_secret_key: String,

    /// S3 bucket name (used as the default / fallback bucket for health checks)
    #[clap(env, long, default_value = "ic-gateway-storage")]
    pub s3_bucket: String,

    /// S3 region
    #[clap(env, long, default_value = "us-east-1")]
    pub s3_region: String,

    /// S3 session token (for temporary credentials, e.g. Okta-based AWS access)
    // Also a secret: keep its environment value out of `--help` output.
    #[clap(env, long, hide_env_values = true)]
    pub s3_session_token: Option<String>,
}

#[derive(Args)]
pub struct Cors {
/// Default value for Access-Control-Allow-Origin header
Expand Down
88 changes: 87 additions & 1 deletion src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ use crate::{
cli::Cli,
metrics,
routing::ic::subnets_info::SubnetsInfoFetcher,
routing::storage::{
AWSBucket, BucketLike, CashierClient, CashierConnector, IngressAuth, IngressAuthImpl,
IngressAuthStub, S3Config,
},
routing::{
self,
ic::{
Expand Down Expand Up @@ -254,7 +258,9 @@ pub async fn main(

let root_subnet_id = principal!(MAINNET_ROOT_SUBNET_ID);

let fetcher = Arc::new(SubnetsInfoFetcher::new(Arc::new(agent), root_subnet_id));
let agent = Arc::new(agent);

let fetcher = Arc::new(SubnetsInfoFetcher::new(agent.clone(), root_subnet_id));
let subnets_info = fetcher.info.clone();

health_manager.add(fetcher.clone());
Expand All @@ -264,6 +270,82 @@ pub async fn main(
cli.domain.subnets_info_poll_interval,
);

// Setup Cashier client + billing connector
let cashier_connector = if let Some(canister_id) = cli.cashier.cashier_canister_id {
let cashier_client = Arc::new(CashierClient::new(agent.clone(), canister_id));
warn!("Cashier client configured for canister {canister_id}");

match CashierConnector::new(cashier_client, Some(HOSTNAME.get().unwrap().clone())).await {
Ok(connector) => {
let connector = Arc::new(connector);
warn!("CashierConnector initialized (billing enabled)");

health_manager.add(connector.clone());
tasks.add_interval(
"cashier_usage_reporter",
connector.clone(),
cli.cashier.cashier_usage_report_interval,
);

Some(connector)
}
Err(e) => {
warn!("CashierConnector init failed (non-fatal, billing disabled): {e:#}");
None
}
}
} else {
None
};

// Setup ingress auth for PUT /blob-tree
let ingress_auth: Arc<dyn IngressAuth> = if cli.cashier.fake_ingress_auth {
warn!("Using fake ingress auth (certificate verification disabled)");
Arc::new(IngressAuthStub)
} else {
Arc::new(IngressAuthImpl::new(agent.clone()))
};

// Setup S3 storage backend (single bucket)
let s3_bucket = if let Some(ref endpoint) = cli.s3.s3_endpoint {
let s3_config = S3Config::new(
endpoint.clone(),
cli.s3.s3_access_key.clone(),
cli.s3.s3_secret_key.clone(),
cli.s3.s3_bucket.clone(),
cli.s3.s3_region.clone(),
cli.s3.s3_session_token.clone(),
);
warn!(
endpoint = %endpoint,
bucket = %s3_config.bucket_name,
region = %s3_config.region,
"Initializing S3 storage backend"
);

match AWSBucket::new(s3_config).await {
Ok(bucket) => {
let tiering = if bucket.supports_intelligent_tiering() {
"enabled"
} else {
"disabled"
};
warn!(
bucket = %cli.s3.s3_bucket,
intelligent_tiering = tiering,
"S3 bucket ready"
);
Some(Arc::new(bucket) as Arc<dyn BucketLike>)
}
Err(e) => {
warn!("S3 bucket initialization failed (non-fatal): {e}");
None
}
}
} else {
None
};

// Setup WAF
let waf_layer = if cli.waf.waf_enable {
let v = WafLayer::new_from_cli(&cli.waf, Some(http_client.clone()))
Expand Down Expand Up @@ -292,6 +374,10 @@ pub async fn main(
waf_layer,
custom_domains_router,
subnets_info,
s3_bucket,
cashier_connector,
ingress_auth,
cli.cashier.allow_delete_owner_from_host.clone(),
)
.await
.context("unable to setup Axum router")?;
Expand Down
28 changes: 27 additions & 1 deletion src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod error_cause;
pub mod ic;
pub mod middleware;
pub mod proxy;
pub mod storage;

use std::{net::IpAddr, ops::Deref, str::FromStr, sync::Arc, time::Duration};

Expand Down Expand Up @@ -60,7 +61,10 @@ use crate::{
cli::Cli,
metrics::{self},
routing::ic::subnets_info::SubnetsInfo,
routing::middleware::{canister_match, cors, geoip, headers, preprocess, request_id, validate},
routing::middleware::{
canister_match, cors, geoip, headers, preprocess, request_id, validate,
},
routing::storage::BucketLike,
};
use domain::{CustomDomainStorage, DomainResolver};
use middleware::{
Expand All @@ -79,6 +83,8 @@ use {
};

/// `Content-Type` header value for JSON bodies (`application/json`).
pub const CONTENT_TYPE_JSON: HeaderValue = hval!("application/json");
/// `Content-Type` header value for opaque binary bodies (`application/octet-stream`).
pub const CONTENT_TYPE_OCTET: HeaderValue = hval!("application/octet-stream");
/// The literal value `bytes`; going by the name, intended for the `Accept-Ranges`
/// response header — confirm at the use site.
pub const ACCEPT_RANGES_BYTES: HeaderValue = hval!("bytes");

/// Newtype wrapper around a `Principal`; the name indicates the wrapped principal is a
/// canister's ID. The inner value is public and the wrapper is `Copy`, ordered and hashable.
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct CanisterId(pub Principal);
Expand Down Expand Up @@ -214,6 +220,10 @@ pub async fn setup_router(
waf_layer: Option<WafLayer>,
custom_domains_router: Option<Router>,
subnets_info: Arc<ArcSwapOption<SubnetsInfo>>,
s3_bucket: Option<Arc<dyn BucketLike>>,
cashier_connector: Option<Arc<storage::CashierConnector>>,
ingress_auth: Arc<dyn storage::IngressAuth>,
allowed_delete_owner_hosts: Option<String>,
Comment thread
shilingwang marked this conversation as resolved.
Outdated
) -> Result<Router, Error> {
// Setup API router
let router_api = setup_api_router(
Expand Down Expand Up @@ -533,6 +543,18 @@ pub async fn setup_router(
]))
});

// Build optional storage router (blob/chunk CRUD under /v1/)
// Requires both S3 bucket and cashier connector to be configured.
let storage_router = s3_bucket.zip(cashier_connector).map(|(bucket, connector)| {
let state = storage::StorageState {
connector,
bucket,
ingress_auth: ingress_auth.clone(),
allowed_delete_owner_hosts: allowed_delete_owner_hosts.clone(),
};
storage::storage_router(state)
});

// Top-level router
#[allow(unused_mut)]
let mut router = Router::new()
Expand Down Expand Up @@ -578,6 +600,10 @@ pub async fn setup_router(
)
.layer(common_layers);

if let Some(sr) = storage_router {
router = router.nest("/v1", sr);
Comment thread
shilingwang marked this conversation as resolved.
Outdated
}

#[cfg(all(target_os = "linux", feature = "sev-snp"))]
if cli.sev_snp.sev_snp_enable {
let router_sev_snp = Router::new().route(
Expand Down
113 changes: 113 additions & 0 deletions src/routing/storage/auth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use std::sync::Arc;

use candid::Principal;
use ic_bn_lib::ic_agent::{Agent, AgentError, Certificate, hash_tree::LookupResult};
use tracing::warn;

use super::types::{OwnerEgressSignature, PutBlobTreeRequest, StorageGatewayAuthorization};

/// Reasons an ingress authorization check can reject a request.
///
/// Each variant carries a human-readable detail message that is appended to a fixed
/// prefix when the error is displayed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuthError {
    /// The request carried no authorization at all.
    MissingAuth(String),
    /// Authorization was supplied but rejected (unparsable, unverifiable or mismatching).
    Forbidden(String),
}

impl std::fmt::Display for AuthError {
    /// Renders the error as `"<prefix>: <detail>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (prefix, detail) = match self {
            Self::MissingAuth(detail) => ("authentication required", detail),
            Self::Forbidden(detail) => ("forbidden", detail),
        };
        write!(f, "{prefix}: {detail}")
    }
}

// Implementing `std::error::Error` lets callers box this type as `dyn Error` or convert
// it with `?` into generic error containers; `Debug` + `Display` above are all it needs.
impl std::error::Error for AuthError {}

pub trait IngressAuth: Send + Sync {
Comment thread
shilingwang marked this conversation as resolved.
fn check_put_blob(&self, request: &PutBlobTreeRequest) -> Result<(), AuthError>;
}

/// No-op authorizer: every request is allowed without inspection.
/// Selected by the `--fake-ingress-auth` flag.
#[derive(Default)]
pub struct IngressAuthStub;

impl IngressAuth for IngressAuthStub {
    /// Always returns `Ok(())`; the request is never examined.
    fn check_put_blob(&self, _: &PutBlobTreeRequest) -> Result<(), AuthError> {
        Ok(())
    }
}

/// Production implementation: verify IC egress certificate.
pub struct IngressAuthImpl {
agent: Arc<Agent>,
Comment thread
shilingwang marked this conversation as resolved.
Outdated
}

impl IngressAuthImpl {
    /// Builds an authorizer that validates certificates through `agent`.
    pub fn new(agent: Arc<Agent>) -> Self {
        Self { agent }
    }

    /// Decodes `bytes` as a CBOR-encoded [`Certificate`].
    ///
    /// Any decoding failure is reported as `AuthError::Forbidden`.
    fn parse_certificate(bytes: &[u8]) -> Result<Certificate, AuthError> {
        match serde_cbor::from_slice::<Certificate>(bytes) {
            Ok(certificate) => Ok(certificate),
            Err(e) => Err(AuthError::Forbidden(format!(
                "failed to parse certificate: {e}"
            ))),
        }
    }

    /// Asks the agent to verify `cert` for `canister`.
    ///
    /// Every verification failure maps to `AuthError::Forbidden`; an outdated
    /// certificate additionally emits a warning and gets its own message.
    fn verify_certificate(
        &self,
        cert: &Certificate,
        canister: Principal,
    ) -> Result<(), AuthError> {
        self.agent.verify(cert, canister).map_err(|failure| match failure {
            AgentError::CertificateOutdated(_) => {
                warn!("Egress certificate is outdated (stale but valid signature)");
                AuthError::Forbidden("certificate outdated".into())
            }
            e => AuthError::Forbidden(format!("certificate verification failed: {e}")),
        })
    }

    /// Returns the first leaf of the certificate tree that Candid-decodes as an
    /// [`OwnerEgressSignature`], visiting paths in the order `list_paths` yields them.
    ///
    /// NOTE(review): any decodable leaf is accepted regardless of the path it sits
    /// under — confirm that not pinning a specific path is intended.
    fn extract_payload(cert: &Certificate) -> Result<OwnerEgressSignature, AuthError> {
        let paths = cert.tree.list_paths();
        for path in paths.iter() {
            // Paths that do not resolve to a value, or whose value is not a valid
            // signature payload, are skipped silently.
            if let LookupResult::Found(value) = cert.tree.lookup_path(path) {
                if let Ok(signature) = candid::decode_one::<OwnerEgressSignature>(value) {
                    return Ok(signature);
                }
            }
        }
        Err(AuthError::Forbidden(
            "no valid OwnerEgressSignature in certificate tree".into(),
        ))
    }

    /// Checks that the signed payload authorizes an `upload` of exactly `root_hash`.
    ///
    /// The method is checked first, so a wrong method is reported even when the
    /// hash also differs.
    fn check_payload(payload: &OwnerEgressSignature, root_hash: &str) -> Result<(), AuthError> {
        if payload.method != "upload" {
            return Err(AuthError::Forbidden(format!(
                "invalid method: {}",
                payload.method
            )));
        }
        if payload.blob_hash != root_hash {
            return Err(AuthError::Forbidden(format!(
                "blob hash mismatch: expected {root_hash}, got {}",
                payload.blob_hash
            )));
        }
        Ok(())
    }
}

impl IngressAuth for IngressAuthImpl {
fn check_put_blob(&self, request: &PutBlobTreeRequest) -> Result<(), AuthError> {
let root_hash = request
.blob_tree
.root_hash()
.ok_or_else(|| AuthError::Forbidden("blob tree has no root hash".into()))?
.to_string();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to stringify it here? Can't we check it directly? I guess because OwnerEgressSignature has it as a string?


match &request.auth {
StorageGatewayAuthorization::None => {
Err(AuthError::MissingAuth("no authorization provided".into()))
}
StorageGatewayAuthorization::OwnerEgressSignature(cert_bytes) => {
let cert = Self::parse_certificate(cert_bytes)?;
self.verify_certificate(&cert, request.owner)?;
let payload = Self::extract_payload(&cert)?;
Self::check_payload(&payload, &root_hash)
}
}
}
}
Loading
Loading