Skip to content
Merged

Logs #251

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
artifacts/
dev/pki/*
!dev/pki/pki
**/mitmproxy/
**/mitmproxy/
dev/logs/
10 changes: 6 additions & 4 deletions broker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
};
use axum::http::Uri;
use clap::Parser;
use shared::{openssl::x509::X509, reqwest::{self, Url}};
use shared::{logger::LogOptions, openssl::x509::X509, reqwest::{self, Url}};
use std::str::FromStr;
use tracing::info;

Expand All @@ -16,7 +16,7 @@ use tracing::info;
arg_required_else_help(true),
after_help(crate::CLAP_FOOTER)
)]
struct CliArgs {
pub struct CliArgs {
/// Local bind address
#[clap(long, env, value_parser, default_value_t = SocketAddr::from_str("0.0.0.0:8080").unwrap())]
bind_addr: SocketAddr,
Expand Down Expand Up @@ -52,6 +52,9 @@ struct CliArgs {
/// The API key for accessing monitoring endpoints of the broker
#[clap(long, env, value_parser)]
monitoring_api_key: Option<String>,

#[clap(flatten)]
pub log_options: LogOptions,
}

#[derive(Debug)]
Expand All @@ -66,8 +69,7 @@ pub struct Config {
}

impl Config {
pub fn load() -> Result<Self, SamplyBeamError> {
let cli_args = CliArgs::parse();
pub fn load(cli_args: CliArgs) -> Result<Self, SamplyBeamError> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you pull this into main?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the cli args now include the log config which I did not bother to include the actual config.

beam_lib::set_broker_id(cli_args.broker_url.host().unwrap().to_string());
let pki_token = read_to_string(&cli_args.pki_apikey_file)
.map_err(|e| {
Expand Down
8 changes: 5 additions & 3 deletions broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@ mod compare_client_server_version;

use std::{collections::HashMap, sync::Arc, time::Duration};

use clap::Parser;
use crypto::GetCertsFromPki;
use serve_health::{Health, InitStatus};
use once_cell::sync::Lazy;
use shared::{errors::SamplyBeamError, openssl::x509::X509, *};
use tokio::sync::RwLock;
use tracing::{error, info, warn};

use crate::serve::BrokerState;
use crate::{config::CliArgs, serve::BrokerState};

#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
shared::logger::init_logger()?;
let args = CliArgs::parse();
let _log_guard = shared::logger::init_logger(&args.log_options)?;
banner::print_banner();
let config = config::Config::load()?;
let config = config::Config::load(args)?;

let health = Arc::new(RwLock::new(Health::default()));
let cert_getter = GetCertsFromPki::new(health.clone(), &config)?;
Expand Down
6 changes: 4 additions & 2 deletions broker/src/serve_sockets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use axum::{extract::{Path, Request, State}, http::{header, request::Parts, Heade
use bytes::BufMut;
use hyper_util::rt::TokioIo;
use serde::{Serialize, Serializer, ser::SerializeSeq};
use shared::{crypto_jwt::Authorized, expire_map::LazyExpireMap, serde_helpers::DerefSerializer, Encrypted, HasWaitId, HowLongToBlock, Msg, MsgEmpty, MsgId, MsgSigned, MsgSocketRequest};
use shared::{format_to_without_broker, crypto_jwt::Authorized, expire_map::LazyExpireMap, serde_helpers::DerefSerializer, Encrypted, HasWaitId, HowLongToBlock, Msg, MsgEmpty, MsgId, MsgSigned, MsgSocketRequest};
use tokio::sync::{RwLock, broadcast::{Sender, self}, oneshot};
use tracing::{debug, log::error, warn};
use tracing::{debug, info, log::error, warn};

use crate::task_manager::{TaskManager, Task};

Expand Down Expand Up @@ -69,6 +69,7 @@ async fn post_socket_request(
state: State<SocketState>,
msg: MsgSigned<MsgSocketRequest<Encrypted>>,
) -> Result<impl IntoResponse, StatusCode> {
info!(from = %msg.get_from().hide_broker(), to = %format_to_without_broker(&msg.get_to()), id = %msg.msg.id, "Submitted socket request");
let msg_id = msg.wait_id();
state.task_manager.post_task(msg)?;

Expand All @@ -90,6 +91,7 @@ async fn connect_socket(
Ok(msg) => msg.msg,
Err(e) => return Ok(e.into_response()),
};
info!(from = %msg.get_from().hide_broker(), id = %task_id, "Connected to socket request");
{
let task = state.task_manager.get(&task_id)?;
// Allowed to connect are the issuer of the task and the recipient
Expand Down
14 changes: 5 additions & 9 deletions broker/src/serve_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ use futures_core::{stream, Stream};
use serde::Deserialize;
use beam_lib::WorkStatus;
use shared::{
errors::SamplyBeamError, sse_event::SseEventType,
EncryptedMsgTaskRequest, EncryptedMsgTaskResult, HasWaitId, HowLongToBlock, Msg, MsgEmpty,
MsgId, MsgSigned, MsgTaskRequest, MsgTaskResult, EMPTY_VEC_APPORPROXYID, serde_helpers::DerefSerializer,
EMPTY_VEC_APPORPROXYID, EncryptedMsgTaskRequest, EncryptedMsgTaskResult, HasWaitId, HowLongToBlock, Msg, MsgEmpty, MsgId, MsgSigned, MsgTaskRequest, MsgTaskResult, errors::SamplyBeamError, format_to_without_broker, serde_helpers::DerefSerializer, sse_event::SseEventType
};
use tokio::{
sync::{
Expand Down Expand Up @@ -339,12 +337,10 @@ async fn post_task(
State(state): State<TasksState>,
msg: MsgSigned<EncryptedMsgTaskRequest>,
) -> Result<(StatusCode, impl IntoResponse), StatusCode> {
// let id = MsgId::new();
// msg.id = id;
// TODO: Check if ID is taken
info!(id = %msg.msg.id, from = %msg.msg.from.hide_broker(), to = %format_to_without_broker(&msg.msg.to), "New task");
trace!(
"Client {} with IP {addr} is creating task {:?}",
msg.msg.from, msg
"Client {} with IP {addr} is creating task {msg:?}",
msg.msg.from,
);
let id = msg.msg.id;
state.task_manager.post_task(msg)?;
Expand All @@ -361,6 +357,7 @@ async fn put_result(
State(state): State<TasksState>,
result: MsgSigned<EncryptedMsgTaskResult>,
) -> Result<StatusCode, (StatusCode, &'static str)> {
info!(r#for = %result.msg.task, from = %result.msg.from.hide_broker(), status = ?result.msg.status, "New result");
trace!("Called: Task {:?}, {:?} by {addr}", task_id, result);
if task_id != result.msg.task {
return Err((
Expand All @@ -376,7 +373,6 @@ async fn put_result(
));
}


let status = if state.task_manager.put_result(&task_id, result)? {
StatusCode::NO_CONTENT
} else {
Expand Down
1 change: 1 addition & 0 deletions dev/beamdev
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ function stop {
function clean {
docker compose --profile "*" down
rm -fv pki/*.pem pki/*.json pki/pki.secret
sudo rm -fv logs/*/*.log
pki/pki clean
}

Expand Down
9 changes: 9 additions & 0 deletions dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ services:
BIND_ADDR: 0.0.0.0:8080
MONITORING_API_KEY: ${BROKER_MONITORING_KEY}
RUST_LOG: ${RUST_LOG}
LOG_DIR: /logs
# ALL_PROXY: http://mitmproxy:8080
volumes:
- ./logs/broker:/logs
secrets:
- pki.secret
- dummy.pem
Expand Down Expand Up @@ -60,7 +63,10 @@ services:
PRIVKEY_FILE: /run/secrets/proxy1.pem
BIND_ADDR: 0.0.0.0:8081
RUST_LOG: ${RUST_LOG}
LOG_DIR: /logs
# ALL_PROXY: http://mitmproxy:8080
volumes:
- ./logs/proxy1:/logs
secrets:
- proxy1.pem
- root.crt.pem
Expand All @@ -82,7 +88,10 @@ services:
PRIVKEY_FILE: /run/secrets/proxy2.pem
BIND_ADDR: 0.0.0.0:8082
RUST_LOG: ${RUST_LOG}
LOG_DIR: /logs
# ALL_PROXY: http://mitmproxy:8080
volumes:
- ./logs/proxy2:/logs
secrets:
- proxy2.pem
- root.crt.pem
Expand Down
8 changes: 5 additions & 3 deletions proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::Parser;
use regex::Regex;
use reqwest::Url;
use rsa::{pkcs1::DecodeRsaPrivateKey, pkcs8::DecodePrivateKey, RsaPrivateKey};
use shared::{errors::SamplyBeamError, jwt_simple::prelude::RS256KeyPair, openssl::x509::X509, reqwest};
use shared::{errors::SamplyBeamError, jwt_simple::prelude::RS256KeyPair, logger::LogOptions, openssl::x509::X509, reqwest};

use std::{
collections::HashMap,
Expand Down Expand Up @@ -70,6 +70,9 @@ pub struct CliArgs {
/// samply.pki: Path to CA Root certificate
#[clap(long, env, value_parser, default_value = "/run/secrets/root.crt.pem")]
rootcert_file: PathBuf,

#[clap(flatten)]
pub log_options: LogOptions,
}

pub const APP_PREFIX: &str = "APP";
Expand Down Expand Up @@ -101,8 +104,7 @@ fn parse_apikeys(proxy_id: &ProxyId) -> Result<HashMap<AppId, ApiKey>, SamplyBea
}

impl Config {
pub fn load() -> Result<Config, SamplyBeamError> {
let cli_args = CliArgs::parse();
pub fn load(cli_args: CliArgs) -> Result<Config, SamplyBeamError> {
beam_lib::set_broker_id(cli_args.broker_url.host().unwrap().to_string());
let proxy_id = ProxyId::new(&cli_args.proxy_id).map_err(|e| {
SamplyBeamError::ConfigurationFailed(format!(
Expand Down
12 changes: 7 additions & 5 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Duration;

use axum::http::{header, HeaderValue, StatusCode};
use beam_lib::AppOrProxyId;
use clap::Parser;
use futures::future::Ready;
use futures::{StreamExt, TryStreamExt};
use shared::{reqwest, EncryptedMessage, MsgEmpty, PlainMessage};
Expand All @@ -15,7 +16,7 @@ use tokio::time::Instant;
use tracing::{debug, error, info, warn};
use tryhard::{backoff_strategies::ExponentialBackoff, RetryFuture, RetryFutureConfig};

use crate::config::Config;
use crate::config::{CliArgs, Config};
use crate::serve_tasks::sign_request;

mod auth;
Expand All @@ -32,10 +33,11 @@ pub(crate) const PROXY_TIMEOUT: u64 = 120;

#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
shared::logger::init_logger()?;
let args = CliArgs::parse();
let _log_guard = shared::logger::init_logger(&args.log_options)?;
banner::print_banner();

let config = Config::load()?;
let config = Config::load(args)?;
let retry_policy = reqwest::retry::for_host(config.broker_uri.host_str().unwrap().to_string())
.classify_fn(|res| {
if res.method() != reqwest::Method::GET {
Expand Down Expand Up @@ -90,11 +92,11 @@ pub async fn main() -> anyhow::Result<()> {
}

fn retry_notify<F, T, Fut, E, Cb>(f: F, on_error: Cb) -> RetryFuture<F, Fut, ExponentialBackoff, Box<dyn Fn(u32, Option<Duration>, &E) -> Ready<()>>>
where
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, E>>,
Cb: Fn(&E, Duration) + 'static,

{
tryhard::retry_fn(f)
.retries(100)
Expand Down
18 changes: 16 additions & 2 deletions proxy/src/serve_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
use beam_lib::{AppId, AppOrProxyId, ProxyId};
use shared::{
crypto::{self, CryptoPublicPortion}, crypto_jwt, errors::SamplyBeamError, http_client::SamplyHttpClient, reqwest, sse_event::SseEventType, DecryptableMsg, EncryptableMsg, EncryptedMessage, EncryptedMsgTaskRequest, EncryptedMsgTaskResult, MessageType, Msg, MsgEmpty, MsgId, MsgSigned, MsgTaskRequest, MsgTaskResult, PlainMessage
DecryptableMsg, EncryptableMsg, EncryptedMessage, EncryptedMsgTaskRequest, EncryptedMsgTaskResult, MessageType, Msg, MsgEmpty, MsgId, MsgSigned, MsgTaskRequest, MsgTaskResult, PlainMessage, crypto::{self, CryptoPublicPortion}, crypto_jwt, errors::SamplyBeamError, format_to_without_broker, http_client::SamplyHttpClient, reqwest, sse_event::SseEventType
};
use tokio::io::BufReader;
use tokio::{io::BufReader, task::id};
use tracing::{debug, error, info, trace, warn};

use crate::{auth::AuthenticatedApp, config::Config, PROXY_TIMEOUT};
Expand Down Expand Up @@ -82,6 +82,13 @@ pub(crate) async fn forward_request(
HeaderValue::from_static(env!("SAMPLY_USER_AGENT")),
);
let (encrypted_msg, parts) = encrypt_request(req, &sender).await?;
match &encrypted_msg {
MessageType::MsgTaskRequest(task) => info!(from = %sender.hide_broker_name(), to = %format_to_without_broker(&task.to), id = %task.id, "Sending task"),
MessageType::MsgTaskResult(result) => info!(from = %sender.hide_broker_name(), r#for = %result.task, "Submitting result"),
#[cfg(feature = "sockets")]
MessageType::MsgSocketRequest(socket_req) => info!(from = %socket_req.get_from().hide_broker(), to = %format_to_without_broker(&socket_req.get_to()), id = %socket_req.id, "Submitting socket request"),
MessageType::MsgEmpty(..) => {},
};
let req = sign_request(encrypted_msg, parts, &config).await.map_err(IntoResponse::into_response)?;
trace!("Requesting: {:?}", req);
let resp = client.execute(req).await.map_err(|e| {
Expand Down Expand Up @@ -393,6 +400,13 @@ pub(crate) async fn validate_and_decrypt(json: Value, config: &Config) -> Result
let msg = MsgSigned::<EncryptedMessage>::verify(&signed.jwt)
.await?
.msg;
match &msg {
MessageType::MsgTaskRequest(task) => info!(from = %task.get_from().hide_broker(), id = %task.id, "New task"),
MessageType::MsgTaskResult(result) => info!(from = %result.get_from().hide_broker(), r#for = %result.task, "New result"),
#[cfg(feature = "sockets")]
MessageType::MsgSocketRequest(socket_req) => info!(from = %socket_req.get_from().hide_broker(), id = %socket_req.id, "New socket request"),
MessageType::MsgEmpty(..) => {},
};
Ok(serde_json::to_value(decrypt_msg(msg, config)?).expect("Should serialize fine"))
}
Err(e) => Err(SamplyBeamError::JsonParseError(format!(
Expand Down
4 changes: 3 additions & 1 deletion shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ uuid = { version = "1", features = [
]}
serde = { version = "1", features = ["derive"] }
serde_json = "1"
clap.workspace = true

tokio = { version = "1", features = ["full"] }
axum = { version = "0.8", features = [] }
Expand All @@ -25,7 +26,7 @@ reqwest = { version = "0.12", features = ["stream"] }

# Logging
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }

# Crypto
rand = "0.8"
Expand All @@ -50,6 +51,7 @@ dashmap = { version = "6.0", optional = true}

beam-lib = { workspace = true }
async-trait = "0.1"
tracing-appender = "0.2.3"

[features]
expire_map = ["dep:dashmap"]
Expand Down
14 changes: 14 additions & 0 deletions shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,20 @@ impl Msg for MsgPing {
}
}

pub fn format_to_without_broker(to: &[AppOrProxyId]) -> impl Display + use<'_> {
struct Helper<'a> {
to: &'a [AppOrProxyId],
}

impl<'a> std::fmt::Display for Helper<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_list().entries(self.to.iter().map(|to| to.hide_broker())).finish()
}
}

Helper { to }
}

pub fn try_read<T>(map: &HashMap<String, String>, key: &str) -> Option<T>
where
T: FromStr,
Expand Down
Loading
Loading