Skip to content
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .claude/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"Bash(gh repo view:*)",
"WebFetch(domain:docs.sentry.io)",
"WebFetch(domain:develop.sentry.dev)",
"Bash(grep:*)",
"Bash(grep:*)"
],
"deny": []
}
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Added a `Envelope::into_items` method, which returns an iterator over owned [`EnvelopeItem`s](https://docs.rs/sentry/0.46.2/sentry/protocol/enum.EnvelopeItem.html) in the [`Envelope`](https://docs.rs/sentry/0.46.2/sentry/struct.Envelope.html) ([#983](https://github.com/getsentry/sentry-rust/pull/983)).
- Expose transport utilities ([#949](https://github.com/getsentry/sentry-rust/pull/949))
- Add support for [trace metrics](https://develop.sentry.dev/sdk/telemetry/metrics/) ([#997](https://github.com/getsentry/sentry-rust/pull/997))

## 0.46.2

Expand Down
1 change: 1 addition & 0 deletions sentry-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ client = ["rand"]
test = ["client", "release-health"]
release-health = []
logs = []
metrics = []

[dependencies]
log = { version = "0.4.8", optional = true, features = ["std"] }
Expand Down
345 changes: 345 additions & 0 deletions sentry-core/src/batcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,345 @@
//! Generic batching for Sentry envelope items (logs, metrics, etc.).
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved & generalized from logs.rs.


use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};

use crate::client::TransportArc;
use crate::protocol::EnvelopeItem;
use crate::Envelope;

// Flush when there's 100 items in the buffer
const MAX_ITEMS: usize = 100;
// Or when 5 seconds have passed from the last flush
const FLUSH_INTERVAL: Duration = Duration::from_secs(5);

/// Accumulates items in a queue and submits them through the transport when one of the flushing
/// conditions is met: either the queue reaches [`MAX_ITEMS`] or [`FLUSH_INTERVAL`] has elapsed.
pub(crate) struct Batcher<T>
where
EnvelopeItem: From<Vec<T>>,
T: Send + 'static,
{
transport: TransportArc,
queue: Arc<Mutex<Vec<T>>>,
shutdown: Arc<(Mutex<bool>, Condvar)>,
worker: Option<JoinHandle<()>>,
name: &'static str,
}

impl<T> Batcher<T>
where
EnvelopeItem: From<Vec<T>>,
T: Send + 'static,
{
/// Creates a new Batcher that will submit envelopes to the given `transport`.
///
/// `name` is used for the background thread name and debug logging.
/// `into_envelope_item` converts a batch of items into an [`EnvelopeItem`].
pub(crate) fn new(transport: TransportArc, name: &'static str) -> Self {
let queue: Arc<Mutex<Vec<T>>> = Arc::new(Mutex::new(Vec::new()));
#[allow(clippy::mutex_atomic)]
let shutdown = Arc::new((Mutex::new(false), Condvar::new()));

let worker_transport = transport.clone();
let worker_queue = queue.clone();
let worker_shutdown = shutdown.clone();
let worker = std::thread::Builder::new()
.name(format!("sentry-{name}-batcher"))
.spawn(move || {
let (lock, cvar) = worker_shutdown.as_ref();
let mut shutdown = lock.lock().unwrap();
// check this immediately, in case the main thread is already shutting down
if *shutdown {
return;
}
let mut last_flush = Instant::now();
loop {
let timeout = FLUSH_INTERVAL
.checked_sub(last_flush.elapsed())
.unwrap_or_else(|| Duration::from_secs(0));
shutdown = cvar.wait_timeout(shutdown, timeout).unwrap().0;
if *shutdown {
return;
}
if last_flush.elapsed() >= FLUSH_INTERVAL {
Self::flush_queue_internal(
worker_queue.lock().unwrap(),
&worker_transport,
name,
);
last_flush = Instant::now();
}
}
})
.unwrap();

Self {
transport,
queue,
shutdown,
worker: Some(worker),
name,
}
}

/// Enqueues an item for delayed sending.
///
/// This will automatically flush the queue if it reaches [`MAX_ITEMS`].
pub(crate) fn enqueue(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
queue.push(item);
if queue.len() >= MAX_ITEMS {
Self::flush_queue_internal(queue, &self.transport, self.name);
}
}

/// Flushes the queue to the transport.
pub(crate) fn flush(&self) {
let queue = self.queue.lock().unwrap();
Self::flush_queue_internal(queue, &self.transport, self.name);
}

/// Flushes the queue to the transport.
///
/// This is a static method as it will be called from both the background
/// thread and the main thread on drop.
fn flush_queue_internal(
mut queue_lock: MutexGuard<Vec<T>>,
transport: &TransportArc,
name: &str,
) {
let items = std::mem::take(&mut *queue_lock);
drop(queue_lock);

if items.is_empty() {
return;
}

sentry_debug!("[Batcher({name})] Flushing {} items", items.len());

if let Some(ref transport) = *transport.read().unwrap() {
let mut envelope = Envelope::new();
envelope.add_item(items);
transport.send_envelope(envelope);
}
}
}

impl<T> Drop for Batcher<T>
where
EnvelopeItem: From<Vec<T>>,
T: Send + 'static,
{
fn drop(&mut self) {
let (lock, cvar) = self.shutdown.as_ref();
*lock.lock().unwrap() = true;
cvar.notify_one();

if let Some(worker) = self.worker.take() {
worker.join().ok();
}
Self::flush_queue_internal(self.queue.lock().unwrap(), &self.transport, self.name);
}
}

#[cfg(all(test, feature = "test"))]
mod tests {
use crate::test;

// ---- Log batching tests ----

#[cfg(feature = "logs")]
mod log_tests {
use super::*;
use crate::logger_info;

#[test]
fn test_logs_batching() {
let envelopes = test::with_captured_envelopes_options(
|| {
for i in 0..150 {
logger_info!("test log {}", i);
}
},
crate::ClientOptions {
enable_logs: true,
..Default::default()
},
);

assert_eq!(2, envelopes.len());

let mut total_logs = 0;
for envelope in &envelopes {
for item in envelope.items() {
if let crate::protocol::EnvelopeItem::ItemContainer(
crate::protocol::ItemContainer::Logs(logs),
) = item
{
total_logs += logs.len();
}
}
}

assert_eq!(150, total_logs);
}

#[test]
fn test_logs_batcher_flush() {
let envelopes = test::with_captured_envelopes_options(
|| {
for i in 0..12 {
logger_info!("test log {}", i);
}
},
crate::ClientOptions {
enable_logs: true,
..Default::default()
},
);

assert_eq!(1, envelopes.len());

for envelope in &envelopes {
for item in envelope.items() {
if let crate::protocol::EnvelopeItem::ItemContainer(
crate::protocol::ItemContainer::Logs(logs),
) = item
{
assert_eq!(12, logs.len());
break;
}
}
}
}
}

// ---- Metric batching tests ----

#[cfg(feature = "metrics")]
mod metric_tests {
use super::*;
use sentry_types::protocol::v7::{TraceId, TraceMetric, TraceMetricType};
use std::time::SystemTime;

fn test_metric(name: &str) -> TraceMetric {
TraceMetric {
r#type: TraceMetricType::Counter,
name: name.to_owned(),
value: 1.0,
timestamp: SystemTime::now(),
trace_id: TraceId::default(),
span_id: None,
unit: None,
attributes: Default::default(),
}
}

#[test]
fn test_metrics_batching() {
let envelopes = test::with_captured_envelopes_options(
|| {
for i in 0..150 {
crate::Hub::current().capture_metric(test_metric(&format!("metric.{i}")));
}
},
crate::ClientOptions {
metrics_sample_rate: 1.0,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spec doesn't mention any metrics_sample_rate, I don't think we should have it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will revert to a boolean flag. I need sampling for the relay use case (I wouldn't dare to send the entire volume at once), but I can add the sentry.client_sample_rate attribute there manually.

What about what the specs say about an _experiments section, should we move the flag there like the spec suggests?

Copy link
Copy Markdown
Member

@lcian lcian Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we could also keep the sample rate, if this is going to be used by internal services we're gonna need it at some point anyways.

Another approach which I prefer for Rust is to feature flag it and make sure we state that it's clear that the flag is experimental and could be removed.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping the enable_metrics flag for now to keep it simple & spec-conformant. The spec now seems to allow putting it in the stable section.

..Default::default()
},
);

assert_eq!(2, envelopes.len());

let mut total_metrics = 0;
for envelope in &envelopes {
for item in envelope.items() {
if let crate::protocol::EnvelopeItem::ItemContainer(
crate::protocol::ItemContainer::TraceMetrics(metrics),
) = item
{
total_metrics += metrics.len();
}
}
}

assert_eq!(150, total_metrics);
}

#[test]
fn test_metrics_sample_rate_zero() {
let envelopes = test::with_captured_envelopes_options(
|| {
for i in 0..10 {
crate::Hub::current().capture_metric(test_metric(&format!("metric.{i}")));
}
},
crate::ClientOptions {
metrics_sample_rate: 0.0,
..Default::default()
},
);

assert_eq!(0, envelopes.len());
}

#[test]
fn test_metrics_sample_rate_annotates() {
use sentry_types::protocol::v7::LogAttribute;

let envelopes = test::with_captured_envelopes_options(
|| {
crate::Hub::current().capture_metric(test_metric("metric.test"));
},
crate::ClientOptions {
metrics_sample_rate: 1.0,
..Default::default()
},
);

assert_eq!(1, envelopes.len());
for item in envelopes[0].items() {
if let crate::protocol::EnvelopeItem::ItemContainer(
crate::protocol::ItemContainer::TraceMetrics(metrics),
) = item
{
assert_eq!(1, metrics.len());
assert_eq!(
metrics[0].attributes.get("sentry.client_sample_rate"),
Some(&LogAttribute::from(1.0_f64))
);
}
}
}

#[test]
fn test_metrics_batcher_flush() {
let envelopes = test::with_captured_envelopes_options(
|| {
for i in 0..12 {
crate::Hub::current().capture_metric(test_metric(&format!("metric.{i}")));
}
},
crate::ClientOptions {
metrics_sample_rate: 1.0,
..Default::default()
},
);

assert_eq!(1, envelopes.len());

for envelope in &envelopes {
for item in envelope.items() {
if let crate::protocol::EnvelopeItem::ItemContainer(
crate::protocol::ItemContainer::TraceMetrics(metrics),
) = item
{
assert_eq!(12, metrics.len());
break;
}
}
}
}
}
}
Loading