Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

### Fixes

- Inject Kafka trace headers even without an active span so distributed tracing works for background workers and `@Scheduled` jobs ([#5338](https://github.com/getsentry/sentry-java/pull/5338))
- Write the `sentry-task-enqueued-time` Kafka header as a plain decimal so cross-SDK consumers (e.g. sentry-python) can parse it ([#5328](https://github.com/getsentry/sentry-java/pull/5328))

## 8.37.1
Expand Down
26 changes: 19 additions & 7 deletions sentry-kafka/api/sentry-kafka.api
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,27 @@ public final class io/sentry/kafka/SentryKafkaConsumerTracing {
public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object;
}

public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor {
public final class io/sentry/kafka/SentryKafkaProducer : org/apache/kafka/clients/producer/Producer {
public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String;
public static final field TRACE_ORIGIN Ljava/lang/String;
public fun <init> ()V
public fun <init> (Lio/sentry/IScopes;)V
public fun <init> (Lio/sentry/IScopes;Ljava/lang/String;)V
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;)V
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;)V
public fun <init> (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;Ljava/lang/String;)V
public fun abortTransaction ()V
public fun beginTransaction ()V
public fun clientInstanceId (Ljava/time/Duration;)Lorg/apache/kafka/common/Uuid;
public fun close ()V
public fun configure (Ljava/util/Map;)V
public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V
public fun onSend (Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord;
public fun close (Ljava/time/Duration;)V
public fun commitTransaction ()V
public fun flush ()V
public fun getDelegate ()Lorg/apache/kafka/clients/producer/Producer;
public fun initTransactions ()V
public fun metrics ()Ljava/util/Map;
public fun partitionsFor (Ljava/lang/String;)Ljava/util/List;
public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;)Ljava/util/concurrent/Future;
public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;Lorg/apache/kafka/clients/producer/Callback;)Ljava/util/concurrent/Future;
public fun sendOffsetsToTransaction (Ljava/util/Map;Ljava/lang/String;)V
public fun sendOffsetsToTransaction (Ljava/util/Map;Lorg/apache/kafka/clients/consumer/ConsumerGroupMetadata;)V
public fun toString ()Ljava/lang/String;
}

Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private void finishTransaction(

private <K, V> @Nullable Long receiveLatency(final @NotNull ConsumerRecord<K, V> record) {
final @Nullable String enqueuedTimeStr =
headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
headerValue(record, SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER);
if (enqueuedTimeStr == null) {
return null;
}
Expand Down
281 changes: 281 additions & 0 deletions sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
package io.sentry.kafka;

import io.sentry.BaggageHeader;
import io.sentry.DateUtils;
import io.sentry.IScopes;
import io.sentry.ISpan;
import io.sentry.ScopesAdapter;
import io.sentry.SentryLevel;
import io.sentry.SentryTraceHeader;
import io.sentry.SpanDataConvention;
import io.sentry.SpanOptions;
import io.sentry.SpanStatus;
import io.sentry.util.SpanUtils;
import io.sentry.util.TracingUtils;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* Wraps a Kafka {@link Producer} to record a {@code queue.publish} span around each {@code send}
* and to inject Sentry trace propagation headers into the produced record.
*
* <p>Unlike a {@link org.apache.kafka.clients.producer.ProducerInterceptor}, the wrapper keeps the
* span open until the send callback fires, so the span reflects the actual broker-ack lifecycle.
*
* <p>For raw Kafka usage:
*
* <pre>{@code
* Producer<String, String> producer =
* new SentryKafkaProducer<>(new KafkaProducer<>(props));
* }</pre>
*
* <p>For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code
* sentry-spring-jakarta} installs this wrapper automatically via {@code
* ProducerFactory.addPostProcessor(...)}.
*/
@ApiStatus.Experimental
public final class SentryKafkaProducer<K, V> implements Producer<K, V> {

public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer";
public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time";

private final @NotNull Producer<K, V> delegate;
private final @NotNull IScopes scopes;
private final @NotNull String traceOrigin;

public SentryKafkaProducer(final @NotNull Producer<K, V> delegate) {
this(delegate, ScopesAdapter.getInstance(), TRACE_ORIGIN);
}

public SentryKafkaProducer(
final @NotNull Producer<K, V> delegate, final @NotNull IScopes scopes) {
this(delegate, scopes, TRACE_ORIGIN);
}

public SentryKafkaProducer(
final @NotNull Producer<K, V> delegate,
final @NotNull IScopes scopes,
final @NotNull String traceOrigin) {
this.delegate = delegate;
this.scopes = scopes;
this.traceOrigin = traceOrigin;
}

/** Returns the wrapped producer. */
public @NotNull Producer<K, V> getDelegate() {
return delegate;
}

@Override
public @NotNull Future<RecordMetadata> send(final @NotNull ProducerRecord<K, V> record) {
return send(record, null);
}

@Override
public @NotNull Future<RecordMetadata> send(
final @NotNull ProducerRecord<K, V> record, final @Nullable Callback callback) {
if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) {
return delegate.send(record, callback);
}

final @Nullable ISpan activeSpan = scopes.getSpan();
if (activeSpan == null || activeSpan.isNoOp()) {
maybeInjectHeaders(record.headers(), null);
return delegate.send(record, callback);
}

final @NotNull SpanOptions spanOptions = new SpanOptions();
spanOptions.setOrigin(traceOrigin);
final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions);

span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka");
span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic());
maybeInjectHeaders(record.headers(), span);

try {
return delegate.send(record, wrapCallback(callback, span));
} catch (Throwable t) {
finishWithError(span, t);
throw t;
}
}

private @NotNull Callback wrapCallback(
final @Nullable Callback userCallback, final @NotNull ISpan span) {
return (metadata, exception) -> {
try {
if (exception != null) {
span.setThrowable(exception);
span.setStatus(SpanStatus.INTERNAL_ERROR);
} else {
span.setStatus(SpanStatus.OK);
}
} catch (Throwable t) {
scopes
.getOptions()
.getLogger()
.log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t);
} finally {
span.finish();
if (userCallback != null) {
userCallback.onCompletion(metadata, exception);
}
}
};
}

