Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions logfire-api/logfire_api/sampling/_tail_sampling.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,19 @@ class SamplingOptions:
def check_trace_id_ratio(trace_id: int, rate: float) -> bool: ...

class TailSamplingProcessor(WrapperSpanProcessor):
"""Passes spans to the wrapped processor if any span in a trace meets the sampling criteria."""
"""Buffers spans until a span in the trace meets the sampling criteria.

Two types of wrapped processors are supported:
- `processor`: Both `on_start` and `on_end` are buffered until the sampling decision is made.
Use for processors like `PendingSpanProcessor` that create spans in `on_start`.
- `immediate_on_start_processor`: `on_start` is called immediately; only `on_end` is buffered.
Use for processors that only set attributes (like `DirectBaggageAttributesSpanProcessor`).
"""
get_tail_sample_rate: Incomplete
immediate_on_start_processor: SpanProcessor | None
traces: dict[int, TraceBuffer]
lock: Incomplete
def __init__(self, processor: SpanProcessor, get_tail_sample_rate: Callable[[TailSamplingSpanInfo], float]) -> None: ...
def __init__(self, processor: SpanProcessor, get_tail_sample_rate: Callable[[TailSamplingSpanInfo], float], immediate_on_start_processor: SpanProcessor | None = None) -> None: ...
def on_start(self, span: Span, parent_context: context.Context | None = None) -> None: ...
def on_end(self, span: ReadableSpan) -> None: ...
def check_span(self, span_info: TailSamplingSpanInfo) -> bool:
Expand Down
18 changes: 16 additions & 2 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,8 +891,20 @@ def _initialize(self) -> None:

processors_with_pending_spans: list[SpanProcessor] = []
root_processor = main_multiprocessor = SynchronousMultiSpanProcessor()

# Processors that only set attributes can have on_start called immediately,
# even during tail sampling. This avoids "Setting attribute on ended span" warnings.
immediate_on_start_multiprocessor: SynchronousMultiSpanProcessor | None = None
if self.sampling.tail and self.add_baggage_to_attributes:
immediate_on_start_multiprocessor = SynchronousMultiSpanProcessor()
immediate_on_start_multiprocessor.add_span_processor(DirectBaggageAttributesSpanProcessor())

