Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate messaging.kafka.message.offset in all kafka instrumentations #7374

Merged
merged 2 commits into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public final class KafkaSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";
Expand All @@ -31,7 +32,7 @@ public final class KafkaSingletons {
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);

private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER;
private static final Instrumenter<ProducerRecord<?, ?>, RecordMetadata> PRODUCER_INSTRUMENTER;
private static final Instrumenter<ConsumerRecords<?, ?>, Void> CONSUMER_RECEIVE_INSTRUMENTER;
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_PROCESS_INSTRUMENTER;

Expand All @@ -53,7 +54,7 @@ public static boolean isProducerPropagationEnabled() {
return PRODUCER_PROPAGATION_ENABLED;
}

public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
public static Instrumenter<ProducerRecord<?, ?>, RecordMetadata> producerInstrumenter() {
return PRODUCER_INSTRUMENTER;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ProducerCallback(

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
producerInstrumenter().end(context, request, null, exception);
producerInstrumenter().end(context, request, metadata, exception);

if (callback != null) {
try (Scope ignored = parentContext.makeCurrent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
Expand Down Expand Up @@ -108,7 +110,7 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"kafka.offset" Long
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
Expand Down Expand Up @@ -156,6 +158,8 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}

Expand Down Expand Up @@ -184,9 +188,9 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true
"kafka.offset" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
}
Expand Down Expand Up @@ -232,7 +236,8 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition
"messaging.kafka.message.offset" { it >= 0 }
}
}