private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) {
span.setThrowable(t);
span.setStatus(SpanStatus.INTERNAL_ERROR);
span.finish();
}

private boolean isIgnored() {
return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin);
}

private void maybeInjectHeaders(final @NotNull Headers headers, final @Nullable ISpan span) {
try {
final @Nullable List<String> existingBaggageHeaders =
readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER);
final @Nullable TracingUtils.TracingHeaders tracingHeaders =
TracingUtils.trace(scopes, existingBaggageHeaders, span);
if (tracingHeaders != null) {
final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader();
headers.remove(sentryTraceHeader.getName());
headers.add(
sentryTraceHeader.getName(),
sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8));

final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader();
if (baggageHeader != null) {
headers.remove(baggageHeader.getName());
headers.add(
baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8));
}
}

headers.remove(SENTRY_ENQUEUED_TIME_HEADER);
headers.add(
SENTRY_ENQUEUED_TIME_HEADER,
DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis()))
.toString()
.getBytes(StandardCharsets.UTF_8));
} catch (Throwable t) {
scopes
.getOptions()
.getLogger()
.log(SentryLevel.ERROR, "Failed to inject Sentry headers into Kafka record.", t);
}
}

private static @Nullable List<String> readHeaderValues(
final @NotNull Headers headers, final @NotNull String name) {
@Nullable List<String> values = null;
for (final @NotNull Header header : headers.headers(name)) {
final byte @Nullable [] value = header.value();
if (value != null) {
if (values == null) {
values = new ArrayList<>();
}
values.add(new String(value, StandardCharsets.UTF_8));
}
}
return values;
}

// --- Pure delegation for everything else ---

@Override
public void initTransactions() {
delegate.initTransactions();
}

@Override
public void beginTransaction() throws ProducerFencedException {
delegate.beginTransaction();
}

@Override
@SuppressWarnings("deprecation")
public void sendOffsetsToTransaction(
final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets,
final @NotNull String consumerGroupId)
throws ProducerFencedException {
delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
}

@Override
public void sendOffsetsToTransaction(
final @NotNull Map<TopicPartition, OffsetAndMetadata> offsets,
final @NotNull ConsumerGroupMetadata groupMetadata)
throws ProducerFencedException {
delegate.sendOffsetsToTransaction(offsets, groupMetadata);
}

@Override
public void commitTransaction() throws ProducerFencedException {
delegate.commitTransaction();
}

@Override
public void abortTransaction() throws ProducerFencedException {
delegate.abortTransaction();
}

@Override
public void flush() {
delegate.flush();
}

@Override
public @NotNull List<PartitionInfo> partitionsFor(final @NotNull String topic) {
return delegate.partitionsFor(topic);
}

@Override
public @NotNull Map<MetricName, ? extends Metric> metrics() {
return delegate.metrics();
}

@Override
public @NotNull Uuid clientInstanceId(final @NotNull Duration timeout) {
return delegate.clientInstanceId(timeout);
}

@Override
public void close() {
delegate.close();
}

@Override
public void close(final @NotNull Duration timeout) {
delegate.close(timeout);
}

@Override
public @NotNull String toString() {
return "SentryKafkaProducer[delegate=" + delegate + "]";
}
}
Loading
Loading