diff --git a/.gitignore b/.gitignore index 980c54b3..78e17712 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ artifacts/ dev/pki/* !dev/pki/pki -**/mitmproxy/ \ No newline at end of file +**/mitmproxy/ +dev/logs/ diff --git a/broker/src/config.rs b/broker/src/config.rs index 35fed112..e5a7156e 100644 --- a/broker/src/config.rs +++ b/broker/src/config.rs @@ -5,7 +5,7 @@ use crate::{ }; use axum::http::Uri; use clap::Parser; -use shared::{openssl::x509::X509, reqwest::{self, Url}}; +use shared::{logger::LogOptions, openssl::x509::X509, reqwest::{self, Url}}; use std::str::FromStr; use tracing::info; @@ -16,7 +16,7 @@ use tracing::info; arg_required_else_help(true), after_help(crate::CLAP_FOOTER) )] -struct CliArgs { +pub struct CliArgs { /// Local bind address #[clap(long, env, value_parser, default_value_t = SocketAddr::from_str("0.0.0.0:8080").unwrap())] bind_addr: SocketAddr, @@ -52,6 +52,9 @@ struct CliArgs { /// The API key for accessing monitoring endpoints of the broker #[clap(long, env, value_parser)] monitoring_api_key: Option, + + #[clap(flatten)] + pub log_options: LogOptions, } #[derive(Debug)] @@ -66,8 +69,7 @@ pub struct Config { } impl Config { - pub fn load() -> Result { - let cli_args = CliArgs::parse(); + pub fn load(cli_args: CliArgs) -> Result { beam_lib::set_broker_id(cli_args.broker_url.host().unwrap().to_string()); let pki_token = read_to_string(&cli_args.pki_apikey_file) .map_err(|e| { diff --git a/broker/src/main.rs b/broker/src/main.rs index 301d8fe6..afda99ed 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -14,6 +14,7 @@ mod compare_client_server_version; use std::{collections::HashMap, sync::Arc, time::Duration}; +use clap::Parser; use crypto::GetCertsFromPki; use serve_health::{Health, InitStatus}; use once_cell::sync::Lazy; @@ -21,13 +22,14 @@ use shared::{errors::SamplyBeamError, openssl::x509::X509, *}; use tokio::sync::RwLock; use tracing::{error, info, warn}; -use crate::serve::BrokerState; +use crate::{config::CliArgs, serve::BrokerState}; #[tokio::main] pub async fn main() -> anyhow::Result<()> { - shared::logger::init_logger()?; + let args = CliArgs::parse(); + let _log_guard = shared::logger::init_logger(&args.log_options)?; banner::print_banner(); - let config = config::Config::load()?; + let config = config::Config::load(args)?; let health = Arc::new(RwLock::new(Health::default())); let cert_getter = GetCertsFromPki::new(health.clone(), &config)?; diff --git a/broker/src/serve_sockets.rs b/broker/src/serve_sockets.rs index 9c2442ce..381755e5 100644 --- a/broker/src/serve_sockets.rs +++ b/broker/src/serve_sockets.rs @@ -4,9 +4,9 @@ use axum::{extract::{Path, Request, State}, http::{header, request::Parts, Heade use bytes::BufMut; use hyper_util::rt::TokioIo; use serde::{Serialize, Serializer, ser::SerializeSeq}; -use shared::{crypto_jwt::Authorized, expire_map::LazyExpireMap, serde_helpers::DerefSerializer, Encrypted, HasWaitId, HowLongToBlock, Msg, MsgEmpty, MsgId, MsgSigned, MsgSocketRequest}; +use shared::{format_to_without_broker, crypto_jwt::Authorized, expire_map::LazyExpireMap, serde_helpers::DerefSerializer, Encrypted, HasWaitId, HowLongToBlock, Msg, MsgEmpty, MsgId, MsgSigned, MsgSocketRequest}; use tokio::sync::{RwLock, broadcast::{Sender, self}, oneshot}; -use tracing::{debug, log::error, warn}; +use tracing::{debug, info, log::error, warn}; use crate::task_manager::{TaskManager, Task}; @@ -69,6 +69,7 @@ async fn post_socket_request( state: State, msg: MsgSigned>, ) -> Result { + info!(from = %msg.get_from().hide_broker(), to = %format_to_without_broker(&msg.get_to()), id = %msg.msg.id, "Submitted socket request"); let msg_id = msg.wait_id(); state.task_manager.post_task(msg)?; @@ -90,6 +91,7 @@ async fn connect_socket( Ok(msg) => msg.msg, Err(e) => return Ok(e.into_response()), }; + info!(from = %msg.get_from().hide_broker(), id = %task_id, "Connected to socket request"); { let task = state.task_manager.get(&task_id)?; // Allowed to connect are the issuer of the task and the recipient diff --git a/broker/src/serve_tasks.rs b/broker/src/serve_tasks.rs index 9c6a27d8..999c1438 100644 --- a/broker/src/serve_tasks.rs +++ b/broker/src/serve_tasks.rs @@ -16,9 +16,7 @@ use futures_core::{stream, Stream}; use serde::Deserialize; use beam_lib::WorkStatus; use shared::{ - errors::SamplyBeamError, sse_event::SseEventType, - EncryptedMsgTaskRequest, EncryptedMsgTaskResult, HasWaitId, HowLongToBlock, Msg, MsgEmpty, - MsgId, MsgSigned, MsgTaskRequest, MsgTaskResult, EMPTY_VEC_APPORPROXYID, serde_helpers::DerefSerializer, + EMPTY_VEC_APPORPROXYID, EncryptedMsgTaskRequest, EncryptedMsgTaskResult, HasWaitId, HowLongToBlock, Msg, MsgEmpty, MsgId, MsgSigned, MsgTaskRequest, MsgTaskResult, errors::SamplyBeamError, format_to_without_broker, serde_helpers::DerefSerializer, sse_event::SseEventType }; use tokio::{ sync::{ @@ -339,12 +337,10 @@ async fn post_task( State(state): State, msg: MsgSigned, ) -> Result<(StatusCode, impl IntoResponse), StatusCode> { - // let id = MsgId::new(); - // msg.id = id; - // TODO: Check if ID is taken + info!(id = %msg.msg.id, from = %msg.msg.from.hide_broker(), to = %format_to_without_broker(&msg.msg.to), "New task"); trace!( - "Client {} with IP {addr} is creating task {:?}", - msg.msg.from, msg + "Client {} with IP {addr} is creating task {msg:?}", + msg.msg.from, ); let id = msg.msg.id; state.task_manager.post_task(msg)?; @@ -361,6 +357,7 @@ async fn put_result( State(state): State, result: MsgSigned, ) -> Result { + info!(r#for = %result.msg.task, from = %result.msg.from.hide_broker(), status = ?result.msg.status, "New result"); trace!("Called: Task {:?}, {:?} by {addr}", task_id, result); if task_id != result.msg.task { return Err(( @@ -376,7 +373,6 @@ async fn put_result( )); } - let status = if state.task_manager.put_result(&task_id, result)? { StatusCode::NO_CONTENT } else { diff --git a/dev/beamdev b/dev/beamdev index 4beb95d5..a100f3ef 100755 --- a/dev/beamdev +++ b/dev/beamdev @@ -169,6 +169,7 @@ function stop { function clean { docker compose --profile "*" down rm -fv pki/*.pem pki/*.json pki/pki.secret + sudo rm -fv logs/*/*.log pki/pki clean } diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 985c6f7c..fa16c583 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -31,7 +31,10 @@ services: BIND_ADDR: 0.0.0.0:8080 MONITORING_API_KEY: ${BROKER_MONITORING_KEY} RUST_LOG: ${RUST_LOG} + LOG_DIR: /logs # ALL_PROXY: http://mitmproxy:8080 + volumes: + - ./logs/broker:/logs secrets: - pki.secret - dummy.pem @@ -60,7 +63,10 @@ services: PRIVKEY_FILE: /run/secrets/proxy1.pem BIND_ADDR: 0.0.0.0:8081 RUST_LOG: ${RUST_LOG} + LOG_DIR: /logs # ALL_PROXY: http://mitmproxy:8080 + volumes: + - ./logs/proxy1:/logs secrets: - proxy1.pem - root.crt.pem @@ -82,7 +88,10 @@ services: PRIVKEY_FILE: /run/secrets/proxy2.pem BIND_ADDR: 0.0.0.0:8082 RUST_LOG: ${RUST_LOG} + LOG_DIR: /logs # ALL_PROXY: http://mitmproxy:8080 + volumes: + - ./logs/proxy2:/logs secrets: - proxy2.pem - root.crt.pem diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 20ccb3be..5baee9ae 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -2,7 +2,7 @@ use clap::Parser; use regex::Regex; use reqwest::Url; use rsa::{pkcs1::DecodeRsaPrivateKey, pkcs8::DecodePrivateKey, RsaPrivateKey}; -use shared::{errors::SamplyBeamError, jwt_simple::prelude::RS256KeyPair, openssl::x509::X509, reqwest}; +use shared::{errors::SamplyBeamError, jwt_simple::prelude::RS256KeyPair, logger::LogOptions, openssl::x509::X509, reqwest}; use std::{ collections::HashMap, @@ -70,6 +70,9 @@ pub struct CliArgs { /// samply.pki: Path to CA Root certificate #[clap(long, env, value_parser, default_value = "/run/secrets/root.crt.pem")] rootcert_file: PathBuf, + + #[clap(flatten)] + pub log_options: LogOptions, } pub const APP_PREFIX: &str = "APP"; @@ -101,8 +104,7 @@ fn parse_apikeys(proxy_id: &ProxyId) -> Result, SamplyBea } impl Config { - pub fn load() -> Result { - let cli_args = CliArgs::parse(); + pub fn load(cli_args: CliArgs) -> Result { beam_lib::set_broker_id(cli_args.broker_url.host().unwrap().to_string()); let proxy_id = ProxyId::new(&cli_args.proxy_id).map_err(|e| { SamplyBeamError::ConfigurationFailed(format!( diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 73401b51..297e6230 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -5,6 +5,7 @@ use std::time::Duration; use axum::http::{header, HeaderValue, StatusCode}; use beam_lib::AppOrProxyId; +use clap::Parser; use futures::future::Ready; use futures::{StreamExt, TryStreamExt}; use shared::{reqwest, EncryptedMessage, MsgEmpty, PlainMessage}; @@ -15,7 +16,7 @@ use tokio::time::Instant; use tracing::{debug, error, info, warn}; use tryhard::{backoff_strategies::ExponentialBackoff, RetryFuture, RetryFutureConfig}; -use crate::config::Config; +use crate::config::{CliArgs, Config}; use crate::serve_tasks::sign_request; mod auth; @@ -32,10 +33,11 @@ pub(crate) const PROXY_TIMEOUT: u64 = 120; #[tokio::main] pub async fn main() -> anyhow::Result<()> { - shared::logger::init_logger()?; + let args = CliArgs::parse(); + let _log_guard = shared::logger::init_logger(&args.log_options)?; banner::print_banner(); - let config = Config::load()?; + let config = Config::load(args)?; let retry_policy = reqwest::retry::for_host(config.broker_uri.host_str().unwrap().to_string()) .classify_fn(|res| { if res.method() != reqwest::Method::GET { @@ -90,11 +92,11 @@ pub async fn main() -> anyhow::Result<()> { } fn retry_notify(f: F, on_error: Cb) -> RetryFuture, &E) -> Ready<()>>> -where +where F: FnMut() -> Fut, Fut: Future>, Cb: Fn(&E, Duration) + 'static, - + { tryhard::retry_fn(f) .retries(100) diff --git a/proxy/src/serve_tasks.rs b/proxy/src/serve_tasks.rs index d14a4158..ab8f68a7 100644 --- a/proxy/src/serve_tasks.rs +++ b/proxy/src/serve_tasks.rs @@ -17,9 +17,9 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Value; use beam_lib::{AppId, AppOrProxyId, ProxyId}; use shared::{ - crypto::{self, CryptoPublicPortion}, crypto_jwt, errors::SamplyBeamError, http_client::SamplyHttpClient, reqwest, sse_event::SseEventType, DecryptableMsg, EncryptableMsg, EncryptedMessage, EncryptedMsgTaskRequest, EncryptedMsgTaskResult, MessageType, Msg, MsgEmpty, MsgId, MsgSigned, MsgTaskRequest, MsgTaskResult, PlainMessage + DecryptableMsg, EncryptableMsg, EncryptedMessage, EncryptedMsgTaskRequest, EncryptedMsgTaskResult, MessageType, Msg, MsgEmpty, MsgId, MsgSigned, MsgTaskRequest, MsgTaskResult, PlainMessage, crypto::{self, CryptoPublicPortion}, crypto_jwt, errors::SamplyBeamError, format_to_without_broker, http_client::SamplyHttpClient, reqwest, sse_event::SseEventType }; -use tokio::io::BufReader; +use tokio::{io::BufReader, task::id}; use tracing::{debug, error, info, trace, warn}; use crate::{auth::AuthenticatedApp, config::Config, PROXY_TIMEOUT}; @@ -82,6 +82,13 @@ pub(crate) async fn forward_request( HeaderValue::from_static(env!("SAMPLY_USER_AGENT")), ); let (encrypted_msg, parts) = encrypt_request(req, &sender).await?; + match &encrypted_msg { + MessageType::MsgTaskRequest(task) => info!(from = %sender.hide_broker_name(), to = %format_to_without_broker(&task.to), id = %task.id, "Sending task"), + MessageType::MsgTaskResult(result) => info!(from = %sender.hide_broker_name(), r#for = %result.task, "Submitting result"), + #[cfg(feature = "sockets")] + MessageType::MsgSocketRequest(socket_req) => info!(from = %socket_req.get_from().hide_broker(), to = %format_to_without_broker(&socket_req.get_to()), id = %socket_req.id, "Submitting socket request"), + MessageType::MsgEmpty(..) => {}, + }; let req = sign_request(encrypted_msg, parts, &config).await.map_err(IntoResponse::into_response)?; trace!("Requesting: {:?}", req); let resp = client.execute(req).await.map_err(|e| { @@ -393,6 +400,13 @@ pub(crate) async fn validate_and_decrypt(json: Value, config: &Config) -> Result let msg = MsgSigned::::verify(&signed.jwt) .await? .msg; + match &msg { + MessageType::MsgTaskRequest(task) => info!(from = %task.get_from().hide_broker(), id = %task.id, "New task"), + MessageType::MsgTaskResult(result) => info!(from = %result.get_from().hide_broker(), r#for = %result.task, "New result"), + #[cfg(feature = "sockets")] + MessageType::MsgSocketRequest(socket_req) => info!(from = %socket_req.get_from().hide_broker(), id = %socket_req.id, "New socket request"), + MessageType::MsgEmpty(..) => {}, + }; Ok(serde_json::to_value(decrypt_msg(msg, config)?).expect("Should serialize fine")) } Err(e) => Err(SamplyBeamError::JsonParseError(format!( diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 4c2cb596..a59fd95a 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -15,6 +15,7 @@ uuid = { version = "1", features = [ ]} serde = { version = "1", features = ["derive"] } serde_json = "1" +clap.workspace = true tokio = { version = "1", features = ["full"] } axum = { version = "0.8", features = [] } @@ -25,7 +26,7 @@ reqwest = { version = "0.12", features = ["stream"] } # Logging tracing = "0.1" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } # Crypto rand = "0.8" @@ -50,6 +51,7 @@ dashmap = { version = "6.0", optional = true} beam-lib = { workspace = true } async-trait = "0.1" +tracing-appender = "0.2.3" [features] expire_map = ["dep:dashmap"] diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 2ce85383..25f692ad 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -716,6 +716,20 @@ impl Msg for MsgPing { } } +pub fn format_to_without_broker(to: &[AppOrProxyId]) -> impl Display + use<'_> { + struct Helper<'a> { + to: &'a [AppOrProxyId], + } + + impl<'a> std::fmt::Display for Helper<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_list().entries(self.to.iter().map(|to| to.hide_broker())).finish() + } + } + + Helper { to } +} + pub fn try_read(map: &HashMap, key: &str) -> Option where T: FromStr, diff --git a/shared/src/logger.rs b/shared/src/logger.rs index ec983d7a..435a3935 100644 --- a/shared/src/logger.rs +++ b/shared/src/logger.rs @@ -1,36 +1,44 @@ +use std::{io, path::PathBuf}; + use tracing::{debug, dispatcher::SetGlobalDefaultError, Level}; -use tracing_subscriber::fmt::format::{debug_fn, self}; +use tracing_appender::{non_blocking::WorkerGuard, rolling::{InitError, Rotation}}; +use tracing_subscriber::{fmt::{self, format::{self, debug_fn}}, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + +#[derive(Debug, clap::Args)] +pub struct LogOptions { + #[clap(long, env)] + /// Directory to store log files in. + pub log_dir: Option, -#[allow(clippy::if_same_then_else)] // The redundant if-else serves documentation purposes -pub fn init_logger() -> Result<(), SetGlobalDefaultError> { - let subscriber = tracing_subscriber::FmtSubscriber::builder() - .fmt_fields(debug_fn(|w, f, v| match f.name() { - "from" | "message" => write!(w, "{v:?}"), - _ => write!(w, "{f}={v:?} "), - })) - .with_max_level(Level::DEBUG); + /// Number of days to retain log files. + #[clap(long, env, default_value_t = 30)] + pub log_retention: usize, +} - // TODO: Reduce code complexity. - let env_filter = match std::env::var("RUST_LOG") { - Ok(env) if !env.is_empty() => { - if env.contains("hyper=") { - env - } else { - format!("{env},hyper=info") - } - } - _ => { - if cfg!(debug_assertions) { - "info,hyper=info".to_string() - } else { - "info,hyper=info".to_string() - } - } +pub fn init_logger(log_opts: &LogOptions) -> Result, InitError> { + let env_filter = EnvFilter::builder() + .with_default_directive(Level::INFO.into()) + .from_env_lossy(); + let registry = tracing_subscriber::registry() + .with(env_filter); + let (registry, guard) = if let LogOptions { log_dir: Some(log_dir), log_retention } = log_opts { + let appender = tracing_appender::rolling::Builder::new() + .max_log_files(*log_retention) + .filename_suffix("log") + .rotation(Rotation::DAILY) + .build(log_dir)?; + let (appender, guard) = tracing_appender::non_blocking(appender); + let json_layer = fmt::layer() + .json() + .with_writer(appender); + (registry.with(Some(json_layer)), Some(guard)) + } else { + (registry.with(None), None) }; - let subscriber = subscriber.with_env_filter(env_filter.clone()).finish(); - tracing::subscriber::set_global_default(subscriber)?; + registry + .with(fmt::layer()) + .init(); - debug!("Logging initialized with env_filter {env_filter}."); - Ok(()) + Ok(guard) } diff --git a/shared/src/middleware.rs b/shared/src/middleware.rs index 70250118..79ad249c 100644 --- a/shared/src/middleware.rs +++ b/shared/src/middleware.rs @@ -12,7 +12,7 @@ pub async fn log( ) -> Response { let method = req.method().clone(); let uri = req.uri().clone(); - let span = info_span!("", from = field::Empty); + let span = info_span!("request", from = field::Empty); async move { let resp = next.run(req).instrument(Span::current()).await;