Skip to content
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
608d9a6
add s3 connection with billing to ic-gateway
shilingwang Apr 17, 2026
f134ffe
remove the whoami part
shilingwang Apr 17, 2026
9130d35
use cors middleware
shilingwang Apr 17, 2026
0073d2c
move header constant to router
shilingwang Apr 17, 2026
7e4873c
refactor storage/handlers.rs
shilingwang Apr 17, 2026
bb89666
refactor storage to be similar like ic
shilingwang Apr 17, 2026
589733e
refactor to put all storage related code under routing/storage
shilingwang Apr 17, 2026
049e553
address comments
shilingwang Apr 21, 2026
0dfc412
change the new endpoint name with prefix of storage/v1
shilingwang Apr 21, 2026
47ddf5d
remove the fake ingress to prevent production leak
shilingwang Apr 22, 2026
a0a65d3
remove unreachable second BUDGET_REFRESH_DELAY
shilingwang Apr 22, 2026
0206638
use existing dns resolver
shilingwang Apr 22, 2026
eef096c
merge buckt config into bucket
shilingwang Apr 22, 2026
24bc0ab
rename type into wire
shilingwang Apr 22, 2026
31d5f30
use axum for typed header
shilingwang Apr 22, 2026
da44896
use ic-certificate instead of agent
shilingwang Apr 22, 2026
e500812
remove unnecessary enum
shilingwang Apr 22, 2026
ae53726
addressing comments
shilingwang Apr 23, 2026
a9b0a7d
add comments for IngressAuthImpl
shilingwang Apr 23, 2026
47011eb
remove intelligent_teering probe but just use CLI
shilingwang Apr 24, 2026
163ce6a
remove &self
shilingwang Apr 24, 2026
c538716
addressing comments
shilingwang Apr 27, 2026
96acd36
make price_component safe
shilingwang Apr 27, 2026
32a87ab
make download in parallel
shilingwang Apr 28, 2026
77d80ea
let the certificate last only 30 minutes
shilingwang Apr 28, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
994 changes: 894 additions & 100 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ ahash = "0.8.11"
anyhow = "1.0.93"
arc-swap = "1.7.1"
async-trait = "0.1.83"
aws-config = "1.8"
aws-sdk-s3 = "1.120"
aws-smithy-http-client = { version = "1.1", features = ["rustls-ring"] }
aws-smithy-runtime-api = { version = "1.10", features = ["client"] }
axum = { version = "0.8.1", features = ["macros"] }
axum-extra = "0.10.0"
axum-extra = { version = "0.10.0", features = ["typed-header"] }
bytes = "1.10.0"
candid = "0.10.10"
clap = { version = "4.5.20", features = ["derive", "string", "env"] }
Expand Down Expand Up @@ -50,6 +54,7 @@ ic-bn-lib = { version = "0.1.18", features = [
"clients-hyper",
] }
ic-bn-lib-common = "0.1"
ic-certificate-verification = "3.1"
ic-custom-domains-backend = "0.1"
ic-custom-domains-base = "0.1"
ic-http-certification = { version = "3.1.0", optional = true }
Expand Down Expand Up @@ -100,6 +105,8 @@ url = "2.5.3"
# Read https://github.com/uuid-rs/uuid/releases/tag/1.13.0
uuid = { version = "=1.12.1", features = ["v7"] }
woothee = "0.13.0"
async-stream = "0.3.6"
ic-certification = "3"

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports", "async_tokio"] }
Expand Down
6 changes: 1 addition & 5 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,7 @@ pub async fn log_handler(
.into_response();
};
// Maintain hickory_proto::dnssec=error filter when changing log level
let env_filter = EnvFilter::new(format!(
"{},{}",
log_level,
crate::log::LOG_LEVEL_OVERRIDES
));
let env_filter = EnvFilter::new(format!("{},{}", log_level, crate::log::LOG_LEVEL_OVERRIDES));
let _ = state.log_handle.modify(|f| *f = env_filter);

"Ok\n".into_response()
Expand Down
62 changes: 62 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use ic_bn_lib_common::{
};
use reqwest::Url;