Expand Down Expand Up @@ -261,8 +266,8 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"kafka.offset" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
}
Expand All @@ -57,6 +59,8 @@ class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
}
Expand All @@ -72,7 +76,7 @@ class KafkaClientPropagationDisabledTest extends KafkaClientPropagationBaseTest
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"kafka.offset" Long
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
span(1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
span(2) {
Expand All @@ -69,7 +71,7 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"kafka.offset" Long
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
}
Expand Down Expand Up @@ -110,6 +112,8 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
span(1) {
Expand All @@ -122,9 +126,9 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"$SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE" true
"kafka.offset" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
}
Expand Down Expand Up @@ -163,7 +167,8 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition
"messaging.kafka.message.offset" { it >= 0 }
}
}
span(1) {
Expand All @@ -176,8 +181,8 @@ class KafkaClientSuppressReceiveSpansTest extends KafkaClientPropagationBaseTest
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"kafka.offset" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" partition
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ public final class KafkaTelemetry {
private static final TextMapSetter<Headers> SETTER = KafkaHeadersSetter.INSTANCE;

private final OpenTelemetry openTelemetry;
private final Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter;
private final Instrumenter<ProducerRecord<?, ?>, RecordMetadata> producerInstrumenter;
private final Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter;
private final boolean producerPropagationEnabled;

KafkaTelemetry(
OpenTelemetry openTelemetry,
Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter,
Instrumenter<ProducerRecord<?, ?>, RecordMetadata> producerInstrumenter,
Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter,
boolean producerPropagationEnabled) {
this.openTelemetry = openTelemetry;
Expand Down Expand Up @@ -238,7 +238,7 @@ public ProducerCallback(

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
producerInstrumenter.end(context, request, null, exception);
producerInstrumenter.end(context, request, metadata, exception);

if (callback != null) {
try (Scope ignored = parentContext.makeCurrent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public final class KafkaTelemetryBuilder {
static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-2.6";

private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<ProducerRecord<?, ?>, Void>> producerAttributesExtractors =
new ArrayList<>();
private final List<AttributesExtractor<ProducerRecord<?, ?>, RecordMetadata>>
producerAttributesExtractors = new ArrayList<>();
private final List<AttributesExtractor<ConsumerRecord<?, ?>, Void>> consumerAttributesExtractors =
new ArrayList<>();
private List<String> capturedHeaders = emptyList();
Expand All @@ -36,7 +37,7 @@ public final class KafkaTelemetryBuilder {

@CanIgnoreReturnValue
public KafkaTelemetryBuilder addProducerAttributesExtractors(
AttributesExtractor<ProducerRecord<?, ?>, Void> extractor) {
AttributesExtractor<ProducerRecord<?, ?>, RecordMetadata> extractor) {
producerAttributesExtractors.add(extractor);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait {
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION" SHARED_TOPIC
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"messaging.kafka.message.offset" { it >= 0 }
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
Expand All @@ -88,7 +90,7 @@ class WrappersTest extends KafkaClientBaseTest implements LibraryTestTrait {
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_KAFKA_PARTITION" { it >= 0 }
"kafka.offset" Long
"messaging.kafka.message.offset" { it >= 0 }
"kafka.record.queue_time_ms" { it >= 0 }
if (testHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package io.opentelemetry.instrumentation.kafka.internal;

import static io.opentelemetry.api.common.AttributeKey.longKey;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
Expand All @@ -18,10 +21,16 @@
*/
public final class KafkaConsumerAdditionalAttributesExtractor
implements AttributesExtractor<ConsumerRecord<?, ?>, Void> {

// TODO: remove this constant when this attribute appears in SemanticAttributes
private static final AttributeKey<Long> MESSAGING_KAFKA_MESSAGE_OFFSET =
longKey("messaging.kafka.message.offset");

@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, ConsumerRecord<?, ?> consumerRecord) {
attributes.put(SemanticAttributes.MESSAGING_KAFKA_PARTITION, (long) consumerRecord.partition());
attributes.put(MESSAGING_KAFKA_MESSAGE_OFFSET, consumerRecord.offset());
if (consumerRecord.value() == null) {
attributes.put(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
public final class KafkaConsumerExperimentalAttributesExtractor
implements AttributesExtractor<ConsumerRecord<?, ?>, Void> {

private static final AttributeKey<Long> KAFKA_OFFSET = longKey("kafka.offset");
private static final AttributeKey<Long> KAFKA_RECORD_QUEUE_TIME_MS =
longKey("kafka.record.queue_time_ms");

@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, ConsumerRecord<?, ?> consumerRecord) {
attributes.put(KAFKA_OFFSET, consumerRecord.offset());

// don't record a duration if the message was sent from an old Kafka client
if (consumerRecord.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
Expand Down Expand Up @@ -80,17 +81,17 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled(
return this;
}

public Instrumenter<ProducerRecord<?, ?>, Void> createProducerInstrumenter() {
public Instrumenter<ProducerRecord<?, ?>, RecordMetadata> createProducerInstrumenter() {
return createProducerInstrumenter(Collections.emptyList());
}

public Instrumenter<ProducerRecord<?, ?>, Void> createProducerInstrumenter(
Iterable<AttributesExtractor<ProducerRecord<?, ?>, Void>> extractors) {
public Instrumenter<ProducerRecord<?, ?>, RecordMetadata> createProducerInstrumenter(
Iterable<AttributesExtractor<ProducerRecord<?, ?>, RecordMetadata>> extractors) {

KafkaProducerAttributesGetter getter = KafkaProducerAttributesGetter.INSTANCE;
MessageOperation operation = MessageOperation.SEND;

return Instrumenter.<ProducerRecord<?, ?>, Void>builder(
return Instrumenter.<ProducerRecord<?, ?>, RecordMetadata>builder(
openTelemetry,
instrumentationName,
MessagingSpanNameExtractor.create(getter, operation))
Expand Down Expand Up @@ -169,10 +170,11 @@ public KafkaInstrumenterFactory setMessagingReceiveInstrumentationEnabled(
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

private static <T> MessagingAttributesExtractor<T, Void> buildMessagingAttributesExtractor(
MessagingAttributesGetter<T, Void> getter,
MessageOperation operation,
List<String> capturedHeaders) {
private static <REQUEST, RESPONSE>
MessagingAttributesExtractor<REQUEST, RESPONSE> buildMessagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter,
MessageOperation operation,
List<String> capturedHeaders) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,31 @@

package io.opentelemetry.instrumentation.kafka.internal;

import static io.opentelemetry.api.common.AttributeKey.longKey;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class KafkaProducerAdditionalAttributesExtractor
implements AttributesExtractor<ProducerRecord<?, ?>, Void> {
final class KafkaProducerAdditionalAttributesExtractor
implements AttributesExtractor<ProducerRecord<?, ?>, RecordMetadata> {

// TODO: remove this constant when this attribute appears in SemanticAttributes
private static final AttributeKey<Long> MESSAGING_KAFKA_MESSAGE_OFFSET =
longKey("messaging.kafka.message.offset");

@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, ProducerRecord<?, ?> producerRecord) {
Integer partition = producerRecord.partition();
if (partition != null) {
attributes.put(SemanticAttributes.MESSAGING_KAFKA_PARTITION, partition.longValue());
}
if (producerRecord.value() == null) {
attributes.put(SemanticAttributes.MESSAGING_KAFKA_TOMBSTONE, true);
}
Expand All @@ -35,6 +40,12 @@ public void onEnd(
AttributesBuilder attributes,
Context context,
ProducerRecord<?, ?> producerRecord,
@Nullable Void unused,
@Nullable Throwable error) {}
@Nullable RecordMetadata recordMetadata,
@Nullable Throwable error) {

if (recordMetadata != null) {
attributes.put(SemanticAttributes.MESSAGING_KAFKA_PARTITION, recordMetadata.partition());
attributes.put(MESSAGING_KAFKA_MESSAGE_OFFSET, recordMetadata.offset());
}
}
}
Loading