Skip to content
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .claude/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"Bash(gh repo view:*)",
"WebFetch(domain:docs.sentry.io)",
"WebFetch(domain:develop.sentry.dev)",
"Bash(grep:*)",
"Bash(grep:*)"
],
"deny": []
}
Expand Down
1 change: 1 addition & 0 deletions sentry-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ client = ["rand"]
test = ["client", "release-health"]
release-health = []
logs = []
metrics = []

[dependencies]
log = { version = "0.4.8", optional = true, features = ["std"] }
Expand Down
337 changes: 337 additions & 0 deletions sentry-core/src/batcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,337 @@
//! Generic batching for Sentry envelope items (logs, metrics, etc.).
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved & generalized from logs.rs.


use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

use crate::client::TransportArc;
use crate::protocol::EnvelopeItem;
use crate::Envelope;

// Flush as soon as the buffer holds this many items (checked on every enqueue).
const MAX_ITEMS: usize = 100;
// Otherwise the background worker flushes once this much time has passed since it started or
// since its own last flush; size-triggered flushes do not reset that timer.
const FLUSH_INTERVAL: Duration = Duration::from_secs(5);

/// Accumulates items in a queue and submits them through the transport when one of the flushing
/// conditions is met: either the queue reaches [`MAX_ITEMS`] or [`FLUSH_INTERVAL`] has elapsed.
///
/// The time-based flush is performed by a background thread spawned in [`Batcher::new`].
/// Dropping the batcher signals that thread to stop, joins it, and flushes whatever is still
/// queued.
pub(crate) struct Batcher<T: Send + 'static> {
/// Transport handle that flushed envelopes are sent through.
transport: TransportArc,
/// Items waiting to be flushed; shared between callers of `enqueue`/`flush` and the worker.
queue: Arc<Mutex<Vec<T>>>,
/// Shutdown flag plus the condition variable used to wake the worker when the flag is set.
shutdown: Arc<(Mutex<bool>, Condvar)>,
/// Handle of the background flush thread; an `Option` so that `Drop` can take and join it.
worker: Option<JoinHandle<()>>,
/// Converts one drained batch of items into the [`EnvelopeItem`] that gets sent.
into_envelope_item: fn(Vec<T>) -> EnvelopeItem,
/// Label used in the worker's thread name and in debug log output.
name: &'static str,
}

impl<T: Send + 'static> Batcher<T> {
/// Creates a new Batcher that will submit envelopes to the given `transport`.
///
/// `name` is used for the background thread name and debug logging.
/// `into_envelope_item` converts a batch of items into an [`EnvelopeItem`].
pub(crate) fn new(
transport: TransportArc,
name: &'static str,
into_envelope_item: fn(Vec<T>) -> EnvelopeItem,
) -> Self {
let queue: Arc<Mutex<Vec<T>>> = Arc::new(Mutex::new(Vec::new()));
#[allow(clippy::mutex_atomic)]
let shutdown = Arc::new((Mutex::new(false), Condvar::new()));

let worker_transport = transport.clone();
let worker_queue = queue.clone();
let worker_shutdown = shutdown.clone();
let worker = std::thread::Builder::new()
.name(format!("sentry-{name}-batcher"))
.spawn(move || {
let (lock, cvar) = worker_shutdown.as_ref();
let mut shutdown = lock.lock().unwrap();
// check this immediately, in case the main thread is already shutting down
if *shutdown {
return;
}
let mut last_flush = Instant::now();
loop {
let timeout = FLUSH_INTERVAL
.checked_sub(last_flush.elapsed())
.unwrap_or_else(|| Duration::from_secs(0));
shutdown = cvar.wait_timeout(shutdown, timeout).unwrap().0;
if *shutdown {
return;
}
if last_flush.elapsed() >= FLUSH_INTERVAL {
Self::flush_queue_internal(
worker_queue.lock().unwrap(),
&worker_transport,
into_envelope_item,
name,
);
last_flush = Instant::now();
}
}
})
.unwrap();

Self {
transport,
queue,
shutdown,
worker: Some(worker),
into_envelope_item,
name,
}
}