use candid::Principal;

use crate::{
core::{AUTHOR_NAME, SERVICE_NAME},
routing::{RequestType, domain::CanisterAlias},
Expand Down Expand Up @@ -91,6 +93,9 @@ pub struct Cli {
#[command(flatten, next_help_heading = "Cache")]
pub cache: CacheConfig,

#[command(flatten)]
pub blob_storage: BlobStorage,

#[command(flatten, next_help_heading = "Shedding System")]
pub shed_system: ShedSystemCli,

Expand Down Expand Up @@ -531,6 +536,63 @@ pub struct CacheConfig {
pub cache_xfetch_beta: f64,
}

/// Blob-storage feature config: cashier billing + S3 backend.
/// Both groups below are flattened at the top level; `cli.blob_storage.cashier.*`
/// and `cli.blob_storage.s3.*` on the Rust side, flag names unchanged.
#[derive(Args)]
pub struct BlobStorage {
#[command(flatten, next_help_heading = "Blob Storage — Cashier")]
pub cashier: CashierConfig,

#[command(flatten, next_help_heading = "Blob Storage — S3")]
pub s3: S3Storage,
}

#[derive(Args)]
pub struct CashierConfig {
/// Canister ID of the cashier backend.
/// When set, the gateway will connect to this canister for billing and budget checks.
#[clap(env, long)]
pub cashier_canister_id: Option<Principal>,

/// How frequently to report accumulated usage counters to the cashier canister
#[clap(env, long, default_value = "10s", value_parser = parse_duration)]
pub cashier_usage_report_interval: Duration,

/// Comma-separated list of hosts allowed to call DELETE /owner.
/// If unset, owner deletion is unrestricted.
#[clap(env = "ALLOW_DELETE_OWNER_FROM_HOST", long)]
pub allow_delete_owner_from_host: Option<String>,
}

#[derive(Args)]
pub struct S3Storage {
/// S3-compatible endpoint URL (e.g. http://localhost:9000 for MinIO).
/// When set together with other S3_* options, enables S3 storage backend.
#[clap(env, long)]
pub s3_endpoint: Option<String>,

/// S3 access key
#[clap(env, long, default_value = "root-user")]
pub s3_access_key: String,

/// S3 secret key
#[clap(env, long, default_value = "password")]
pub s3_secret_key: String,

/// S3 bucket name (used as the default / fallback bucket for health checks)
#[clap(env, long, default_value = "ic-gateway-storage")]
pub s3_bucket: String,

/// S3 region
#[clap(env, long, default_value = "us-east-1")]
pub s3_region: String,

/// S3 session token (for temporary credentials, e.g. Okta-based AWS access)
#[clap(env, long)]
pub s3_session_token: Option<String>,
}

#[derive(Args)]
pub struct Cors {
/// Default value for Access-Control-Allow-Origin header
Expand Down
94 changes: 93 additions & 1 deletion src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ use crate::{
cli::Cli,
metrics,
routing::ic::subnets_info::SubnetsInfoFetcher,
routing::storage::{
AWSBucket, BucketLike, CashierClient, CashierConnector, IngressAuth, IngressAuthImpl,
S3Config, StorageState,
},
routing::{
self,
ic::{
Expand Down Expand Up @@ -254,7 +258,9 @@ pub async fn main(

let root_subnet_id = principal!(MAINNET_ROOT_SUBNET_ID);

let fetcher = Arc::new(SubnetsInfoFetcher::new(Arc::new(agent), root_subnet_id));
let agent = Arc::new(agent);

let fetcher = Arc::new(SubnetsInfoFetcher::new(agent.clone(), root_subnet_id));
let subnets_info = fetcher.info.clone();

health_manager.add(fetcher.clone());
Expand All @@ -264,6 +270,91 @@ pub async fn main(
cli.domain.subnets_info_poll_interval,
);

// Setup Cashier client + billing connector
let cashier_connector = if let Some(canister_id) = cli.blob_storage.cashier.cashier_canister_id
{
let cashier_client = Arc::new(CashierClient::new(agent.clone(), canister_id));
warn!("Cashier client configured for canister {canister_id}");

match CashierConnector::new(cashier_client, Some(HOSTNAME.get().unwrap().clone())).await {
Ok(connector) => {
let connector = Arc::new(connector);
warn!("CashierConnector initialized (billing enabled)");

health_manager.add(connector.clone());
tasks.add_interval(
"cashier_usage_reporter",
connector.clone(),
cli.blob_storage.cashier.cashier_usage_report_interval,
);

Some(connector)
}
Err(e) => {
warn!("CashierConnector init failed (non-fatal, billing disabled): {e:#}");
None
}
}
} else {
None
};

// Setup S3 storage backend (single bucket)
let s3_bucket = if let Some(ref endpoint) = cli.blob_storage.s3.s3_endpoint {
let s3_config = S3Config {
endpoint: endpoint.clone(),
access_key: cli.blob_storage.s3.s3_access_key.clone(),
secret_key: cli.blob_storage.s3.s3_secret_key.clone(),
bucket_name: cli.blob_storage.s3.s3_bucket.clone(),
region: cli.blob_storage.s3.s3_region.clone(),
session_token: cli.blob_storage.s3.s3_session_token.clone(),
};
warn!(
endpoint = %endpoint,
bucket = %s3_config.bucket_name,
region = %s3_config.region,
"Initializing S3 storage backend"
);

match AWSBucket::new(s3_config, Arc::new(dns_resolver.clone())).await {
Ok(bucket) => {
let tiering = if bucket.supports_intelligent_tiering() {
"enabled"
} else {
"disabled"
};
warn!(
bucket = %cli.blob_storage.s3.s3_bucket,
intelligent_tiering = tiering,
"S3 bucket ready"
);
Some(Arc::new(bucket) as Arc<dyn BucketLike>)
}
Err(e) => {
warn!("S3 bucket initialization failed (non-fatal): {e}");
None
}
}
} else {
None
};

// Assemble storage state (enabled iff both S3 bucket and cashier connector
// are configured). Ingress auth is only constructed when storage is active.
let storage_state = s3_bucket.zip(cashier_connector).map(|(bucket, connector)| {
let ingress_auth: Arc<dyn IngressAuth> = Arc::new(IngressAuthImpl::new(agent.clone()));
StorageState {
connector,
bucket,
ingress_auth,
allowed_delete_owner_hosts: cli
.blob_storage
.cashier
.allow_delete_owner_from_host
.clone(),
}
});

// Setup WAF
let waf_layer = if cli.waf.waf_enable {
let v = WafLayer::new_from_cli(&cli.waf, Some(http_client.clone()))
Expand Down Expand Up @@ -292,6 +383,7 @@ pub async fn main(
waf_layer,
custom_domains_router,
subnets_info,
storage_state,
)
.await
.context("unable to setup Axum router")?;
Expand Down
107 changes: 107 additions & 0 deletions src/routing/error_cause.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ pub enum BackendError {
HttpGateway(String),
Other(String),
ResponseVerification(String),
// Storage backends
Cashier(String),
S3(String),
}

impl BackendError {
Expand All @@ -120,6 +123,8 @@ impl BackendError {
Self::BoundaryNode(x) => Some(x.clone()),
Self::HttpGateway(x) => Some(x.clone()),
Self::Other(x) => Some(x.clone()),
Self::Cashier(x) => Some(x.clone()),
Self::S3(x) => Some(x.clone()),
_ => None,
}
}
Expand All @@ -137,6 +142,10 @@ pub enum ClientError {
UnknownDomain(FQDN),
DomainCanisterMismatch(Principal),
SubnetNotFound,
// Storage-side client errors
#[strum(to_string = "not_found_{0}")]
NotFound(&'static str),
RangeNotSatisfiable(u64),
}

impl ClientError {
Expand All @@ -148,6 +157,8 @@ impl ClientError {
Self::DomainCanisterMismatch(x) => {
Some(format!("The canister {x} is not served by this domain"))
}
Self::NotFound(x) => Some((*x).into()),
Self::RangeNotSatisfiable(total) => Some(format!("available length: {total}")),
_ => None,
}
}
Expand Down Expand Up @@ -703,6 +714,102 @@ impl ErrorData {
}
}

// ---------------------------------------------------------------------------
// Storage API error type (for /v1/* endpoints)
// ---------------------------------------------------------------------------
//
// Structurally similar to `ErrorCause` but with two differences:
// * reuses `ClientError` and `BackendError` above (instead of defining parallel
// enums); the existing `MalformedRequest`/`BodyTooLarge` variants fit the
// storage `bad_request`/`payload_too_large` semantics, and storage-specific
// concepts (`NotFound`, `RangeNotSatisfiable`, `Cashier`, `S3`) are folded
// into the same enums as new variants.
// * renders responses as plain text (not HTML pages): `/v1/*` is an
// SDK-facing API, not a browser destination.

/// Error type for storage API endpoints (`/v1/*`).
#[derive(Debug, Clone, Display, IntoStaticStr, Eq, PartialEq)]
#[strum(serialize_all = "snake_case")]
pub enum StorageError {
#[strum(to_string = "client_{0}")]
Client(ClientError),
#[strum(to_string = "backend_{0}")]
Backend(BackendError),
/// 401 with `WWW-Authenticate` (missing ingress signature).
Unauthorized(String),
/// 403 with an explanatory message (the enum-level `ErrorCause::Forbidden`
/// carries no message, hence a storage-specific variant).
Forbidden(String),
/// 403 — owner principal unknown to the cashier.
OwnerNotFound,
/// 403 — owner out of credit.
InsufficientBalance,
#[strum(serialize = "internal_server_error")]
Internal(String),
}

impl From<ClientError> for StorageError {
fn from(e: ClientError) -> Self {
Self::Client(e)
}
}

impl From<BackendError> for StorageError {
fn from(e: BackendError) -> Self {
Self::Backend(e)
}
}

impl IntoResponse for StorageError {
fn into_response(self) -> Response {
use http::header;
let mut headers = http::HeaderMap::new();
let (status, body) = match &self {
Self::Client(ClientError::MalformedRequest(m)) => (StatusCode::BAD_REQUEST, m.clone()),
Self::Client(ClientError::BodyTooLarge) => {
(StatusCode::PAYLOAD_TOO_LARGE, "payload too large".into())
}
Self::Client(ClientError::NotFound(what)) => {
(StatusCode::NOT_FOUND, format!("{what} not found"))
}
Self::Client(ClientError::RangeNotSatisfiable(total)) => (
StatusCode::RANGE_NOT_SATISFIABLE,
format!("range not satisfiable; available length: {total}"),
),
Self::Client(e) => (
StatusCode::BAD_REQUEST,
e.details().unwrap_or_else(|| e.to_string()),
),
Self::Backend(BackendError::Cashier(m)) => {
(StatusCode::BAD_GATEWAY, format!("cashier unavailable: {m}"))
}
Self::Backend(BackendError::S3(m)) => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("storage backend error: {m}"),
),
Self::Backend(e) => (
StatusCode::BAD_GATEWAY,
e.details().unwrap_or_else(|| e.to_string()),
),
Self::Unauthorized(m) => {
headers.insert(
header::WWW_AUTHENTICATE,
ic_bn_lib::hval!("X-ICP-Canister-Signature"),
);
(StatusCode::UNAUTHORIZED, m.clone())
}
Self::Forbidden(m) => (StatusCode::FORBIDDEN, m.clone()),
Self::OwnerNotFound => (StatusCode::FORBIDDEN, "owner not found".into()),
Self::InsufficientBalance => (StatusCode::FORBIDDEN, "insufficient balance".into()),
Self::Internal(m) => (StatusCode::INTERNAL_SERVER_ERROR, m.clone()),
};
let mut resp = (status, headers, body).into_response();
// Same pattern as ErrorCause: make the typed error visible to middleware.
resp.extensions_mut().insert(self);
resp
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
Loading
Loading