From 069ad03b576b50a97123211e658df0df4196163d Mon Sep 17 00:00:00 2001 From: Markus Hintersteiner Date: Tue, 5 May 2026 08:34:52 +0200 Subject: [PATCH 1/8] Introduce more typing, apply pipeline pattern --- relay-profiling/src/lib.rs | 12 +- .../android/perfetto/profile_chunk.envelope | 2 +- relay-server/src/envelope/content_type.rs | 5 + .../src/processing/profile_chunks/mod.rs | 323 ++++++++++------ .../src/processing/profile_chunks/process.rs | 357 +++++++----------- .../src/processing/profile_chunks/store.rs | 11 + relay-server/src/services/store.rs | 62 ++- .../test_profile_chunks_perfetto.py | 4 + 8 files changed, 420 insertions(+), 356 deletions(-) create mode 100644 relay-server/src/processing/profile_chunks/store.rs diff --git a/relay-profiling/src/lib.rs b/relay-profiling/src/lib.rs index 7332fd66c43..a973bb2de04 100644 --- a/relay-profiling/src/lib.rs +++ b/relay-profiling/src/lib.rs @@ -405,23 +405,21 @@ impl Getter for ExpandedPerfettoChunk { /// Expands a binary Perfetto trace into a Sample v2 profile chunk. /// -/// Decodes the protobuf trace, converts it into the internal Sample v2 format, -/// merges the provided JSON `metadata_json` (containing platform, environment, etc.), -/// and returns an [`ExpandedPerfettoChunk`] with the serialized JSON payload plus +/// Returns an [`ExpandedPerfettoChunk`] with the serialized JSON payload plus /// the profile metadata needed for downstream processing (platform, profile type, /// inbound filtering) — avoiding a second JSON deserialization pass in callers. pub fn expand_perfetto( - perfetto_bytes: &[u8], - metadata_json: &[u8], + perfetto_payload: &[u8], + json_payload: &[u8], ) -> Result { - let d = &mut Deserializer::from_slice(metadata_json); + let d = &mut Deserializer::from_slice(json_payload); let mut chunk: sample::v2::ProfileChunk = serde_path_to_error::deserialize(d).map_err(ProfileError::InvalidJson)?; let platform = chunk.metadata.platform.clone(); let release = chunk.metadata.release.clone(); - let (profile_data, debug_images) = perfetto::convert(perfetto_bytes)?; + let (profile_data, debug_images) = perfetto::convert(perfetto_payload)?; chunk.profile = profile_data; chunk.metadata.debug_meta.images.extend(debug_images); chunk.normalize()?; diff --git a/relay-profiling/tests/fixtures/android/perfetto/profile_chunk.envelope b/relay-profiling/tests/fixtures/android/perfetto/profile_chunk.envelope index 5b1c765bdeb..dc0cf89c9ed 100644 --- a/relay-profiling/tests/fixtures/android/perfetto/profile_chunk.envelope +++ b/relay-profiling/tests/fixtures/android/perfetto/profile_chunk.envelope @@ -1,5 +1,5 @@ {"event_id":"c3b09c0608844f558eaf6e65df6b9cdf","sdk":{"name":"sentry.java.android","version":"8.38.0","packages":[{"name":"maven:io.sentry:sentry","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-core","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-fragment","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-timber","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-replay","version":"8.38.0"},{"name":"maven:io.sentry:sentry-spotlight","version":"8.38.0"},{"name":"maven:io.sentry:sentry-compose","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-ndk","version":"8.38.0"}],"integrations":["Screenshot","ViewHierarchy","UncaughtExceptionHandler","ShutdownHook","Spotlight","SendCachedEnvelope","Ndk","Tombstone","AppLifecycle","AnrV2","AnrProfiling","ActivityLifecycle","ActivityBreadcrumbs","UserInteraction","FeedbackShake","FragmentLifecycle","Timber","AppComponentsBreadcrumbs","NetworkBreadcrumbs","AutoInit","EnvelopeFileObserver","SystemEventsBreadcrumbs"]}} -{"content_type":"application/octet-stream","filename":"profile_sentry-profiling_2026-04-28-08-33-40.perfetto-stack-sample","type":"profile_chunk","platform":"android","meta_length":7739,"length":104991} +{"content_type":"application/x-perfetto-trace","filename":"profile_sentry-profiling_2026-04-28-08-33-40.perfetto-stack-sample","type":"profile_chunk","platform":"android","meta_length":7739,"length":104991} {"profiler_id":"814b081c638b4ad982ae351547bfe499","chunk_id":"c3b09c0608844f558eaf6e65df6b9cdf","client_sdk":{"name":"sentry.java.android","version":"8.38.0","packages":[{"name":"maven:io.sentry:sentry","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-core","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-fragment","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-timber","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-replay","version":"8.38.0"},{"name":"maven:io.sentry:sentry-spotlight","version":"8.38.0"},{"name":"maven:io.sentry:sentry-compose","version":"8.38.0"},{"name":"maven:io.sentry:sentry-android-ndk","version":"8.38.0"}],"integrations":["Screenshot","ViewHierarchy","UncaughtExceptionHandler","ShutdownHook","Spotlight","SendCachedEnvelope","Ndk","Tombstone","AppLifecycle","AnrV2","AnrProfiling","ActivityLifecycle","ActivityBreadcrumbs","UserInteraction","FeedbackShake","FragmentLifecycle","Timber","AppComponentsBreadcrumbs","NetworkBreadcrumbs","AutoInit","EnvelopeFileObserver","SystemEventsBreadcrumbs"]},"measurements":{"memory_native_footprint":{"unit":"byte","values":[{"value":3.6631152E7,"elapsed_since_start_ns":"1777358020895000000","timestamp":1777358020.895000},{"value":3.6636E7,"elapsed_since_start_ns":"1777358020994000000","timestamp":1777358020.994000},{"value":3.6598336E7,"elapsed_since_start_ns":"1777358021094000000","timestamp":1777358021.094000},{"value":3.6600496E7,"elapsed_since_start_ns":"1777358021194000000","timestamp":1777358021.193999},{"value":3.6601984E7,"elapsed_since_start_ns":"1777358021294000000","timestamp":1777358021.294000},{"value":3.6604128E7,"elapsed_since_start_ns":"1777358021394000000","timestamp":1777358021.393999},{"value":3.6606272E7,"elapsed_since_start_ns":"1777358021494000000","timestamp":1777358021.494000},{"value":3.6608416E7,"elapsed_since_start_ns":"1777358021594000000","timestamp":1777358021.593999},{"value":3.6614672E7,"elapsed_since_start_ns":"1777358021695000000","timestamp":1777358021.695000},{"value":3.6616816E7,"elapsed_since_start_ns":"1777358021794000000","timestamp":1777358021.794000},{"value":3.661896E7,"elapsed_since_start_ns":"1777358021894000000","timestamp":1777358021.894000},{"value":3.6621104E7,"elapsed_since_start_ns":"1777358021995000000","timestamp":1777358021.995000},{"value":3.6623248E7,"elapsed_since_start_ns":"1777358022094000000","timestamp":1777358022.094000},{"value":3.6625392E7,"elapsed_since_start_ns":"1777358022194000000","timestamp":1777358022.193999},{"value":3.6627536E7,"elapsed_since_start_ns":"1777358022294000000","timestamp":1777358022.294000},{"value":3.662968E7,"elapsed_since_start_ns":"1777358022394000000","timestamp":1777358022.393999},{"value":3.6631824E7,"elapsed_since_start_ns":"1777358022495000000","timestamp":1777358022.495000},{"value":3.6672752E7,"elapsed_since_start_ns":"1777358022594000000","timestamp":1777358022.593999},{"value":3.6748144E7,"elapsed_since_start_ns":"1777358022694000000","timestamp":1777358022.694000},{"value":3.6754304E7,"elapsed_since_start_ns":"1777358022794000000","timestamp":1777358022.794000}]},"frozen_frame_renders":{"unit":"nanosecond","values":[{"value":7.49833322E8,"elapsed_since_start_ns":"71057630775779","timestamp":1777358020.888000}]},"cpu_usage":{"unit":"percent","values":[{"value":56.20094079375762,"elapsed_since_start_ns":"1777358020895000000","timestamp":1777358020.895000},{"value":47.72786092177692,"elapsed_since_start_ns":"1777358020994000000","timestamp":1777358020.994000},{"value":52.289708049827254,"elapsed_since_start_ns":"1777358021094000000","timestamp":1777358021.094000},{"value":50.050196342916244,"elapsed_since_start_ns":"1777358021194000000","timestamp":1777358021.193999},{"value":52.620478795841386,"elapsed_since_start_ns":"1777358021294000000","timestamp":1777358021.294000},{"value":49.83694994597027,"elapsed_since_start_ns":"1777358021394000000","timestamp":1777358021.393999},{"value":52.61821576681683,"elapsed_since_start_ns":"1777358021494000000","timestamp":1777358021.494000},{"value":50.00733407561553,"elapsed_since_start_ns":"1777358021594000000","timestamp":1777358021.593999},{"value":52.31104830862539,"elapsed_since_start_ns":"1777358021695000000","timestamp":1777358021.695000},{"value":50.08750688152257,"elapsed_since_start_ns":"1777358021794000000","timestamp":1777358021.794000},{"value":52.61428295786996,"elapsed_since_start_ns":"1777358021894000000","timestamp":1777358021.894000},{"value":49.84011689221911,"elapsed_since_start_ns":"1777358021995000000","timestamp":1777358021.995000},{"value":50.07609463188072,"elapsed_since_start_ns":"1777358022094000000","timestamp":1777358022.094000},{"value":52.764437950744615,"elapsed_since_start_ns":"1777358022194000000","timestamp":1777358022.193999},{"value":49.7033742388127,"elapsed_since_start_ns":"1777358022294000000","timestamp":1777358022.294000},{"value":52.63426105211958,"elapsed_since_start_ns":"1777358022394000000","timestamp":1777358022.393999},{"value":49.806191656715804,"elapsed_since_start_ns":"1777358022495000000","timestamp":1777358022.495000},{"value":52.611141035437356,"elapsed_since_start_ns":"1777358022594000000","timestamp":1777358022.593999},{"value":32.55163503106803,"elapsed_since_start_ns":"1777358022694000000","timestamp":1777358022.694000},{"value":2.50511253386361,"elapsed_since_start_ns":"1777358022794000000","timestamp":1777358022.794000}]},"memory_footprint":{"unit":"byte","values":[{"value":1.18884E7,"elapsed_since_start_ns":"1777358020895000000","timestamp":1777358020.895000},{"value":1.2003504E7,"elapsed_since_start_ns":"1777358020994000000","timestamp":1777358020.994000},{"value":1.2056752E7,"elapsed_since_start_ns":"1777358021094000000","timestamp":1777358021.094000},{"value":1.211E7,"elapsed_since_start_ns":"1777358021194000000","timestamp":1777358021.193999},{"value":1.213048E7,"elapsed_since_start_ns":"1777358021294000000","timestamp":1777358021.294000},{"value":1.215096E7,"elapsed_since_start_ns":"1777358021394000000","timestamp":1777358021.393999},{"value":1.22124E7,"elapsed_since_start_ns":"1777358021494000000","timestamp":1777358021.494000},{"value":1.223288E7,"elapsed_since_start_ns":"1777358021594000000","timestamp":1777358021.593999},{"value":1.2286128E7,"elapsed_since_start_ns":"1777358021695000000","timestamp":1777358021.695000},{"value":1.2339376E7,"elapsed_since_start_ns":"1777358021794000000","timestamp":1777358021.794000},{"value":1.2359856E7,"elapsed_since_start_ns":"1777358021894000000","timestamp":1777358021.894000},{"value":1.2421296E7,"elapsed_since_start_ns":"1777358021995000000","timestamp":1777358021.995000},{"value":1.2441776E7,"elapsed_since_start_ns":"1777358022094000000","timestamp":1777358022.094000},{"value":1.2495024E7,"elapsed_since_start_ns":"1777358022194000000","timestamp":1777358022.193999},{"value":1.2515504E7,"elapsed_since_start_ns":"1777358022294000000","timestamp":1777358022.294000},{"value":1.2535984E7,"elapsed_since_start_ns":"1777358022394000000","timestamp":1777358022.393999},{"value":1.2597424E7,"elapsed_since_start_ns":"1777358022495000000","timestamp":1777358022.495000},{"value":1.2617904E7,"elapsed_since_start_ns":"1777358022594000000","timestamp":1777358022.593999},{"value":1.2892512E7,"elapsed_since_start_ns":"1777358022694000000","timestamp":1777358022.694000},{"value":1.294576E7,"elapsed_since_start_ns":"1777358022794000000","timestamp":1777358022.794000}]},"screen_frame_rates":{"unit":"hz","values":[{"value":60.000003814697266,"elapsed_since_start_ns":"71057630775779","timestamp":1777358020.888000}]}},"platform":"android","release":"io.sentry.samples.android@8.38.0+2","environment":"debug","version":"2","content_type":"perfetto","timestamp":1777358020.855000} U2N diff --git a/relay-server/src/envelope/content_type.rs b/relay-server/src/envelope/content_type.rs index 75c9be75a70..df30de0b646 100644 --- a/relay-server/src/envelope/content_type.rs +++ b/relay-server/src/envelope/content_type.rs @@ -38,6 +38,8 @@ pub enum ContentType { TraceAttachment, /// `application/vnd.sentry.attachment-ref+json` AttachmentRef, + /// `application/x-perfetto-trace` + PerfettoTrace, /// All integration content types. Integration(Integration), } @@ -60,6 +62,7 @@ impl ContentType { Self::TraceMetricContainer => "application/vnd.sentry.items.trace-metric+json", Self::TraceAttachment => "application/vnd.sentry.trace-attachment", Self::AttachmentRef => "application/vnd.sentry.attachment-ref+json", + Self::PerfettoTrace => "application/x-perfetto-trace", Self::Integration(integration) => integration.as_content_type(), } } @@ -109,6 +112,8 @@ impl ContentType { || ct.eq_ignore_ascii_case("application/vnd.sentry.attachment-ref") { Some(Self::AttachmentRef) + } else if ct.eq_ignore_ascii_case(Self::PerfettoTrace.as_str()) { + Some(Self::PerfettoTrace) } else { Integration::from_content_type(ct).map(Self::Integration) } diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs index d1c69d39a5c..cc270a7cd70 100644 --- a/relay-server/src/processing/profile_chunks/mod.rs +++ b/relay-server/src/processing/profile_chunks/mod.rs @@ -1,17 +1,22 @@ use std::sync::Arc; +use bytes::Bytes; +use serde::Serialize; +use smallvec::smallvec; + use relay_profiling::ProfileType; use relay_quotas::{DataCategory, RateLimits}; use crate::Envelope; -use crate::envelope::{EnvelopeHeaders, Item, ItemType, Items}; +use crate::envelope::{ContentType, EnvelopeHeaders, Item, ItemType, Items}; use crate::managed::{Counted, Managed, ManagedEnvelope, ManagedResult as _, Quantities, Rejected}; use crate::processing::{self, Context, CountRateLimited, Forward, Output, QuotaRateLimiter}; use crate::services::outcome::{DiscardReason, Outcome}; -use smallvec::smallvec; mod filter; mod process; +#[cfg(feature = "processing")] +mod store; pub type Result = std::result::Result; @@ -56,6 +61,50 @@ impl crate::managed::OutcomeError for Error { } } +#[derive(Clone, Debug, Serialize)] +pub struct RawProfile { + #[serde(rename = "raw_profile")] + pub payload: Bytes, + #[serde(rename = "raw_profile_content_type")] + pub content_type: ContentType, +} + +/// A single profile chunk after expansion. +#[derive(Debug)] +pub struct ExpandedProfileChunk { + pub payload: Bytes, + pub raw_profile: Option, + pub quantities: Quantities, +} + +impl Counted for ExpandedProfileChunk { + fn quantities(&self) -> Quantities { + self.quantities.clone() + } +} + +/// Profile chunks after expansion: all items have been parsed, validated, and +/// converted into typed representations. +#[derive(Debug)] +pub struct ExpandedProfileChunks { + pub headers: EnvelopeHeaders, + pub chunks: Vec, +} + +impl Counted for ExpandedProfileChunks { + fn quantities(&self) -> Quantities { + let mut q = Quantities::new(); + for chunk in &self.chunks { + q.extend(chunk.quantities()); + } + q + } +} + +impl CountRateLimited for Managed { + type Error = Error; +} + /// A processor for profile chunks. /// /// It processes items of type: [`ItemType::ProfileChunk`]. @@ -97,31 +146,63 @@ impl processing::Processor for ProfileChunksProcessor { async fn process( &self, - mut profile_chunks: Managed, + profile_chunks: Managed, ctx: Context<'_>, ) -> Result, Rejected> { filter::feature_flag(ctx).reject(&profile_chunks)?; - process::process(&mut profile_chunks, ctx); + if !ctx.is_processing() { + return Ok(Output::just(ProfileChunkOutput::Serialized(profile_chunks))); + } - let profile_chunks = self.limiter.enforce_quotas(profile_chunks, ctx).await?; + let expanded: Managed = process::expand(profile_chunks, ctx); + let expanded = self.limiter.enforce_quotas(expanded, ctx).await?; - Ok(Output::just(ProfileChunkOutput(profile_chunks))) + Ok(Output::just(ProfileChunkOutput::Expanded(expanded))) } } /// Output produced by [`ProfileChunksProcessor`]. #[derive(Debug)] -pub struct ProfileChunkOutput(Managed); +pub enum ProfileChunkOutput { + /// Non-processing relay: items forwarded as-is. + Serialized(Managed), + /// Processing relay: items expanded into typed representations. + Expanded(Managed), +} impl Forward for ProfileChunkOutput { fn serialize_envelope( self, _: processing::ForwardContext<'_>, ) -> Result>, Rejected<()>> { - let Self(profile_chunks) = self; - Ok(profile_chunks - .map(|pc, _| Envelope::from_parts(pc.headers, Items::from_vec(pc.profile_chunks)))) + match self { + Self::Serialized(profile_chunks) => Ok(profile_chunks + .map(|pc, _| Envelope::from_parts(pc.headers, Items::from_vec(pc.profile_chunks)))), + Self::Expanded(expanded) => Ok(expanded.map(|e, _| { + let items = e + .chunks + .into_iter() + .map(|chunk| { + let mut item = Item::new(ItemType::ProfileChunk); + if let Some(raw_profile) = chunk.raw_profile { + let meta_length = chunk.payload.len() as u32; + let mut compound = bytes::BytesMut::with_capacity( + chunk.payload.len() + raw_profile.payload.len(), + ); + compound.extend_from_slice(&chunk.payload); + compound.extend_from_slice(&raw_profile.payload); + item.set_payload(raw_profile.content_type, compound.freeze()); + item.set_meta_length(meta_length); + } else { + item.set_payload(ContentType::Json, chunk.payload); + } + item + }) + .collect(); + Envelope::from_parts(e.headers, Items::from_vec(items)) + })), + } } #[cfg(feature = "processing")] @@ -130,57 +211,24 @@ impl Forward for ProfileChunkOutput { s: processing::forward::StoreHandle<'_>, ctx: processing::ForwardContext<'_>, ) -> Result<(), Rejected<()>> { - use crate::services::store::{RawProfileContentType, StoreProfileChunk}; - - let Self(profile_chunks) = self; + let expanded = match self { + Self::Expanded(e) => e, + Self::Serialized(m) => { + return Err( + m.internal_error("forward_store called with non-expanded profile chunks") + ); + } + }; let retention_days = ctx.event_retention().standard; - for item in profile_chunks.split(|pc| pc.profile_chunks) { - let (kafka_payload, raw_profile) = split_item_payload(&item); - - s.send_to_store(item.map(|item, _| StoreProfileChunk { - retention_days, - payload: kafka_payload, - quantities: item.quantities(), - raw_profile_content_type: if raw_profile.is_some() { - Some(RawProfileContentType::Perfetto) - } else { - None - }, - raw_profile, - })); + for chunk in expanded.split(|e| e.chunks) { + s.send_to_store(chunk.map(|chunk, _| store::convert(chunk, retention_days))); } Ok(()) } } -/// Splits a profile chunk item payload into its constituent parts. -/// -/// For compound items (those with a `meta_length` header), the payload is -/// `[expanded JSON][raw binary]`. Returns `(kafka_payload, raw_profile)`. -/// -/// For plain items, returns `(full_payload, None)`. -#[cfg(any(feature = "processing", test))] -fn split_item_payload(item: &Item) -> (bytes::Bytes, Option) { - let payload = item.payload(); - - let Some(meta_length) = item.meta_length() else { - return (payload, None); - }; - - let meta_length = meta_length as usize; - let Some((meta, body)) = payload.split_at_checked(meta_length) else { - return (payload, None); - }; - - if body.is_empty() { - return (payload.slice_ref(meta), None); - } - - (payload.slice_ref(meta), Some(payload.slice_ref(body))) -} - /// Serialized profile chunks extracted from an envelope. #[derive(Debug)] pub struct SerializedProfileChunks { @@ -223,81 +271,128 @@ impl CountRateLimited for Managed { mod tests { use similar_asserts::assert_eq; - use crate::envelope::ContentType; - use super::*; - - fn make_chunk_item(meta: &[u8]) -> Item { - let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::Json, bytes::Bytes::copy_from_slice(meta)); - item + use crate::Envelope; + use crate::envelope::ContentType; + use crate::extractors::RequestMeta; + use crate::processing::Context; + + fn make_expanded( + chunks: Vec, + ) -> ( + Managed, + crate::managed::ManagedTestHandle, + ) { + let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" + .parse() + .unwrap(); + let envelope = Envelope::from_request(None, RequestMeta::new(dsn)); + let headers = envelope.headers().clone(); + Managed::for_test(ExpandedProfileChunks { headers, chunks }).build() } - fn make_compound_item(meta: &[u8], body: &[u8]) -> Item { - let meta_length = meta.len(); - let mut payload = bytes::BytesMut::with_capacity(meta_length + body.len()); - payload.extend_from_slice(meta); - payload.extend_from_slice(body); - - let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::OctetStream, payload.freeze()); - item.set_meta_length(meta_length as u32); - item + #[test] + fn test_serialize_envelope_json_only() { + let chunk = ExpandedProfileChunk { + payload: Bytes::from(b"{\"hello\":\"world\"}".as_ref()), + raw_profile: None, + quantities: smallvec![], + }; + let (managed, _handle) = make_expanded(vec![chunk]); + let output = ProfileChunkOutput::Expanded(managed); + + let envelope = output + .serialize_envelope(Context::for_test().to_forward()) + .unwrap() + .accept(|e| e); + + let items: Vec<_> = envelope.items().collect(); + assert_eq!(items.len(), 1); + assert_eq!(items[0].payload().as_ref(), b"{\"hello\":\"world\"}"); + assert!(items[0].meta_length().is_none()); + assert_eq!(items[0].content_type(), Some(ContentType::Json)); } #[test] - fn test_split_plain_chunk() { - let item = make_chunk_item(b"{}"); - let (payload, raw) = split_item_payload(&item); - assert_eq!(payload.as_ref(), b"{}"); - assert!(raw.is_none()); + fn test_serialize_envelope_compound() { + let json_payload = Bytes::from(b"{\"expanded\":true}".as_ref()); + let raw_data = Bytes::from(b"raw-binary-blob".as_ref()); + let chunk = ExpandedProfileChunk { + payload: json_payload.clone(), + raw_profile: Some(RawProfile { + payload: raw_data.clone(), + content_type: ContentType::PerfettoTrace, + }), + quantities: smallvec![], + }; + let (managed, _handle) = make_expanded(vec![chunk]); + let output = ProfileChunkOutput::Expanded(managed); + + let envelope = output + .serialize_envelope(Context::for_test().to_forward()) + .unwrap() + .accept(|e| e); + + let items: Vec<_> = envelope.items().collect(); + assert_eq!(items.len(), 1); + + let item = &items[0]; + let meta_length = item + .meta_length() + .expect("compound item must have meta_length"); + assert_eq!(meta_length as usize, json_payload.len()); + assert_eq!(item.content_type(), Some(ContentType::PerfettoTrace),); + + let payload = item.payload(); + let (json_part, raw_part) = payload.split_at(meta_length as usize); + assert_eq!(json_part, b"{\"expanded\":true}".as_ref()); + assert_eq!(raw_part, b"raw-binary-blob".as_ref()); } #[test] - fn test_split_compound_chunk() { - let meta = br#"{"content_type":"perfetto"}"#; - let body = b"binary-data"; - let item = make_compound_item(meta, body); - - let (payload, raw) = split_item_payload(&item); - assert_eq!(payload.as_ref(), meta.as_ref()); - assert_eq!(raw.as_deref(), Some(b"binary-data".as_ref())); + fn test_raw_profile_serialization() { + let raw = RawProfile { + payload: Bytes::from(b"binary-data".as_ref()), + content_type: ContentType::PerfettoTrace, + }; + let json = serde_json::to_value(&raw).unwrap(); + assert_eq!( + json["raw_profile_content_type"], + "application/x-perfetto-trace" + ); + assert!(json.get("raw_profile").is_some()); } #[test] - fn test_split_compound_empty_body() { - let meta = br#"{"content_type":"perfetto"}"#; - let item = make_compound_item(meta, b""); + fn test_serialize_envelope_mixed_json_and_compound() { + let json_chunk = ExpandedProfileChunk { + payload: Bytes::from(b"{\"type\":\"json\"}".as_ref()), + raw_profile: None, + quantities: smallvec![], + }; + let compound_chunk = ExpandedProfileChunk { + payload: Bytes::from(b"{\"type\":\"compound\"}".as_ref()), + raw_profile: Some(RawProfile { + payload: Bytes::from(b"perfetto-blob".as_ref()), + content_type: ContentType::PerfettoTrace, + }), + quantities: smallvec![], + }; + let (managed, _handle) = make_expanded(vec![json_chunk, compound_chunk]); + let output = ProfileChunkOutput::Expanded(managed); - let (payload, raw) = split_item_payload(&item); - assert_eq!(payload.as_ref(), meta.as_ref()); - assert!(raw.is_none()); - } + let envelope = output + .serialize_envelope(Context::for_test().to_forward()) + .unwrap() + .accept(|e| e); - #[test] - fn test_split_compound_meta_length_exceeds_payload() { - // meta_length is set to more bytes than the payload actually contains. - // split_at_checked returns None, so we fall back to the full payload with no split. - let body = b"binary-data"; - let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::OctetStream, bytes::Bytes::from(body.as_ref())); - item.set_meta_length(body.len() as u32 + 100); - - let (payload, raw) = split_item_payload(&item); - assert_eq!(payload.as_ref(), body.as_ref()); - assert!(raw.is_none()); - } + let items: Vec<_> = envelope.items().collect(); + assert_eq!(items.len(), 2); - #[test] - fn test_split_compound_zero_meta_length() { - // meta_length = 0: meta slice is empty, entire payload is treated as body. - let body = b"binary-data"; - let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::OctetStream, bytes::Bytes::from(body.as_ref())); - item.set_meta_length(0); - - let (payload, raw) = split_item_payload(&item); - assert_eq!(payload.as_ref(), b""); - assert_eq!(raw.as_deref(), Some(b"binary-data".as_ref())); + assert_eq!(items[0].content_type(), Some(ContentType::Json)); + assert!(items[0].meta_length().is_none()); + + assert_eq!(items[1].content_type(), Some(ContentType::PerfettoTrace)); + assert!(items[1].meta_length().is_some()); } } diff --git a/relay-server/src/processing/profile_chunks/process.rs b/relay-server/src/processing/profile_chunks/process.rs index a99896b1074..83a929e14c9 100644 --- a/relay-server/src/processing/profile_chunks/process.rs +++ b/relay-server/src/processing/profile_chunks/process.rs @@ -1,144 +1,114 @@ use std::net::IpAddr; +use bytes::Bytes; +use smallvec::smallvec; + use relay_dynamic_config::Feature; use relay_profiling::ProfileType; use relay_quotas::DataCategory; -use crate::envelope::{ContentType, Item, ItemType}; +use crate::envelope::ContentType; +use crate::managed::{Quantities, RecordKeeper}; use crate::processing::Context; use crate::processing::Managed; -use crate::processing::profile_chunks::{Error, Result, SerializedProfileChunks}; +use crate::processing::profile_chunks::{ + Error, ExpandedProfileChunk, ExpandedProfileChunks, RawProfile, Result, SerializedProfileChunks, +}; use crate::statsd::RelayCounters; use crate::utils; -/// Processes profile chunks. -pub fn process(profile_chunks: &mut Managed, ctx: Context<'_>) { - // Only run this 'expensive' processing step in processing Relays. - if !ctx.is_processing() { - return; - } - - let sdk = utils::client_name_tag(profile_chunks.headers.meta().client_name()); - let client_ip = profile_chunks.headers.meta().client_addr(); - let filter_settings = &ctx.project_info.config.filter_settings; - - profile_chunks.retain( - |pc| &mut pc.profile_chunks, - |item, records| -> Result<()> { - if let Some(meta_length) = item.meta_length() { - return process_compound_item( - item, - meta_length, - sdk, - client_ip, - filter_settings, - ctx, - records, - ); - } - - let pc = relay_profiling::ProfileChunk::new(item.payload())?; - - // Validate the item inferred profile type with the one from the payload, - // or if missing set it. - // - // This is currently necessary to ensure profile chunks are emitted in the correct - // data category, as well as rate limited with the correct data category. - // - // In the future we plan to make the profile type on the item header a necessity. - // For more context see also: . - if item - .profile_type() - .is_some_and(|pt| pt != pc.profile_type()) - { - return Err(relay_profiling::ProfileError::InvalidProfileType.into()); - } - - // Update the profile type to ensure the following outcomes are emitted in the correct - // data category. - // - // Once the item header on the item is required, this is no longer required. - if item.profile_type().is_none() { - relay_statsd::metric!( - counter(RelayCounters::ProfileChunksWithoutPlatform) += 1, - sdk = sdk - ); - - item.set_platform(pc.platform().to_owned()); - debug_assert_eq!(item.profile_type(), Some(pc.profile_type())); - match pc.profile_type() { - ProfileType::Ui => records.modify_by(DataCategory::ProfileChunkUi, 1), - ProfileType::Backend => records.modify_by(DataCategory::ProfileChunk, 1), +/// Expands serialized profile chunk items into typed representations. +/// +/// Each item is individually parsed and validated. Items that fail are +/// removed with outcome tracking. +pub fn expand( + chunks: Managed, + ctx: Context<'_>, +) -> Managed { + chunks.map(|serialized, records| { + let sdk = utils::client_name_tag(serialized.headers.meta().client_name()); + let client_ip = serialized.headers.meta().client_addr(); + let filter_settings = &ctx.project_info.config.filter_settings; + + let mut expanded = Vec::with_capacity(serialized.profile_chunks.len()); + + for item in serialized.profile_chunks { + match expand_item(&item, sdk, client_ip, filter_settings, ctx, records) { + Ok(chunk) => expanded.push(chunk), + Err(err) => { + records.reject_err(err, &item); } } + } - pc.filter(client_ip, filter_settings, ctx.global_config)?; - - let expanded = pc.expand()?; - if expanded.len() > ctx.config.max_profile_size() { - return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); - } - - *item = { - let mut new_item = Item::new(ItemType::ProfileChunk); - new_item.set_platform(pc.platform().to_owned()); - new_item.set_payload(ContentType::Json, expanded); - new_item - }; - - Ok(()) - }, - ); + ExpandedProfileChunks { + headers: serialized.headers, + chunks: expanded, + } + }) } -/// Processes a compound profile chunk item (JSON metadata + binary blob). -/// -/// The item payload is `[JSON metadata bytes][binary blob bytes]`, split at `meta_length`. -/// After expansion, the item is rebuilt with `[expanded JSON][raw binary]` and an updated -/// `meta_length`, so that `forward_store` can still extract the raw profile. -fn process_compound_item( - item: &mut Item, - meta_length: u32, +fn expand_item( + item: &crate::envelope::Item, sdk: &str, client_ip: Option, filter_settings: &relay_filter::ProjectFiltersConfig, ctx: Context<'_>, - records: &mut crate::managed::RecordKeeper, -) -> Result<()> { + records: &mut RecordKeeper<'_>, +) -> Result { let payload = item.payload(); - let meta_length = meta_length as usize; - - let Some((meta_json, raw_profile)) = payload.split_at_checked(meta_length) else { - return Err(relay_profiling::ProfileError::InvalidSampledProfile.into()); - }; - - #[derive(serde::Deserialize)] - struct ContentTypeProbe { - content_type: Option, - } - match serde_json::from_slice::(meta_json) - .ok() - .and_then(|v| v.content_type) - .as_deref() - { - Some("perfetto") => {} - _ => return Err(relay_profiling::ProfileError::PlatformNotSupported.into()), - } + let is_perfetto = matches!(item.content_type(), Some(ContentType::PerfettoTrace)); - if ctx.should_filter(Feature::ContinuousProfilingPerfetto) { - return Err(Error::FilterFeatureFlag); - } - - let expanded = relay_profiling::expand_perfetto(raw_profile, meta_json)?; - - if expanded.payload.len() > ctx.config.max_profile_size() { - return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); + if is_perfetto { + if ctx.should_filter(Feature::ContinuousProfilingPerfetto) { + return Err(Error::FilterFeatureFlag); + } + let meta_length = + item.meta_length() + .ok_or(relay_profiling::ProfileError::InvalidSampledProfile)? as usize; + let (json_payload, perfetto_payload) = payload + .split_at_checked(meta_length) + .ok_or(relay_profiling::ProfileError::InvalidSampledProfile)?; + let expanded = relay_profiling::expand_perfetto(perfetto_payload, json_payload)?; + let quantities = validate_and_track(item, expanded.profile_type(), sdk, records)?; + expanded.filter(client_ip, filter_settings, ctx.global_config)?; + if expanded.payload.len() > ctx.config.max_profile_size() { + return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); + } + Ok(ExpandedProfileChunk { + payload: Bytes::from(expanded.payload), + raw_profile: Some(RawProfile { + payload: payload.slice_ref(perfetto_payload), + content_type: ContentType::PerfettoTrace, + }), + quantities, + }) + } else { + if item.meta_length().is_some() { + return Err(relay_profiling::ProfileError::InvalidSampledProfile.into()); + } + let pc = relay_profiling::ProfileChunk::new(payload)?; + let quantities = validate_and_track(item, pc.profile_type(), sdk, records)?; + pc.filter(client_ip, filter_settings, ctx.global_config)?; + let expanded = pc.expand()?; + if expanded.len() > ctx.config.max_profile_size() { + return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); + } + Ok(ExpandedProfileChunk { + payload: Bytes::from(expanded), + raw_profile: None, + quantities, + }) } +} - if item - .profile_type() - .is_some_and(|pt| pt != expanded.profile_type()) - { +fn validate_and_track( + item: &crate::envelope::Item, + profile_type: ProfileType, + sdk: &str, + records: &mut RecordKeeper<'_>, +) -> Result { + if item.profile_type().is_some_and(|pt| pt != profile_type) { return Err(relay_profiling::ProfileError::InvalidProfileType.into()); } @@ -147,31 +117,20 @@ fn process_compound_item( counter(RelayCounters::ProfileChunksWithoutPlatform) += 1, sdk = sdk ); - match expanded.profile_type() { + match profile_type { ProfileType::Ui => records.modify_by(DataCategory::ProfileChunkUi, 1), ProfileType::Backend => records.modify_by(DataCategory::ProfileChunk, 1), } } - expanded.filter(client_ip, filter_settings, ctx.global_config)?; - - // Rebuild the compound payload: [expanded JSON][raw binary]. - // This preserves the raw profile for downstream extraction in forward_store. - let platform = expanded.platform; - let expanded_payload = bytes::Bytes::from(expanded.payload); - let mut compound = bytes::BytesMut::with_capacity(expanded_payload.len() + raw_profile.len()); - compound.extend_from_slice(&expanded_payload); - compound.extend_from_slice(raw_profile); - - *item = { - let mut new_item = Item::new(ItemType::ProfileChunk); - new_item.set_platform(platform); - new_item.set_payload(ContentType::Json, compound.freeze()); - new_item.set_meta_length(expanded_payload.len() as u32); - new_item - }; + Ok(compute_quantities(profile_type)) +} - Ok(()) +fn compute_quantities(profile_type: ProfileType) -> Quantities { + match profile_type { + ProfileType::Ui => smallvec![(DataCategory::ProfileChunkUi, 1)], + ProfileType::Backend => smallvec![(DataCategory::ProfileChunk, 1)], + } } #[cfg(test)] @@ -182,10 +141,8 @@ mod tests { use super::*; use crate::Envelope; - use crate::envelope::ContentType; + use crate::envelope::{ContentType, Item, ItemType}; use crate::extractors::RequestMeta; - use crate::managed::Managed; - use crate::processing::Context; use crate::processing::profile_chunks::SerializedProfileChunks; use crate::services::projects::project::ProjectInfo; @@ -212,7 +169,7 @@ mod tests { payload.extend_from_slice(meta); payload.extend_from_slice(body); let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::OctetStream, payload.freeze()); + item.set_payload(ContentType::PerfettoTrace, payload.freeze()); item.set_meta_length(meta_length); item } @@ -235,92 +192,68 @@ mod tests { .build() } - /// Runs `process_compound_item` for the single item in `managed` and returns the - /// inner [`SerializedProfileChunks`] after processing, consuming the managed value. - fn run(managed: &mut Managed, ctx: Context<'_>) { - let sdk = ""; - let client_ip = None; - let filter_settings = Default::default(); - managed.retain( - |pc| &mut pc.profile_chunks, - |item, records| -> Result<()> { - let meta_length = item.meta_length().unwrap_or(0); - process_compound_item( - item, - meta_length, - sdk, - client_ip, - &filter_settings, - ctx, - records, - ) - }, - ); + fn expand_single( + managed: Managed, + ctx: Context<'_>, + ) -> Managed { + expand(managed, ctx) } #[test] - fn test_process_compound_unknown_content_type() { - // content_type is not "perfetto" → item is dropped immediately. - let meta = serde_json::json!({ - "version": "2", - "chunk_id": "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", - "profiler_id": "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb", - "platform": "android", - "content_type": "unknown", - "client_sdk": {"name": "sentry-android", "version": "1.0"}, - }) - .to_string() - .into_bytes(); - let item = make_compound_item(&meta, PERFETTO_FIXTURE); - let (mut managed, _handle) = make_chunks(vec![item]); - - run(&mut managed, Context::for_test()); + fn test_expand_compound_unknown_content_type() { + let meta = perfetto_meta(); + let meta_length = meta.len() as u32; + let mut payload = bytes::BytesMut::new(); + payload.extend_from_slice(&meta); + payload.extend_from_slice(PERFETTO_FIXTURE); + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::OctetStream, payload.freeze()); + item.set_meta_length(meta_length); + let (managed, _handle) = make_chunks(vec![item]); - let chunks = managed.accept(|c| c); - assert!(chunks.profile_chunks.is_empty(), "item should be dropped"); + let expanded = expand_single(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); + assert!(chunks.chunks.is_empty(), "item should be dropped"); } #[test] - fn test_process_compound_feature_flag_disabled() { - // The ContinuousProfilingPerfetto feature is absent → item is dropped. - // Default Context::for_test() uses relay mode = Managed with an empty feature set. + fn test_expand_compound_feature_flag_disabled() { let meta = perfetto_meta(); let item = make_compound_item(&meta, PERFETTO_FIXTURE); - let (mut managed, _handle) = make_chunks(vec![item]); - - run(&mut managed, Context::for_test()); + let (managed, _handle) = make_chunks(vec![item]); - let chunks = managed.accept(|c| c); + let expanded = expand_single(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); assert!( - chunks.profile_chunks.is_empty(), + chunks.chunks.is_empty(), "item should be dropped when feature flag is absent" ); } #[test] - fn test_process_compound_meta_length_out_of_bounds() { - // meta_length header is larger than the actual payload → InvalidSampledProfile. + fn test_expand_compound_meta_length_out_of_bounds() { let body = b"some bytes"; let mut item = Item::new(ItemType::ProfileChunk); - item.set_payload(ContentType::OctetStream, bytes::Bytes::from(body.as_ref())); + item.set_payload( + ContentType::PerfettoTrace, + bytes::Bytes::from(body.as_ref()), + ); item.set_meta_length(body.len() as u32 + 100); - let (mut managed, _handle) = make_chunks(vec![item]); - - run(&mut managed, Context::for_test()); + let (managed, _handle) = make_chunks(vec![item]); - let chunks = managed.accept(|c| c); + let expanded = expand_single(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); assert!( - chunks.profile_chunks.is_empty(), + chunks.chunks.is_empty(), "item should be dropped on out-of-bounds meta_length" ); } #[test] - fn test_process_compound_success() { - // Happy path: valid Perfetto trace + feature enabled → compound payload rebuilt. + fn test_expand_compound_success() { let meta = perfetto_meta(); let item = make_compound_item(&meta, PERFETTO_FIXTURE); - let (mut managed, _handle) = make_chunks(vec![item]); + let (managed, _handle) = make_chunks(vec![item]); let ctx = Context { project_info: &ProjectInfo { @@ -336,28 +269,16 @@ mod tests { ..Context::for_test() }; - run(&mut managed, ctx); + let expanded = expand_single(managed, ctx); + let chunks = expanded.accept(|c| c); + assert_eq!(chunks.chunks.len(), 1, "item should be retained"); - let mut chunks = managed.accept(|c| c); - assert_eq!(chunks.profile_chunks.len(), 1, "item should be retained"); - - let item = chunks.profile_chunks.remove(0); - - // The rebuilt item must carry a meta_length pointing to the expanded JSON. - let meta_length = item - .meta_length() - .expect("rebuilt item must have meta_length"); - assert!(meta_length > 0); - - // The first meta_length bytes must be valid JSON (the expanded Sample v2 profile). - let payload = item.payload(); - let (json_part, raw_part) = payload.split_at(meta_length as usize); + let chunk = &chunks.chunks[0]; assert!( - serde_json::from_slice::(json_part).is_ok(), - "first meta_length bytes must be valid JSON" + serde_json::from_slice::(&chunk.payload).is_ok(), + "payload must be valid JSON" ); - - // The raw binary is the original Perfetto trace preserved verbatim. - assert_eq!(raw_part, PERFETTO_FIXTURE); + let raw_profile = chunk.raw_profile.as_ref().expect("expected raw_profile"); + assert_eq!(raw_profile.payload.as_ref(), PERFETTO_FIXTURE); } } diff --git a/relay-server/src/processing/profile_chunks/store.rs b/relay-server/src/processing/profile_chunks/store.rs new file mode 100644 index 00000000000..09984214231 --- /dev/null +++ b/relay-server/src/processing/profile_chunks/store.rs @@ -0,0 +1,11 @@ +use crate::processing::profile_chunks::ExpandedProfileChunk; +use crate::services::store::StoreProfileChunk; + +pub fn convert(chunk: ExpandedProfileChunk, retention_days: u16) -> StoreProfileChunk { + StoreProfileChunk { + retention_days, + payload: chunk.payload, + quantities: chunk.quantities, + raw_profile: chunk.raw_profile, + } +} diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 2a7bda48a58..ecd1f2da26d 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -36,6 +36,7 @@ use relay_threading::AsyncPool; use crate::envelope::{AttachmentPlaceholder, AttachmentType, ContentType, Item, ItemType}; use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities, Rejected}; use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes}; +use crate::processing::profile_chunks::RawProfile; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; @@ -147,14 +148,6 @@ impl Counted for StoreSpanV2 { } } -/// Content type of a raw binary profile blob sent alongside the expanded JSON payload. -#[derive(Clone, Debug, Serialize)] -#[serde(rename_all = "lowercase")] -pub enum RawProfileContentType { - /// Perfetto binary trace format. - Perfetto, -} - /// Publishes a singular profile chunk to Kafka. #[derive(Debug)] pub struct StoreProfileChunk { @@ -170,9 +163,7 @@ pub struct StoreProfileChunk { /// /// Sent alongside the expanded JSON payload because the expansion only extracts a /// minimum of information; the raw profile is preserved for further processing downstream. - pub raw_profile: Option, - /// Content type of `raw_profile`. - pub raw_profile_content_type: Option, + pub raw_profile: Option, } impl Counted for StoreProfileChunk { @@ -864,7 +855,6 @@ impl StoreService { )]), payload: message.payload, raw_profile: message.raw_profile, - raw_profile_content_type: message.raw_profile_content_type, }; self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message)) @@ -1705,10 +1695,8 @@ struct ProfileChunkKafkaMessage { #[serde(skip)] headers: BTreeMap, payload: Bytes, - #[serde(skip_serializing_if = "Option::is_none")] - raw_profile: Option, - #[serde(skip_serializing_if = "Option::is_none")] - raw_profile_content_type: Option, + #[serde(flatten, skip_serializing_if = "Option::is_none")] + raw_profile: Option, } /// An enum over all possible ingest messages. @@ -1935,6 +1923,48 @@ mod tests { use super::*; + #[test] + fn test_profile_chunk_kafka_message_without_raw_profile() { + let message = ProfileChunkKafkaMessage { + organization_id: OrganizationId::new(1), + project_id: ProjectId::new(42), + received: 1234567890, + retention_days: 90, + headers: BTreeMap::new(), + payload: Bytes::from(b"{\"profile\":true}".as_ref()), + raw_profile: None, + }; + let json = serde_json::to_value(&message).unwrap(); + assert_eq!(json["organization_id"], 1); + assert_eq!(json["project_id"], 42); + assert!(json.get("raw_profile").is_none()); + assert!(json.get("raw_profile_content_type").is_none()); + } + + #[test] + fn test_profile_chunk_kafka_message_with_raw_profile() { + let message = ProfileChunkKafkaMessage { + organization_id: OrganizationId::new(1), + project_id: ProjectId::new(42), + received: 1234567890, + retention_days: 90, + headers: BTreeMap::new(), + payload: Bytes::from(b"{\"profile\":true}".as_ref()), + raw_profile: Some(RawProfile { + payload: Bytes::from(b"perfetto-binary-data".as_ref()), + content_type: crate::envelope::ContentType::PerfettoTrace, + }), + }; + let json = serde_json::to_value(&message).unwrap(); + assert_eq!(json["organization_id"], 1); + assert_eq!(json["project_id"], 42); + assert!(json.get("raw_profile").is_some()); + assert_eq!( + json["raw_profile_content_type"], + "application/x-perfetto-trace" + ); + } + #[test] fn disallow_outcomes() { struct TestMessage; diff --git a/tests/integration/test_profile_chunks_perfetto.py b/tests/integration/test_profile_chunks_perfetto.py index 2ce715589a9..f80fa950c7a 100644 --- a/tests/integration/test_profile_chunks_perfetto.py +++ b/tests/integration/test_profile_chunks_perfetto.py @@ -112,3 +112,7 @@ def test_perfetto_profile_chunk_end_to_end( for tid, meta in thread_metadata.items(): assert isinstance(tid, str) assert "name" in meta and isinstance(meta["name"], str) + + assert "raw_profile" in profile, "expected raw_profile in Kafka message" + assert len(profile["raw_profile"]) == 97252, "raw_profile size mismatch" + assert profile.get("raw_profile_content_type") == "application/x-perfetto-trace" From 4108a6ec6f02a13018165a96365b12009b8256e9 Mon Sep 17 00:00:00 2001 From: Markus Hintersteiner Date: Tue, 5 May 2026 10:50:29 +0200 Subject: [PATCH 2/8] Address PR feedback --- .../src/processing/profile_chunks/mod.rs | 14 +++-- .../src/processing/profile_chunks/process.rs | 54 ++++++++++++++++--- .../src/processing/profile_chunks/store.rs | 11 ---- 3 files changed, 58 insertions(+), 21 deletions(-) delete mode 100644 relay-server/src/processing/profile_chunks/store.rs diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs index cc270a7cd70..e11c37a77b7 100644 --- a/relay-server/src/processing/profile_chunks/mod.rs +++ b/relay-server/src/processing/profile_chunks/mod.rs @@ -15,8 +15,6 @@ use crate::services::outcome::{DiscardReason, Outcome}; mod filter; mod process; -#[cfg(feature = "processing")] -mod store; pub type Result = std::result::Result; @@ -152,6 +150,7 @@ impl processing::Processor for ProfileChunksProcessor { filter::feature_flag(ctx).reject(&profile_chunks)?; if !ctx.is_processing() { + let profile_chunks = self.limiter.enforce_quotas(profile_chunks, ctx).await?; return Ok(Output::just(ProfileChunkOutput::Serialized(profile_chunks))); } @@ -200,7 +199,7 @@ impl Forward for ProfileChunkOutput { item }) .collect(); - Envelope::from_parts(e.headers, Items::from_vec(items)) + Envelope::from_parts(e.headers, items) })), } } @@ -211,6 +210,8 @@ impl Forward for ProfileChunkOutput { s: processing::forward::StoreHandle<'_>, ctx: processing::ForwardContext<'_>, ) -> Result<(), Rejected<()>> { + use crate::services::store::StoreProfileChunk; + let expanded = match self { Self::Expanded(e) => e, Self::Serialized(m) => { @@ -222,7 +223,12 @@ impl Forward for ProfileChunkOutput { let retention_days = ctx.event_retention().standard; for chunk in expanded.split(|e| e.chunks) { - s.send_to_store(chunk.map(|chunk, _| store::convert(chunk, retention_days))); + s.send_to_store(chunk.map(|chunk, _| StoreProfileChunk { + retention_days, + payload: chunk.payload, + quantities: chunk.quantities, + raw_profile: chunk.raw_profile, + })); } Ok(()) diff --git a/relay-server/src/processing/profile_chunks/process.rs b/relay-server/src/processing/profile_chunks/process.rs index 83a929e14c9..3f0895bffe1 100644 --- a/relay-server/src/processing/profile_chunks/process.rs +++ b/relay-server/src/processing/profile_chunks/process.rs @@ -63,6 +63,9 @@ fn expand_item( if ctx.should_filter(Feature::ContinuousProfilingPerfetto) { return Err(Error::FilterFeatureFlag); } + let profile_type = item + .profile_type() + .ok_or(relay_profiling::ProfileError::InvalidProfileType)?; let meta_length = item.meta_length() .ok_or(relay_profiling::ProfileError::InvalidSampledProfile)? as usize; @@ -70,7 +73,10 @@ fn expand_item( .split_at_checked(meta_length) .ok_or(relay_profiling::ProfileError::InvalidSampledProfile)?; let expanded = relay_profiling::expand_perfetto(perfetto_payload, json_payload)?; - let quantities = validate_and_track(item, expanded.profile_type(), sdk, records)?; + if expanded.profile_type() != profile_type { + return Err(relay_profiling::ProfileError::InvalidProfileType.into()); + } + let quantities = quantities_for(profile_type); expanded.filter(client_ip, filter_settings, ctx.global_config)?; if expanded.payload.len() > ctx.config.max_profile_size() { return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); @@ -123,10 +129,10 @@ fn validate_and_track( } } - Ok(compute_quantities(profile_type)) + Ok(quantities_for(profile_type)) } -fn compute_quantities(profile_type: ProfileType) -> Quantities { +fn quantities_for(profile_type: ProfileType) -> Quantities { match profile_type { ProfileType::Ui => smallvec![(DataCategory::ProfileChunkUi, 1)], ProfileType::Backend => smallvec![(DataCategory::ProfileChunk, 1)], @@ -163,7 +169,7 @@ mod tests { .into_bytes() } - fn make_compound_item(meta: &[u8], body: &[u8]) -> Item { + fn make_compound_item(meta: &[u8], body: &[u8], platform: &str) -> Item { let meta_length = meta.len() as u32; let mut payload = bytes::BytesMut::new(); payload.extend_from_slice(meta); @@ -171,6 +177,7 @@ mod tests { let mut item = Item::new(ItemType::ProfileChunk); item.set_payload(ContentType::PerfettoTrace, payload.freeze()); item.set_meta_length(meta_length); + item.set_platform(platform.to_owned()); item } @@ -219,7 +226,7 @@ mod tests { #[test] fn test_expand_compound_feature_flag_disabled() { let meta = perfetto_meta(); - let item = make_compound_item(&meta, PERFETTO_FIXTURE); + let item = make_compound_item(&meta, PERFETTO_FIXTURE, "android"); let (managed, _handle) = make_chunks(vec![item]); let expanded = expand_single(managed, Context::for_test()); @@ -239,6 +246,7 @@ mod tests { bytes::Bytes::from(body.as_ref()), ); item.set_meta_length(body.len() as u32 + 100); + item.set_platform("android".to_owned()); let (managed, _handle) = make_chunks(vec![item]); let expanded = expand_single(managed, Context::for_test()); @@ -249,10 +257,44 @@ mod tests { ); } + #[test] + fn test_expand_compound_missing_platform() { + let meta = perfetto_meta(); + let meta_length = meta.len() as u32; + let mut payload = bytes::BytesMut::new(); + payload.extend_from_slice(&meta); + payload.extend_from_slice(PERFETTO_FIXTURE); + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::PerfettoTrace, payload.freeze()); + item.set_meta_length(meta_length); + let (managed, _handle) = make_chunks(vec![item]); + + let ctx = Context { + project_info: &ProjectInfo { + config: ProjectConfig { + features: FeatureSet::from_iter([ + Feature::ContinuousProfiling, + Feature::ContinuousProfilingPerfetto, + ]), + ..Default::default() + }, + ..Default::default() + }, + ..Context::for_test() + }; + + let expanded = expand_single(managed, ctx); + let chunks = expanded.accept(|c| c); + assert!( + chunks.chunks.is_empty(), + "perfetto item without platform header should be rejected" + ); + } + #[test] fn test_expand_compound_success() { let meta = perfetto_meta(); - let item = make_compound_item(&meta, PERFETTO_FIXTURE); + let item = make_compound_item(&meta, PERFETTO_FIXTURE, "android"); let (managed, _handle) = make_chunks(vec![item]); let ctx = Context { diff --git a/relay-server/src/processing/profile_chunks/store.rs b/relay-server/src/processing/profile_chunks/store.rs deleted file mode 100644 index 09984214231..00000000000 --- a/relay-server/src/processing/profile_chunks/store.rs +++ /dev/null @@ -1,11 +0,0 @@ -use crate::processing::profile_chunks::ExpandedProfileChunk; -use crate::services::store::StoreProfileChunk; - -pub fn convert(chunk: ExpandedProfileChunk, retention_days: u16) -> StoreProfileChunk { - StoreProfileChunk { - retention_days, - payload: chunk.payload, - quantities: chunk.quantities, - raw_profile: chunk.raw_profile, - } -} From 67cd15920fd28ff1ced1e085f4fb6d6a2e79f2bf Mon Sep 17 00:00:00 2001 From: Markus Hintersteiner Date: Wed, 6 May 2026 09:44:43 +0200 Subject: [PATCH 3/8] Flatten ProfileChunkKafkaMessage --- .../src/processing/profile_chunks/mod.rs | 19 +------------------ relay-server/src/services/store.rs | 16 +++++++++------- 2 files changed, 10 insertions(+), 25 deletions(-) diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs index e11c37a77b7..0ed54eb9cde 100644 --- a/relay-server/src/processing/profile_chunks/mod.rs +++ b/relay-server/src/processing/profile_chunks/mod.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use bytes::Bytes; -use serde::Serialize; use smallvec::smallvec; use relay_profiling::ProfileType; @@ -59,11 +58,9 @@ impl crate::managed::OutcomeError for Error { } } -#[derive(Clone, Debug, Serialize)] +#[derive(Debug)] pub struct RawProfile { - #[serde(rename = "raw_profile")] pub payload: Bytes, - #[serde(rename = "raw_profile_content_type")] pub content_type: ContentType, } @@ -355,20 +352,6 @@ mod tests { assert_eq!(raw_part, b"raw-binary-blob".as_ref()); } - #[test] - fn test_raw_profile_serialization() { - let raw = RawProfile { - payload: Bytes::from(b"binary-data".as_ref()), - content_type: ContentType::PerfettoTrace, - }; - let json = serde_json::to_value(&raw).unwrap(); - assert_eq!( - json["raw_profile_content_type"], - "application/x-perfetto-trace" - ); - assert!(json.get("raw_profile").is_some()); - } - #[test] fn test_serialize_envelope_mixed_json_and_compound() { let json_chunk = ExpandedProfileChunk { diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index ecd1f2da26d..1bf6cd7ea92 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -854,7 +854,8 @@ impl StoreService { scoping.project_id.to_string(), )]), payload: message.payload, - raw_profile: message.raw_profile, + raw_profile: message.raw_profile.as_ref().map(|r| r.payload.clone()), + raw_profile_content_type: message.raw_profile.map(|r| r.content_type), }; self.produce(KafkaTopic::Profiles, KafkaMessage::ProfileChunk(message)) @@ -1695,8 +1696,10 @@ struct ProfileChunkKafkaMessage { #[serde(skip)] headers: BTreeMap, payload: Bytes, - #[serde(flatten, skip_serializing_if = "Option::is_none")] - raw_profile: Option, + #[serde(skip_serializing_if = "Option::is_none")] + raw_profile: Option, + #[serde(skip_serializing_if = "Option::is_none")] + raw_profile_content_type: Option, } /// An enum over all possible ingest messages. @@ -1933,6 +1936,7 @@ mod tests { headers: BTreeMap::new(), payload: Bytes::from(b"{\"profile\":true}".as_ref()), raw_profile: None, + raw_profile_content_type: None, }; let json = serde_json::to_value(&message).unwrap(); assert_eq!(json["organization_id"], 1); @@ -1950,10 +1954,8 @@ mod tests { retention_days: 90, headers: BTreeMap::new(), payload: Bytes::from(b"{\"profile\":true}".as_ref()), - raw_profile: Some(RawProfile { - payload: Bytes::from(b"perfetto-binary-data".as_ref()), - content_type: crate::envelope::ContentType::PerfettoTrace, - }), + raw_profile: Some(Bytes::from(b"perfetto-binary-data".as_ref())), + raw_profile_content_type: Some(crate::envelope::ContentType::PerfettoTrace), }; let json = serde_json::to_value(&message).unwrap(); assert_eq!(json["organization_id"], 1); From 8de85af36eb99f5f2b4c76eef251af0931bbfb19 Mon Sep 17 00:00:00 2001 From: Markus Hintersteiner Date: Wed, 6 May 2026 09:45:21 +0200 Subject: [PATCH 4/8] Ensure platform is correctly populated --- .../src/processing/profile_chunks/mod.rs | 17 +++++++++++++---- .../src/processing/profile_chunks/process.rs | 3 +++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs index 0ed54eb9cde..277ff5313bb 100644 --- a/relay-server/src/processing/profile_chunks/mod.rs +++ b/relay-server/src/processing/profile_chunks/mod.rs @@ -69,6 +69,7 @@ pub struct RawProfile { pub struct ExpandedProfileChunk { pub payload: Bytes, pub raw_profile: Option, + pub platform: String, pub quantities: Quantities, } @@ -181,6 +182,7 @@ impl Forward for ProfileChunkOutput { .into_iter() .map(|chunk| { let mut item = Item::new(ItemType::ProfileChunk); + item.set_platform(chunk.platform); if let Some(raw_profile) = chunk.raw_profile { let meta_length = chunk.payload.len() as u32; let mut compound = bytes::BytesMut::with_capacity( @@ -299,7 +301,8 @@ mod tests { let chunk = ExpandedProfileChunk { payload: Bytes::from(b"{\"hello\":\"world\"}".as_ref()), raw_profile: None, - quantities: smallvec![], + platform: "python".to_owned(), + quantities: smallvec![(DataCategory::ProfileChunk, 1)], }; let (managed, _handle) = make_expanded(vec![chunk]); let output = ProfileChunkOutput::Expanded(managed); @@ -314,6 +317,7 @@ mod tests { assert_eq!(items[0].payload().as_ref(), b"{\"hello\":\"world\"}"); assert!(items[0].meta_length().is_none()); assert_eq!(items[0].content_type(), Some(ContentType::Json)); + assert_eq!(items[0].platform(), Some("python")); } #[test] @@ -326,7 +330,8 @@ mod tests { payload: raw_data.clone(), content_type: ContentType::PerfettoTrace, }), - quantities: smallvec![], + platform: "android".to_owned(), + quantities: smallvec![(DataCategory::ProfileChunkUi, 1)], }; let (managed, _handle) = make_expanded(vec![chunk]); let output = ProfileChunkOutput::Expanded(managed); @@ -357,7 +362,8 @@ mod tests { let json_chunk = ExpandedProfileChunk { payload: Bytes::from(b"{\"type\":\"json\"}".as_ref()), raw_profile: None, - quantities: smallvec![], + platform: "python".to_owned(), + quantities: smallvec![(DataCategory::ProfileChunk, 1)], }; let compound_chunk = ExpandedProfileChunk { payload: Bytes::from(b"{\"type\":\"compound\"}".as_ref()), @@ -365,7 +371,8 @@ mod tests { payload: Bytes::from(b"perfetto-blob".as_ref()), content_type: ContentType::PerfettoTrace, }), - quantities: smallvec![], + platform: "android".to_owned(), + quantities: smallvec![(DataCategory::ProfileChunkUi, 1)], }; let (managed, _handle) = make_expanded(vec![json_chunk, compound_chunk]); let output = ProfileChunkOutput::Expanded(managed); @@ -380,8 +387,10 @@ mod tests { assert_eq!(items[0].content_type(), Some(ContentType::Json)); assert!(items[0].meta_length().is_none()); + assert_eq!(items[0].platform(), Some("python")); assert_eq!(items[1].content_type(), Some(ContentType::PerfettoTrace)); assert!(items[1].meta_length().is_some()); + assert_eq!(items[1].platform(), Some("android")); } } diff --git a/relay-server/src/processing/profile_chunks/process.rs b/relay-server/src/processing/profile_chunks/process.rs index 3f0895bffe1..e1295633a0e 100644 --- a/relay-server/src/processing/profile_chunks/process.rs +++ b/relay-server/src/processing/profile_chunks/process.rs @@ -87,6 +87,7 @@ fn expand_item( payload: payload.slice_ref(perfetto_payload), content_type: ContentType::PerfettoTrace, }), + platform: expanded.platform, quantities, }) } else { @@ -95,6 +96,7 @@ fn expand_item( } let pc = relay_profiling::ProfileChunk::new(payload)?; let quantities = validate_and_track(item, pc.profile_type(), sdk, records)?; + let platform = pc.platform().to_owned(); pc.filter(client_ip, filter_settings, ctx.global_config)?; let expanded = pc.expand()?; if expanded.len() > ctx.config.max_profile_size() { @@ -103,6 +105,7 @@ fn expand_item( Ok(ExpandedProfileChunk { payload: Bytes::from(expanded), raw_profile: None, + platform, quantities, }) } From e43f1a3d331f7e651823d59034bd0bdfaa8c3c34 Mon Sep 17 00:00:00 2001 From: Markus Hintersteiner Date: Thu, 7 May 2026 08:08:42 +0200 Subject: [PATCH 5/8] Validate platform for perfetto profiles --- relay-server/src/processing/profile_chunks/process.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/relay-server/src/processing/profile_chunks/process.rs b/relay-server/src/processing/profile_chunks/process.rs index e1295633a0e..1f9cd6e2218 100644 --- a/relay-server/src/processing/profile_chunks/process.rs +++ b/relay-server/src/processing/profile_chunks/process.rs @@ -63,6 +63,11 @@ fn expand_item( if ctx.should_filter(Feature::ContinuousProfilingPerfetto) { return Err(Error::FilterFeatureFlag); } + let platform = item + .platform() + .ok_or(relay_profiling::ProfileError::PlatformNotSupported)? + .to_owned(); + let profile_type = item .profile_type() .ok_or(relay_profiling::ProfileError::InvalidProfileType)?; @@ -87,7 +92,7 @@ fn expand_item( payload: payload.slice_ref(perfetto_payload), content_type: ContentType::PerfettoTrace, }), - platform: expanded.platform, + platform, quantities, }) } else { From 87b33fd547db4a7242439a2354272fb0476233cd Mon Sep 17 00:00:00 2001 From: Markus Hintersteiner Date: Thu, 7 May 2026 15:33:16 +0200 Subject: [PATCH 6/8] Remove unused fields ExpandedProfileChunks and ExpandedProfileChunk, add missing tests As ProfileChunkOutput::Expanded is only used in processing mode, there's no need to carry around the headers / platform fields. - added platform validation for perfetto profiles - added test for existing JSON-only profiles, ensuring no change in behavior - refactored validation / quantities handling to be more re-usable across profile formats --- .../src/processing/profile_chunks/mod.rs | 137 +--------- .../src/processing/profile_chunks/process.rs | 245 +++++++++++++----- 2 files changed, 186 insertions(+), 196 deletions(-) diff --git a/relay-server/src/processing/profile_chunks/mod.rs b/relay-server/src/processing/profile_chunks/mod.rs index 277ff5313bb..81aeccfd842 100644 --- a/relay-server/src/processing/profile_chunks/mod.rs +++ b/relay-server/src/processing/profile_chunks/mod.rs @@ -59,6 +59,7 @@ impl crate::managed::OutcomeError for Error { } #[derive(Debug)] +#[cfg_attr(all(not(feature = "processing"), not(test)), expect(dead_code))] pub struct RawProfile { pub payload: Bytes, pub content_type: ContentType, @@ -66,10 +67,10 @@ pub struct RawProfile { /// A single profile chunk after expansion. #[derive(Debug)] +#[cfg_attr(all(not(feature = "processing"), not(test)), expect(dead_code))] pub struct ExpandedProfileChunk { pub payload: Bytes, pub raw_profile: Option, - pub platform: String, pub quantities: Quantities, } @@ -83,7 +84,6 @@ impl Counted for ExpandedProfileChunk { /// converted into typed representations. #[derive(Debug)] pub struct ExpandedProfileChunks { - pub headers: EnvelopeHeaders, pub chunks: Vec, } @@ -161,6 +161,10 @@ impl processing::Processor for ProfileChunksProcessor { /// Output produced by [`ProfileChunksProcessor`]. #[derive(Debug)] +#[expect( + clippy::large_enum_variant, + reason = "variants are sized by Managed which wraps different pipeline stages" +)] pub enum ProfileChunkOutput { /// Non-processing relay: items forwarded as-is. Serialized(Managed), @@ -176,30 +180,9 @@ impl Forward for ProfileChunkOutput { match self { Self::Serialized(profile_chunks) => Ok(profile_chunks .map(|pc, _| Envelope::from_parts(pc.headers, Items::from_vec(pc.profile_chunks)))), - Self::Expanded(expanded) => Ok(expanded.map(|e, _| { - let items = e - .chunks - .into_iter() - .map(|chunk| { - let mut item = Item::new(ItemType::ProfileChunk); - item.set_platform(chunk.platform); - if let Some(raw_profile) = chunk.raw_profile { - let meta_length = chunk.payload.len() as u32; - let mut compound = bytes::BytesMut::with_capacity( - chunk.payload.len() + raw_profile.payload.len(), - ); - compound.extend_from_slice(&chunk.payload); - compound.extend_from_slice(&raw_profile.payload); - item.set_payload(raw_profile.content_type, compound.freeze()); - item.set_meta_length(meta_length); - } else { - item.set_payload(ContentType::Json, chunk.payload); - } - item - }) - .collect(); - Envelope::from_parts(e.headers, items) - })), + Self::Expanded(m) => { + Err(m.internal_error("serialize_envelope called with expanded profile chunks")) + } } } @@ -274,12 +257,7 @@ impl CountRateLimited for Managed { #[cfg(test)] mod tests { - use similar_asserts::assert_eq; - use super::*; - use crate::Envelope; - use crate::envelope::ContentType; - use crate::extractors::RequestMeta; use crate::processing::Context; fn make_expanded( @@ -288,109 +266,20 @@ mod tests { Managed, crate::managed::ManagedTestHandle, ) { - let dsn = "https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42" - .parse() - .unwrap(); - let envelope = Envelope::from_request(None, RequestMeta::new(dsn)); - let headers = envelope.headers().clone(); - Managed::for_test(ExpandedProfileChunks { headers, chunks }).build() + Managed::for_test(ExpandedProfileChunks { chunks }).build() } #[test] - fn test_serialize_envelope_json_only() { + #[should_panic(expected = "serialize_envelope called with expanded profile chunks")] + fn test_serialize_envelope_rejects_expanded() { let chunk = ExpandedProfileChunk { payload: Bytes::from(b"{\"hello\":\"world\"}".as_ref()), raw_profile: None, - platform: "python".to_owned(), quantities: smallvec![(DataCategory::ProfileChunk, 1)], }; let (managed, _handle) = make_expanded(vec![chunk]); let output = ProfileChunkOutput::Expanded(managed); - let envelope = output - .serialize_envelope(Context::for_test().to_forward()) - .unwrap() - .accept(|e| e); - - let items: Vec<_> = envelope.items().collect(); - assert_eq!(items.len(), 1); - assert_eq!(items[0].payload().as_ref(), b"{\"hello\":\"world\"}"); - assert!(items[0].meta_length().is_none()); - assert_eq!(items[0].content_type(), Some(ContentType::Json)); - assert_eq!(items[0].platform(), Some("python")); - } - - #[test] - fn test_serialize_envelope_compound() { - let json_payload = Bytes::from(b"{\"expanded\":true}".as_ref()); - let raw_data = Bytes::from(b"raw-binary-blob".as_ref()); - let chunk = ExpandedProfileChunk { - payload: json_payload.clone(), - raw_profile: Some(RawProfile { - payload: raw_data.clone(), - content_type: ContentType::PerfettoTrace, - }), - platform: "android".to_owned(), - quantities: smallvec![(DataCategory::ProfileChunkUi, 1)], - }; - let (managed, _handle) = make_expanded(vec![chunk]); - let output = ProfileChunkOutput::Expanded(managed); - - let envelope = output - .serialize_envelope(Context::for_test().to_forward()) - .unwrap() - .accept(|e| e); - - let items: Vec<_> = envelope.items().collect(); - assert_eq!(items.len(), 1); - - let item = &items[0]; - let meta_length = item - .meta_length() - .expect("compound item must have meta_length"); - assert_eq!(meta_length as usize, json_payload.len()); - assert_eq!(item.content_type(), Some(ContentType::PerfettoTrace),); - - let payload = item.payload(); - let (json_part, raw_part) = payload.split_at(meta_length as usize); - assert_eq!(json_part, b"{\"expanded\":true}".as_ref()); - assert_eq!(raw_part, b"raw-binary-blob".as_ref()); - } - - #[test] - fn test_serialize_envelope_mixed_json_and_compound() { - let json_chunk = ExpandedProfileChunk { - payload: Bytes::from(b"{\"type\":\"json\"}".as_ref()), - raw_profile: None, - platform: "python".to_owned(), - quantities: smallvec![(DataCategory::ProfileChunk, 1)], - }; - let compound_chunk = ExpandedProfileChunk { - payload: Bytes::from(b"{\"type\":\"compound\"}".as_ref()), - raw_profile: Some(RawProfile { - payload: Bytes::from(b"perfetto-blob".as_ref()), - content_type: ContentType::PerfettoTrace, - }), - platform: "android".to_owned(), - quantities: smallvec![(DataCategory::ProfileChunkUi, 1)], - }; - let (managed, _handle) = make_expanded(vec![json_chunk, compound_chunk]); - let output = ProfileChunkOutput::Expanded(managed); - - let envelope = output - .serialize_envelope(Context::for_test().to_forward()) - .unwrap() - .accept(|e| e); - - let items: Vec<_> = envelope.items().collect(); - assert_eq!(items.len(), 2); - - assert_eq!(items[0].content_type(), Some(ContentType::Json)); - assert!(items[0].meta_length().is_none()); - assert_eq!(items[0].platform(), Some("python")); - - assert_eq!(items[1].content_type(), Some(ContentType::PerfettoTrace)); - assert!(items[1].meta_length().is_some()); - assert_eq!(items[1].platform(), Some("android")); + let _ = output.serialize_envelope(Context::for_test().to_forward()); } } diff --git a/relay-server/src/processing/profile_chunks/process.rs b/relay-server/src/processing/profile_chunks/process.rs index 1f9cd6e2218..9cb0dacc376 100644 --- a/relay-server/src/processing/profile_chunks/process.rs +++ b/relay-server/src/processing/profile_chunks/process.rs @@ -33,111 +33,146 @@ pub fn expand( let mut expanded = Vec::with_capacity(serialized.profile_chunks.len()); for item in serialized.profile_chunks { - match expand_item(&item, sdk, client_ip, filter_settings, ctx, records) { - Ok(chunk) => expanded.push(chunk), - Err(err) => { - records.reject_err(err, &item); + match expand_item(&item, client_ip, filter_settings, ctx) { + Ok(chunk) => { + track_quantities(&item, sdk, &chunk.quantities, records); + expanded.push(chunk); + } + Err((err, quantities)) => { + track_quantities(&item, sdk, &quantities, records); + records.reject_err(err, quantities); } } } - ExpandedProfileChunks { - headers: serialized.headers, - chunks: expanded, - } + ExpandedProfileChunks { chunks: expanded } }) } fn expand_item( item: &crate::envelope::Item, - sdk: &str, client_ip: Option, filter_settings: &relay_filter::ProjectFiltersConfig, ctx: Context<'_>, - records: &mut RecordKeeper<'_>, -) -> Result { +) -> std::result::Result { let payload = item.payload(); let is_perfetto = matches!(item.content_type(), Some(ContentType::PerfettoTrace)); if is_perfetto { - if ctx.should_filter(Feature::ContinuousProfilingPerfetto) { - return Err(Error::FilterFeatureFlag); - } - let platform = item - .platform() - .ok_or(relay_profiling::ProfileError::PlatformNotSupported)? - .to_owned(); - - let profile_type = item - .profile_type() - .ok_or(relay_profiling::ProfileError::InvalidProfileType)?; - let meta_length = - item.meta_length() - .ok_or(relay_profiling::ProfileError::InvalidSampledProfile)? as usize; - let (json_payload, perfetto_payload) = payload - .split_at_checked(meta_length) - .ok_or(relay_profiling::ProfileError::InvalidSampledProfile)?; - let expanded = relay_profiling::expand_perfetto(perfetto_payload, json_payload)?; - if expanded.profile_type() != profile_type { - return Err(relay_profiling::ProfileError::InvalidProfileType.into()); - } - let quantities = quantities_for(profile_type); - expanded.filter(client_ip, filter_settings, ctx.global_config)?; - if expanded.payload.len() > ctx.config.max_profile_size() { - return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); - } - Ok(ExpandedProfileChunk { - payload: Bytes::from(expanded.payload), - raw_profile: Some(RawProfile { - payload: payload.slice_ref(perfetto_payload), - content_type: ContentType::PerfettoTrace, - }), - platform, - quantities, - }) + expand_perfetto_profile_chunk(item, client_ip, filter_settings, ctx, payload) } else { - if item.meta_length().is_some() { - return Err(relay_profiling::ProfileError::InvalidSampledProfile.into()); - } - let pc = relay_profiling::ProfileChunk::new(payload)?; - let quantities = validate_and_track(item, pc.profile_type(), sdk, records)?; - let platform = pc.platform().to_owned(); - pc.filter(client_ip, filter_settings, ctx.global_config)?; - let expanded = pc.expand()?; - if expanded.len() > ctx.config.max_profile_size() { - return Err(relay_profiling::ProfileError::ExceedSizeLimit.into()); - } - Ok(ExpandedProfileChunk { - payload: Bytes::from(expanded), - raw_profile: None, - platform, + expand_json_item(item, client_ip, filter_settings, ctx, payload) + } +} + +fn expand_perfetto_profile_chunk( + item: &crate::envelope::Item, + client_ip: Option, + filter_settings: &relay_filter::ProjectFiltersConfig, + ctx: Context<'_>, + payload: Bytes, +) -> std::result::Result { + let item_quantities = item.quantities(); + let err = |e: Error| (e, item_quantities.clone()); + + if ctx.should_filter(Feature::ContinuousProfilingPerfetto) { + return Err(err(Error::FilterFeatureFlag)); + } + item.platform() + .ok_or_else(|| err(relay_profiling::ProfileError::PlatformNotSupported.into()))?; + + let profile_type = item + .profile_type() + .ok_or_else(|| err(relay_profiling::ProfileError::InvalidProfileType.into()))?; + let meta_length = item + .meta_length() + .ok_or_else(|| err(relay_profiling::ProfileError::InvalidSampledProfile.into()))? + as usize; + let (json_payload, perfetto_payload) = payload + .split_at_checked(meta_length) + .ok_or_else(|| err(relay_profiling::ProfileError::InvalidSampledProfile.into()))?; + let expanded = relay_profiling::expand_perfetto(perfetto_payload, json_payload) + .map_err(|e| err(e.into()))?; + if expanded.profile_type() != profile_type { + return Err(err(relay_profiling::ProfileError::InvalidProfileType.into())); + } + let quantities = quantities_for(profile_type); + expanded + .filter(client_ip, filter_settings, ctx.global_config) + .map_err(|e| (e.into(), quantities.clone()))?; + if expanded.payload.len() > ctx.config.max_profile_size() { + return Err(( + relay_profiling::ProfileError::ExceedSizeLimit.into(), quantities, - }) + )); } + Ok(ExpandedProfileChunk { + payload: Bytes::from(expanded.payload), + raw_profile: Some(RawProfile { + payload: payload.slice_ref(perfetto_payload), + content_type: ContentType::PerfettoTrace, + }), + quantities, + }) } -fn validate_and_track( +fn expand_json_item( item: &crate::envelope::Item, - profile_type: ProfileType, - sdk: &str, - records: &mut RecordKeeper<'_>, -) -> Result { + client_ip: Option, + filter_settings: &relay_filter::ProjectFiltersConfig, + ctx: Context<'_>, + payload: Bytes, +) -> std::result::Result { + let item_quantities = item.quantities(); + let err = |e: Error| (e, item_quantities.clone()); + + if item.meta_length().is_some() { + return Err(err( + relay_profiling::ProfileError::InvalidSampledProfile.into() + )); + } + let pc = relay_profiling::ProfileChunk::new(payload).map_err(|e| err(e.into()))?; + let profile_type = pc.profile_type(); + validate_profile_type(item, profile_type).map_err(err)?; + let quantities = quantities_for(profile_type); + pc.filter(client_ip, filter_settings, ctx.global_config) + .map_err(|e| (e.into(), quantities.clone()))?; + let expanded = pc.expand().map_err(|e| (e.into(), quantities.clone()))?; + if expanded.len() > ctx.config.max_profile_size() { + return Err(( + relay_profiling::ProfileError::ExceedSizeLimit.into(), + quantities, + )); + } + Ok(ExpandedProfileChunk { + payload: Bytes::from(expanded), + raw_profile: None, + quantities, + }) +} + +fn validate_profile_type(item: &crate::envelope::Item, profile_type: ProfileType) -> Result<()> { if item.profile_type().is_some_and(|pt| pt != profile_type) { return Err(relay_profiling::ProfileError::InvalidProfileType.into()); } + Ok(()) +} +fn track_quantities( + item: &crate::envelope::Item, + sdk: &str, + quantities: &Quantities, + records: &mut RecordKeeper<'_>, +) { if item.profile_type().is_none() { relay_statsd::metric!( counter(RelayCounters::ProfileChunksWithoutPlatform) += 1, sdk = sdk ); - match profile_type { - ProfileType::Ui => records.modify_by(DataCategory::ProfileChunkUi, 1), - ProfileType::Backend => records.modify_by(DataCategory::ProfileChunk, 1), + for &(category, quantity) in quantities { + records.modify_by(category, quantity as isize); } } - - Ok(quantities_for(profile_type)) } fn quantities_for(profile_type: ProfileType) -> Quantities { @@ -158,12 +193,16 @@ mod tests { use crate::envelope::{ContentType, Item, ItemType}; use crate::extractors::RequestMeta; use crate::processing::profile_chunks::SerializedProfileChunks; + use crate::services::outcome::{DiscardReason, Outcome}; use crate::services::projects::project::ProjectInfo; const PERFETTO_FIXTURE: &[u8] = include_bytes!( "../../../../relay-profiling/tests/fixtures/android/perfetto/android.pftrace" ); + const JSON_FIXTURE: &[u8] = + include_bytes!("../../../../relay-profiling/tests/fixtures/sample/v2/valid.json"); + fn perfetto_meta() -> Vec { serde_json::json!({ "version": "2", @@ -330,5 +369,67 @@ mod tests { ); let raw_profile = chunk.raw_profile.as_ref().expect("expected raw_profile"); assert_eq!(raw_profile.payload.as_ref(), PERFETTO_FIXTURE); + assert_eq!(raw_profile.content_type, ContentType::PerfettoTrace); + } + + fn make_json_item(payload: &[u8]) -> Item { + let mut item = Item::new(ItemType::ProfileChunk); + item.set_payload(ContentType::Json, bytes::Bytes::from(payload.to_vec())); + item + } + + #[test] + fn test_expand_json_success() { + let item = make_json_item(JSON_FIXTURE); + let (managed, _handle) = make_chunks(vec![item]); + + let expanded = expand_single(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); + assert_eq!(chunks.chunks.len(), 1, "item should be retained"); + + let chunk = &chunks.chunks[0]; + assert!( + serde_json::from_slice::(&chunk.payload).is_ok(), + "payload must be valid JSON" + ); + assert!( + chunk.raw_profile.is_none(), + "JSON items should not have raw_profile" + ); + } + + #[test] + fn test_expand_json_with_meta_length_rejected() { + let mut item = make_json_item(JSON_FIXTURE); + item.set_meta_length(10); + let (managed, _handle) = make_chunks(vec![item]); + + let expanded = expand_single(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); + assert!( + chunks.chunks.is_empty(), + "JSON item with meta_length should be rejected" + ); + } + + #[test] + fn test_expand_json_mismatched_profile_type() { + let mut item = make_json_item(JSON_FIXTURE); + // fixture has platform "cocoa" → ProfileType::Ui, + // but "node" → ProfileType::Backend, creating a mismatch + item.set_platform("node".to_owned()); + let (managed, mut handle) = make_chunks(vec![item]); + + let expanded = expand_single(managed, Context::for_test()); + let chunks = expanded.accept(|c| c); + assert!( + chunks.chunks.is_empty(), + "JSON item with mismatched profile_type header should be rejected" + ); + handle.assert_outcome( + &Outcome::Invalid(DiscardReason::Profiling("profiling_invalid_profile_type")), + DataCategory::ProfileChunk, + 1, + ); } } From 51b5a83e5b9e27e49ab6f3e5a288cbd26720556f Mon Sep 17 00:00:00 2001 From: Markus Hintersteiner Date: Thu, 7 May 2026 16:22:04 +0200 Subject: [PATCH 7/8] Inline expand_item --- .../src/processing/profile_chunks/process.rs | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/relay-server/src/processing/profile_chunks/process.rs b/relay-server/src/processing/profile_chunks/process.rs index 9cb0dacc376..f65126df308 100644 --- a/relay-server/src/processing/profile_chunks/process.rs +++ b/relay-server/src/processing/profile_chunks/process.rs @@ -33,7 +33,16 @@ pub fn expand( let mut expanded = Vec::with_capacity(serialized.profile_chunks.len()); for item in serialized.profile_chunks { - match expand_item(&item, client_ip, filter_settings, ctx) { + let payload = item.payload(); + let is_perfetto = matches!(item.content_type(), Some(ContentType::PerfettoTrace)); + + let result = if is_perfetto { + expand_perfetto_profile_chunk(&item, client_ip, filter_settings, ctx, payload) + } else { + expand_json_item(&item, client_ip, filter_settings, ctx, payload) + }; + + match result { Ok(chunk) => { track_quantities(&item, sdk, &chunk.quantities, records); expanded.push(chunk); @@ -49,22 +58,6 @@ pub fn expand( }) } -fn expand_item( - item: &crate::envelope::Item, - client_ip: Option, - filter_settings: &relay_filter::ProjectFiltersConfig, - ctx: Context<'_>, -) -> std::result::Result { - let payload = item.payload(); - let is_perfetto = matches!(item.content_type(), Some(ContentType::PerfettoTrace)); - - if is_perfetto { - expand_perfetto_profile_chunk(item, client_ip, filter_settings, ctx, payload) - } else { - expand_json_item(item, client_ip, filter_settings, ctx, payload) - } -} - fn expand_perfetto_profile_chunk( item: &crate::envelope::Item, client_ip: Option, From 2858a572772e831d0dbcb9b5b20d3936f5948d23 Mon Sep 17 00:00:00 2001 From: Markus Hintersteiner Date: Thu, 7 May 2026 16:52:40 +0200 Subject: [PATCH 8/8] Extend tests, apply style feedback --- .../src/processing/profile_chunks/process.rs | 50 ++++++++++++------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/relay-server/src/processing/profile_chunks/process.rs b/relay-server/src/processing/profile_chunks/process.rs index f65126df308..f54e42d8e0c 100644 --- a/relay-server/src/processing/profile_chunks/process.rs +++ b/relay-server/src/processing/profile_chunks/process.rs @@ -64,7 +64,7 @@ fn expand_perfetto_profile_chunk( filter_settings: &relay_filter::ProjectFiltersConfig, ctx: Context<'_>, payload: Bytes, -) -> std::result::Result { +) -> Result { let item_quantities = item.quantities(); let err = |e: Error| (e, item_quantities.clone()); @@ -115,7 +115,7 @@ fn expand_json_item( filter_settings: &relay_filter::ProjectFiltersConfig, ctx: Context<'_>, payload: Bytes, -) -> std::result::Result { +) -> Result { let item_quantities = item.quantities(); let err = |e: Error| (e, item_quantities.clone()); @@ -239,13 +239,6 @@ mod tests { .build() } - fn expand_single( - managed: Managed, - ctx: Context<'_>, - ) -> Managed { - expand(managed, ctx) - } - #[test] fn test_expand_compound_unknown_content_type() { let meta = perfetto_meta(); @@ -258,7 +251,7 @@ mod tests { item.set_meta_length(meta_length); let (managed, _handle) = make_chunks(vec![item]); - let expanded = expand_single(managed, Context::for_test()); + let expanded = expand(managed, Context::for_test()); let chunks = expanded.accept(|c| c); assert!(chunks.chunks.is_empty(), "item should be dropped"); } @@ -269,7 +262,7 @@ mod tests { let item = make_compound_item(&meta, PERFETTO_FIXTURE, "android"); let (managed, _handle) = make_chunks(vec![item]); - let expanded = expand_single(managed, Context::for_test()); + let expanded = expand(managed, Context::for_test()); let chunks = expanded.accept(|c| c); assert!( chunks.chunks.is_empty(), @@ -287,14 +280,35 @@ mod tests { ); item.set_meta_length(body.len() as u32 + 100); item.set_platform("android".to_owned()); - let (managed, _handle) = make_chunks(vec![item]); + let (managed, mut handle) = make_chunks(vec![item]); - let expanded = expand_single(managed, Context::for_test()); + let ctx = Context { + project_info: &ProjectInfo { + config: ProjectConfig { + features: FeatureSet::from_iter([ + Feature::ContinuousProfiling, + Feature::ContinuousProfilingPerfetto, + ]), + ..Default::default() + }, + ..Default::default() + }, + ..Context::for_test() + }; + + let expanded = expand(managed, ctx); let chunks = expanded.accept(|c| c); assert!( chunks.chunks.is_empty(), "item should be dropped on out-of-bounds meta_length" ); + handle.assert_outcome( + &Outcome::Invalid(DiscardReason::Profiling( + "profiling_invalid_sampled_profile", + )), + DataCategory::ProfileChunkUi, + 1, + ); } #[test] @@ -323,7 +337,7 @@ mod tests { ..Context::for_test() }; - let expanded = expand_single(managed, ctx); + let expanded = expand(managed, ctx); let chunks = expanded.accept(|c| c); assert!( chunks.chunks.is_empty(), @@ -351,7 +365,7 @@ mod tests { ..Context::for_test() }; - let expanded = expand_single(managed, ctx); + let expanded = expand(managed, ctx); let chunks = expanded.accept(|c| c); assert_eq!(chunks.chunks.len(), 1, "item should be retained"); @@ -376,7 +390,7 @@ mod tests { let item = make_json_item(JSON_FIXTURE); let (managed, _handle) = make_chunks(vec![item]); - let expanded = expand_single(managed, Context::for_test()); + let expanded = expand(managed, Context::for_test()); let chunks = expanded.accept(|c| c); assert_eq!(chunks.chunks.len(), 1, "item should be retained"); @@ -397,7 +411,7 @@ mod tests { item.set_meta_length(10); let (managed, _handle) = make_chunks(vec![item]); - let expanded = expand_single(managed, Context::for_test()); + let expanded = expand(managed, Context::for_test()); let chunks = expanded.accept(|c| c); assert!( chunks.chunks.is_empty(), @@ -413,7 +427,7 @@ mod tests { item.set_platform("node".to_owned()); let (managed, mut handle) = make_chunks(vec![item]); - let expanded = expand_single(managed, Context::for_test()); + let expanded = expand(managed, Context::for_test()); let chunks = expanded.accept(|c| c); assert!( chunks.chunks.is_empty(),