/// Enqueues an item for delayed sending.
///
/// This will automatically flush the queue if it reaches [`MAX_ITEMS`].
pub(crate) fn enqueue(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
queue.push(item);
if queue.len() >= MAX_ITEMS {
Self::flush_queue_internal(queue, &self.transport, self.into_envelope_item, self.name);
}
}

/// Flushes the queue to the transport.
pub(crate) fn flush(&self) {
let queue = self.queue.lock().unwrap();
Self::flush_queue_internal(queue, &self.transport, self.into_envelope_item, self.name);
}

/// Flushes the queue to the transport.
///
/// This is a static method as it will be called from both the background
/// thread and the main thread on drop.
fn flush_queue_internal(
mut queue_lock: MutexGuard<Vec<T>>,
transport: &TransportArc,
into_envelope_item: fn(Vec<T>) -> EnvelopeItem,
name: &str,
) {
let items = std::mem::take(&mut *queue_lock);
drop(queue_lock);

if items.is_empty() {
return;
}

sentry_debug!("[Batcher({name})] Flushing {} items", items.len());

if let Some(ref transport) = *transport.read().unwrap() {
let mut envelope = Envelope::new();
envelope.add_item(into_envelope_item(items));
transport.send_envelope(envelope);
}
}
}

impl<T: Send + 'static> Drop for Batcher<T> {
    /// Stops the background worker, waits for it to exit, and then flushes whatever is
    /// still sitting in the queue.
    fn drop(&mut self) {
        let (flag, wakeup) = &*self.shutdown;
        {
            // Raise the flag under the lock and release the lock again before notifying.
            let mut stop_requested = flag.lock().unwrap();
            *stop_requested = true;
        }
        wakeup.notify_one();

        // The join result is deliberately ignored: a worker that panicked must not prevent
        // the final flush below.
        if let Some(handle) = self.worker.take() {
            let _ = handle.join();
        }

        let leftover = self.queue.lock().unwrap();
        Self::flush_queue_internal(
            leftover,
            &self.transport,
            self.into_envelope_item,
            self.name,
        );
    }
}