if self.sampling.tail:
root_processor = TailSamplingProcessor(root_processor, self.sampling.tail)
root_processor = TailSamplingProcessor(
root_processor,
self.sampling.tail,
immediate_on_start_processor=immediate_on_start_multiprocessor,
)
tracer_provider.add_span_processor(
CheckSuppressInstrumentationProcessorWrapper(
MainSpanProcessorWrapper(root_processor, self.scrubber),
Expand All @@ -908,7 +920,9 @@ def add_span_processor(span_processor: SpanProcessor) -> None:
if has_pending:
processors_with_pending_spans.append(span_processor)

if self.add_baggage_to_attributes:
# Only add baggage processor to main_multiprocessor if NOT using tail sampling
# (when using tail sampling, it's added to immediate_on_start_multiprocessor above)
if self.add_baggage_to_attributes and not self.sampling.tail:
add_span_processor(DirectBaggageAttributesSpanProcessor())

if self.additional_span_processors is not None:
Expand Down
47 changes: 37 additions & 10 deletions logfire/sampling/_tail_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,29 @@ def check_trace_id_ratio(trace_id: int, rate: float) -> bool:


class TailSamplingProcessor(WrapperSpanProcessor):
"""Passes spans to the wrapped processor if any span in a trace meets the sampling criteria."""
"""Buffers spans until a span in the trace meets the sampling criteria.

def __init__(self, processor: SpanProcessor, get_tail_sample_rate: Callable[[TailSamplingSpanInfo], float]) -> None:
Two types of wrapped processors are supported:
- `processor`: Both `on_start` and `on_end` are buffered until the sampling decision is made.
Use for processors like `PendingSpanProcessor` that create spans in `on_start`.
- `immediate_on_start_processor`: `on_start` is called immediately; only `on_end` is buffered.
Use for processors that only set attributes (like `DirectBaggageAttributesSpanProcessor`).
"""

def __init__(
self,
processor: SpanProcessor,
get_tail_sample_rate: Callable[[TailSamplingSpanInfo], float],
immediate_on_start_processor: SpanProcessor | None = None,
) -> None:
super().__init__(processor)
self.get_tail_sample_rate = get_tail_sample_rate
self.immediate_on_start_processor = immediate_on_start_processor

# A TraceBuffer is typically created for each new trace.
# If a span meets the sampling criteria, the buffer is dropped and all spans within are pushed
# to the wrapped processor.
# So when more spans arrive and there's no buffer, they get passed through immediately.
# A TraceBuffer is created for each new trace to buffer on_start/on_end calls.
# If a span meets the sampling criteria, the buffer is dropped and buffered calls
# are passed to the wrapped processor.
# When more spans arrive after sampling, they get passed through immediately (no buffer).
self.traces: dict[int, TraceBuffer] = {}

# Code that touches self.traces and its contents should be protected by this lock.
Expand All @@ -178,11 +191,15 @@ def on_start(self, span: Span, parent_context: context.Context | None = None) ->
# This trace's spans haven't met the criteria yet, so add this span to the buffer.
buffer.started.append((span, parent_context))
dropped = self.check_span(TailSamplingSpanInfo(span, parent_context, 'start', buffer))
# The opposite case is handled outside the lock since it may take some time.

# This code may take longer since it calls the wrapped processor which might do anything.
# It shouldn't be inside the lock to avoid blocking other threads.
# Since it's not in the lock, it shouldn't touch self.traces or its contents.
# Call immediate_on_start_processor.on_start() right away.
# These processors just set attributes, which is safe even if the trace isn't sampled.
if self.immediate_on_start_processor is not None:
self.immediate_on_start_processor.on_start(span, parent_context)

# For the main processor (which may contain PendingSpanProcessor), buffer on_start
# until we know the trace is sampled.
# This code is outside the lock since it may take some time.
if buffer is None:
super().on_start(span, parent_context)
elif dropped:
Expand All @@ -206,7 +223,11 @@ def on_end(self, span: ReadableSpan) -> None:
# Delete the buffer to save memory.
self.traces.pop(trace_id, None)

# For immediate_on_start_processor, on_end is also buffered (only on_start is immediate).
# This ensures spans only get exported if the trace is sampled.
if buffer is None:
if self.immediate_on_start_processor is not None:
self.immediate_on_start_processor.on_end(span)
super().on_end(span)
elif dropped:
self.push_buffer(buffer)
Expand All @@ -223,6 +244,12 @@ def drop_buffer(self, buffer: TraceBuffer) -> None:
del self.traces[buffer.trace_id]

def push_buffer(self, buffer: TraceBuffer) -> None:
# For immediate_on_start_processor, on_start was already called, so only call on_end.
if self.immediate_on_start_processor is not None:
for span in buffer.ended:
self.immediate_on_start_processor.on_end(span)

# For the main processor, replay both on_start and on_end.
for started in buffer.started:
super().on_start(*started)
for span in buffer.ended:
Expand Down
49 changes: 49 additions & 0 deletions tests/test_tail_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,3 +514,52 @@ def test_both_trace_and_head():
)
):
logfire.configure(trace_sample_rate=0.5, sampling=logfire.SamplingOptions()) # type: ignore


def test_tail_sampling_no_warning_on_ended_span(
exporter: TestExporter,
id_generator: Any,
time_generator: TimeGenerator,
caplog: pytest.LogCaptureFixture,
):
"""Test that tail sampling with baggage doesn't trigger 'Setting attribute on ended span' warnings.

DirectBaggageAttributesSpanProcessor is passed as immediate_on_start_processor,
so its on_start is called immediately (before spans end), avoiding the warning.
"""
import logging

from opentelemetry.sdk._logs.export import SimpleLogRecordProcessor
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

from logfire._internal.exporters.test import TestLogExporter

config_kwargs = dict(
send_to_logfire=False,
console=False,
advanced=logfire.AdvancedOptions(
id_generator=id_generator,
ns_timestamp_generator=time_generator,
log_record_processors=[SimpleLogRecordProcessor(TestLogExporter(time_generator))],
),
additional_span_processors=[SimpleSpanProcessor(exporter)],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also test adding another span processor that sets an attribute in `on_start`, and assert that the attribute is actually set on the exported spans.

add_baggage_to_attributes=True, # Enable baggage processor
)

logfire.configure(
**config_kwargs, # type: ignore
sampling=logfire.SamplingOptions(tail=lambda info: 1.0 if info.level >= 'error' else 0.0),
)

caplog.set_level(logging.WARNING, logger='opentelemetry.sdk.trace')

with logfire.span('root'):
with logfire.span('child_1'):
pass
with logfire.span('child_2'):
pass
logfire.error('Trigger sampling')

warnings = [r for r in caplog.records if 'Setting attribute on ended span' in r.message]
assert warnings == [], f'Unexpected warnings: {[r.message for r in warnings]}'
assert len(exporter.exported_spans_as_dict()) > 0
Loading