diff --git a/logfire-api/logfire_api/sampling/_tail_sampling.pyi b/logfire-api/logfire_api/sampling/_tail_sampling.pyi index 5a018d3cc..e674a73ca 100644 --- a/logfire-api/logfire_api/sampling/_tail_sampling.pyi +++ b/logfire-api/logfire_api/sampling/_tail_sampling.pyi @@ -60,11 +60,19 @@ class SamplingOptions: def check_trace_id_ratio(trace_id: int, rate: float) -> bool: ... class TailSamplingProcessor(WrapperSpanProcessor): - """Passes spans to the wrapped processor if any span in a trace meets the sampling criteria.""" + """Buffers spans until a span in the trace meets the sampling criteria. + + Two types of wrapped processors are supported: + - `processor`: Both `on_start` and `on_end` are buffered until sampling decision. + Use for processors like `PendingSpanProcessor` that create spans in `on_start`. + - `immediate_on_start_processor`: `on_start` is called immediately, only `on_end` is buffered. + Use for processors that just set attributes (like `DirectBaggageAttributesSpanProcessor`). + """ get_tail_sample_rate: Incomplete + immediate_on_start_processor: SpanProcessor | None traces: dict[int, TraceBuffer] lock: Incomplete - def __init__(self, processor: SpanProcessor, get_tail_sample_rate: Callable[[TailSamplingSpanInfo], float]) -> None: ... + def __init__(self, processor: SpanProcessor, get_tail_sample_rate: Callable[[TailSamplingSpanInfo], float], immediate_on_start_processor: SpanProcessor | None = None) -> None: ... def on_start(self, span: Span, parent_context: context.Context | None = None) -> None: ... def on_end(self, span: ReadableSpan) -> None: ... def check_span(self, span_info: TailSamplingSpanInfo) -> bool: diff --git a/logfire/_internal/config.py b/logfire/_internal/config.py index 4638786c2..134d74b08 100644 --- a/logfire/_internal/config.py +++ b/logfire/_internal/config.py @@ -891,8 +891,20 @@ def _initialize(self) -> None: processors_with_pending_spans: list[SpanProcessor] = [] root_processor = main_multiprocessor = SynchronousMultiSpanProcessor() + + # Processors that only set attributes can have on_start called immediately, + # even during tail sampling. This avoids "Setting attribute on ended span" warnings. + immediate_on_start_multiprocessor: SynchronousMultiSpanProcessor | None = None + if self.sampling.tail and self.add_baggage_to_attributes: + immediate_on_start_multiprocessor = SynchronousMultiSpanProcessor() + immediate_on_start_multiprocessor.add_span_processor(DirectBaggageAttributesSpanProcessor()) + if self.sampling.tail: - root_processor = TailSamplingProcessor(root_processor, self.sampling.tail) + root_processor = TailSamplingProcessor( + root_processor, + self.sampling.tail, + immediate_on_start_processor=immediate_on_start_multiprocessor, + ) tracer_provider.add_span_processor( CheckSuppressInstrumentationProcessorWrapper( MainSpanProcessorWrapper(root_processor, self.scrubber), @@ -908,7 +920,9 @@ def add_span_processor(span_processor: SpanProcessor) -> None: if has_pending: processors_with_pending_spans.append(span_processor) - if self.add_baggage_to_attributes: + # Only add baggage processor to main_multiprocessor if NOT using tail sampling + # (when using tail sampling, it's added to immediate_on_start_multiprocessor above) + if self.add_baggage_to_attributes and not self.sampling.tail: add_span_processor(DirectBaggageAttributesSpanProcessor()) if self.additional_span_processors is not None: diff --git a/logfire/sampling/_tail_sampling.py b/logfire/sampling/_tail_sampling.py index 4f12abccf..ce291e49b 100644 --- a/logfire/sampling/_tail_sampling.py +++ b/logfire/sampling/_tail_sampling.py @@ -146,16 +146,29 @@ def check_trace_id_ratio(trace_id: int, rate: float) -> bool: class TailSamplingProcessor(WrapperSpanProcessor): - """Passes spans to the wrapped processor if any span in a trace meets the sampling criteria.""" + """Buffers spans until a span in the trace meets the sampling criteria. - def __init__(self, processor: SpanProcessor, get_tail_sample_rate: Callable[[TailSamplingSpanInfo], float]) -> None: + Two types of wrapped processors are supported: + - `processor`: Both `on_start` and `on_end` are buffered until sampling decision. + Use for processors like `PendingSpanProcessor` that create spans in `on_start`. + - `immediate_on_start_processor`: `on_start` is called immediately, only `on_end` is buffered. + Use for processors that just set attributes (like `DirectBaggageAttributesSpanProcessor`). + """ + + def __init__( + self, + processor: SpanProcessor, + get_tail_sample_rate: Callable[[TailSamplingSpanInfo], float], + immediate_on_start_processor: SpanProcessor | None = None, + ) -> None: super().__init__(processor) self.get_tail_sample_rate = get_tail_sample_rate + self.immediate_on_start_processor = immediate_on_start_processor - # A TraceBuffer is typically created for each new trace. - # If a span meets the sampling criteria, the buffer is dropped and all spans within are pushed - # to the wrapped processor. - # So when more spans arrive and there's no buffer, they get passed through immediately. + # A TraceBuffer is created for each new trace to buffer on_start/on_end calls. + # If a span meets the sampling criteria, the buffer is dropped and buffered calls + # are passed to the wrapped processor. + # When more spans arrive after sampling, they get passed through immediately (no buffer). self.traces: dict[int, TraceBuffer] = {} # Code that touches self.traces and its contents should be protected by this lock. @@ -178,11 +191,15 @@ def on_start(self, span: Span, parent_context: context.Context | None = None) -> # This trace's spans haven't met the criteria yet, so add this span to the buffer. buffer.started.append((span, parent_context)) dropped = self.check_span(TailSamplingSpanInfo(span, parent_context, 'start', buffer)) - # The opposite case is handled outside the lock since it may take some time. - # This code may take longer since it calls the wrapped processor which might do anything. - # It shouldn't be inside the lock to avoid blocking other threads. - # Since it's not in the lock, it shouldn't touch self.traces or its contents. + # Call immediate_on_start_processor.on_start() right away. + # These processors just set attributes, which is safe even if the trace isn't sampled. + if self.immediate_on_start_processor is not None: + self.immediate_on_start_processor.on_start(span, parent_context) + + # For the main processor (which may contain PendingSpanProcessor), buffer on_start + # until we know the trace is sampled. + # This code is outside the lock since it may take some time. if buffer is None: super().on_start(span, parent_context) elif dropped: @@ -206,7 +223,11 @@ def on_end(self, span: ReadableSpan) -> None: # Delete the buffer to save memory. self.traces.pop(trace_id, None) + # For immediate_on_start_processor, on_end is also buffered (only on_start is immediate). + # This ensures spans only get exported if the trace is sampled. if buffer is None: + if self.immediate_on_start_processor is not None: + self.immediate_on_start_processor.on_end(span) super().on_end(span) elif dropped: self.push_buffer(buffer) @@ -223,6 +244,12 @@ def drop_buffer(self, buffer: TraceBuffer) -> None: del self.traces[buffer.trace_id] def push_buffer(self, buffer: TraceBuffer) -> None: + # For immediate_on_start_processor, on_start was already called, so only call on_end. + if self.immediate_on_start_processor is not None: + for span in buffer.ended: + self.immediate_on_start_processor.on_end(span) + + # For the main processor, replay both on_start and on_end. for started in buffer.started: super().on_start(*started) for span in buffer.ended: diff --git a/tests/test_tail_sampling.py b/tests/test_tail_sampling.py index aa6a7ff53..8a6f018d1 100644 --- a/tests/test_tail_sampling.py +++ b/tests/test_tail_sampling.py @@ -514,3 +514,52 @@ def test_both_trace_and_head(): ) ): logfire.configure(trace_sample_rate=0.5, sampling=logfire.SamplingOptions()) # type: ignore + + +def test_tail_sampling_no_warning_on_ended_span( + exporter: TestExporter, + id_generator: Any, + time_generator: TimeGenerator, + caplog: pytest.LogCaptureFixture, +): + """Test that tail sampling with baggage doesn't trigger 'Setting attribute on ended span' warnings. + + DirectBaggageAttributesSpanProcessor is passed as immediate_on_start_processor, + so its on_start is called immediately (before spans end), avoiding the warning. + """ + import logging + + from opentelemetry.sdk._logs.export import SimpleLogRecordProcessor + from opentelemetry.sdk.trace.export import SimpleSpanProcessor + + from logfire._internal.exporters.test import TestLogExporter + + config_kwargs = dict( + send_to_logfire=False, + console=False, + advanced=logfire.AdvancedOptions( + id_generator=id_generator, + ns_timestamp_generator=time_generator, + log_record_processors=[SimpleLogRecordProcessor(TestLogExporter(time_generator))], + ), + additional_span_processors=[SimpleSpanProcessor(exporter)], + add_baggage_to_attributes=True, # Enable baggage processor + ) + + logfire.configure( + **config_kwargs, # type: ignore + sampling=logfire.SamplingOptions(tail=lambda info: 1.0 if info.level >= 'error' else 0.0), + ) + + caplog.set_level(logging.WARNING, logger='opentelemetry.sdk.trace') + + with logfire.span('root'): + with logfire.span('child_1'): + pass + with logfire.span('child_2'): + pass + logfire.error('Trigger sampling') + + warnings = [r for r in caplog.records if 'Setting attribute on ended span' in r.message] + assert warnings == [], f'Unexpected warnings: {[r.message for r in warnings]}' + assert len(exporter.exported_spans_as_dict()) > 0