// End-to-end tests that drive the batcher through the log and metric capture paths.
// Compiled only for test builds that also enable the crate's `test` feature.
#[cfg(all(test, feature = "test"))]
mod tests {
use crate::test;

// ---- Log batching tests ----

#[cfg(feature = "logs")]
mod log_tests {
use super::*;
use crate::logger_info;

// Emits 150 logs, which is more than the batcher's size threshold of 100 items, and expects
// them to arrive split across exactly two envelopes with no log lost.
// NOTE(review): presumably the second envelope comes from a flush when the test client is
// torn down — confirm in `test::with_captured_envelopes_options`.
#[test]
fn test_logs_batching() {
let envelopes = test::with_captured_envelopes_options(
|| {
for i in 0..150 {
logger_info!("test log {}", i);
}
},
crate::ClientOptions {
enable_logs: true,
..Default::default()
},
);

assert_eq!(2, envelopes.len());

// Sum the logs over every log container found in the captured envelopes.
let mut total_logs = 0;
for envelope in &envelopes {
for item in envelope.items() {
if let crate::protocol::EnvelopeItem::ItemContainer(
crate::protocol::ItemContainer::Logs(logs),
) = item
{
total_logs += logs.len();
}
}
}

assert_eq!(150, total_logs);
}

// Emits 12 logs, which stays below the size threshold, and expects a single envelope
// carrying all 12.
#[test]
fn test_logs_batcher_flush() {
let envelopes = test::with_captured_envelopes_options(
|| {
for i in 0..12 {
logger_info!("test log {}", i);
}
},
crate::ClientOptions {
enable_logs: true,
..Default::default()
},
);

assert_eq!(1, envelopes.len());

// NOTE(review): if no log container is present, the inner assertion never runs and the
// test still passes; the `break` only leaves the inner loop.
for envelope in &envelopes {
for item in envelope.items() {
if let crate::protocol::EnvelopeItem::ItemContainer(
crate::protocol::ItemContainer::Logs(logs),
) = item
{
assert_eq!(12, logs.len());
break;
}
}
}
}
}

// ---- Metric batching tests ----

#[cfg(feature = "metrics")]
mod metric_tests {
use super::*;
use sentry_types::protocol::v7::{TraceId, TraceMetric, TraceMetricType};
use std::time::SystemTime;

// Builds a counter metric named `name` with value 1.0, the current time as timestamp, a
// default trace id, and no span id, unit, or attributes.
fn test_metric(name: &str) -> TraceMetric {
TraceMetric {
r#type: TraceMetricType::Counter,
name: name.to_owned(),
value: 1.0,
timestamp: SystemTime::now(),
trace_id: TraceId::default(),
span_id: None,
unit: None,
attributes: Default::default(),
}
}

// Captures 150 metrics, which is more than the batcher's size threshold of 100 items, and
// expects them to arrive split across exactly two envelopes with no metric lost.
#[test]
fn test_metrics_batching() {
let envelopes = test::with_captured_envelopes_options(
|| {
for i in 0..150 {
crate::Hub::current().capture_metric(test_metric(&format!("metric.{i}")));
}
},
crate::ClientOptions::default(),
);

assert_eq!(2, envelopes.len());

// Sum the metrics over every trace-metric container found in the captured envelopes.
let mut total_metrics = 0;
for envelope in &envelopes {
for item in envelope.items() {
if let crate::protocol::EnvelopeItem::ItemContainer(
crate::protocol::ItemContainer::TraceMetrics(metrics),
) = item
{
total_metrics += metrics.len();
}
}
}

assert_eq!(150, total_metrics);
}

// With `metrics_sample_rate` set to 0.0, capturing metrics must produce no envelopes at all.
#[test]
fn test_metrics_sample_rate_zero() {
let envelopes = test::with_captured_envelopes_options(
|| {
for i in 0..10 {
crate::Hub::current().capture_metric(test_metric(&format!("metric.{i}")));
}
},
crate::ClientOptions {
metrics_sample_rate: 0.0,
..Default::default()
},
);

assert_eq!(0, envelopes.len());
}

// With default options, a captured metric is expected to carry a
// `sentry.client_sample_rate` attribute equal to 1.0.
// NOTE(review): this presumes the default `metrics_sample_rate` is 1.0 — confirm against
// `ClientOptions::default()`.
#[test]
fn test_metrics_sample_rate_annotates() {
use sentry_types::protocol::v7::LogAttribute;

let envelopes = test::with_captured_envelopes_options(
|| {
crate::Hub::current().capture_metric(test_metric("metric.test"));
},
crate::ClientOptions::default(),
);

assert_eq!(1, envelopes.len());
// NOTE(review): if the envelope holds no trace-metric container, none of the assertions
// below run and the test still passes.
for item in envelopes[0].items() {
if let crate::protocol::EnvelopeItem::ItemContainer(
crate::protocol::ItemContainer::TraceMetrics(metrics),
) = item
{
assert_eq!(1, metrics.len());
assert_eq!(
metrics[0].attributes.get("sentry.client_sample_rate"),
Some(&LogAttribute::from(1.0_f64))
);
}
}
}

// Captures 12 metrics, which stays below the size threshold, and expects a single envelope
// carrying all 12.
#[test]
fn test_metrics_batcher_flush() {
let envelopes = test::with_captured_envelopes_options(
|| {
for i in 0..12 {
crate::Hub::current().capture_metric(test_metric(&format!("metric.{i}")));
}
},
crate::ClientOptions::default(),
);

assert_eq!(1, envelopes.len());

// NOTE(review): if no trace-metric container is present, the inner assertion never runs and
// the test still passes; the `break` only leaves the inner loop.
for envelope in &envelopes {
for item in envelope.items() {
if let crate::protocol::EnvelopeItem::ItemContainer(
crate::protocol::ItemContainer::TraceMetrics(metrics),
) = item
{
assert_eq!(12, metrics.len());
break;
}
}
}
}
}
}
Loading