diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts
index 5ca649875dc7..7fbbfa4734bc 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts
@@ -28,6 +28,8 @@ tasks {
   withType<Test>().configureEach {
     usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
 
+    systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
+
     // TODO run tests both with and without experimental span attributes
     jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true")
   }
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java
index 3e2e8898181a..d886066353e2 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java
@@ -7,6 +7,8 @@
 
 import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
 import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerReceiveInstrumenter;
+import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.enhanceConfig;
+import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
 import static net.bytebuddy.matcher.ElementMatchers.isPublic;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.returns;
@@ -21,6 +23,8 @@
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
 import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
@@ -36,6 +40,12 @@ public ElementMatcher<TypeDescription> typeMatcher() {
 
   @Override
   public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        isConstructor().and(takesArgument(0, Map.class)),
+        this.getClass().getName() + "$ConstructorMapAdvice");
+    transformer.applyAdviceToMethod(
+        isConstructor().and(takesArgument(0, Properties.class)),
+        this.getClass().getName() + "$ConstructorPropertiesAdvice");
     transformer.applyAdviceToMethod(
         named("poll")
             .and(isPublic())
@@ -45,6 +55,24 @@ public void transform(TypeTransformer transformer) {
         this.getClass().getName() + "$PollAdvice");
   }
 
+  @SuppressWarnings("unused")
+  public static class ConstructorMapAdvice {
+
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static void onEnter(@Advice.Argument(0) Map<String, Object> config) {
+      enhanceConfig(config);
+    }
+  }
+
+  @SuppressWarnings("unused")
+  public static class ConstructorPropertiesAdvice {
+
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static void onEnter(@Advice.Argument(0) Properties config) {
+      enhanceConfig(config);
+    }
+  }
+
   @SuppressWarnings("unused")
   public static class PollAdvice {
 
     @Advice.OnMethodEnter(suppress = Throwable.class)
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java
index 695496d6b5ad..98f5965d5239 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java
@@ -5,7 +5,9 @@
 
 package io.opentelemetry.javaagent.instrumentation.kafkaclients;
 
+import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.enhanceConfig;
 import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.producerInstrumenter;
+import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
 import static net.bytebuddy.matcher.ElementMatchers.isMethod;
 import static net.bytebuddy.matcher.ElementMatchers.isPublic;
 import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -17,6 +19,8 @@
 import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
 import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
+import java.util.Map;
+import java.util.Properties;
 import net.bytebuddy.asm.Advice;
 import net.bytebuddy.description.type.TypeDescription;
 import net.bytebuddy.matcher.ElementMatcher;
@@ -33,17 +37,41 @@ public ElementMatcher<TypeDescription> typeMatcher() {
 
   @Override
   public void transform(TypeTransformer transformer) {
+    transformer.applyAdviceToMethod(
+        isConstructor().and(takesArgument(0, Map.class)),
+        this.getClass().getName() + "$ConstructorMapAdvice");
+    transformer.applyAdviceToMethod(
+        isConstructor().and(takesArgument(0, Properties.class)),
+        this.getClass().getName() + "$ConstructorPropertiesAdvice");
     transformer.applyAdviceToMethod(
         isMethod()
             .and(isPublic())
             .and(named("send"))
             .and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord")))
             .and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))),
-        KafkaProducerInstrumentation.class.getName() + "$ProducerAdvice");
+        KafkaProducerInstrumentation.class.getName() + "$SendAdvice");
+  }
+
+  @SuppressWarnings("unused")
+  public static class ConstructorMapAdvice {
+
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static void onEnter(@Advice.Argument(0) Map<String, Object> config) {
+      enhanceConfig(config);
+    }
+  }
+
+  @SuppressWarnings("unused")
+  public static class ConstructorPropertiesAdvice {
+
+    @Advice.OnMethodEnter(suppress = Throwable.class)
+    public static void onEnter(@Advice.Argument(0) Properties config) {
+      enhanceConfig(config);
+    }
   }
 
   @SuppressWarnings("unused")
-  public static class ProducerAdvice {
+  public static class SendAdvice {
 
     @Advice.OnMethodEnter(suppress = Throwable.class)
     public static void onEnter(
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java
index 5c8c6e64f578..fc7cfe72aeb1 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java
@@ -8,8 +8,11 @@
 import io.opentelemetry.api.GlobalOpenTelemetry;
 import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
 import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
+import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
 import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
 import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
+import java.util.Map;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -20,6 +23,9 @@ public final class KafkaSingletons {
   private static final boolean PROPAGATION_ENABLED =
       InstrumentationConfig.get()
           .getBoolean("otel.instrumentation.kafka.client-propagation.enabled", true);
+  private static final boolean METRICS_ENABLED =
+      InstrumentationConfig.get()
+          .getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true);
 
   private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER;
   private static final Instrumenter<ConsumerRecords<?, ?>, Void> CONSUMER_RECEIVE_INSTRUMENTER;
@@ -56,5 +62,20 @@ public static boolean isPropagationEnabled() {
     return CONSUMER_PROCESS_INSTRUMENTER;
   }
 
+  public static void enhanceConfig(Map<? super String, Object> config) {
+    if (!METRICS_ENABLED) {
+      return;
+    }
+    config.merge(
+        CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+        OpenTelemetryMetricsReporter.class.getName(),
+        (class1, class2) -> class1 + "," + class2);
+    config.put(
+        OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, GlobalOpenTelemetry.get());
+    config.put(
+        OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
+        INSTRUMENTATION_NAME);
+  }
+
   private KafkaSingletons() {}
 }
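Note: with these javaagent changes, the constructor advice calls KafkaSingletons.enhanceConfig(...) on the application's client configuration before the Kafka client reads it. A rough sketch of the effect on a hypothetical config map (the bootstrap.servers value and the pre-existing reporter class below are purely illustrative, not part of this change):

    // Hypothetical client configuration supplied by the application
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("metric.reporters", "com.example.MyReporter");

    // After KafkaSingletons.enhanceConfig(config), assuming
    // otel.instrumentation.kafka.metric-reporter.enabled is left at its default (true):
    //   "metric.reporters"                   -> "com.example.MyReporter,io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter"
    //   "opentelemetry.instance"             -> GlobalOpenTelemetry.get()
    //   "opentelemetry.instrumentation_name" -> INSTRUMENTATION_NAME
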
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/OpenTelemetryMetricsReporterTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/OpenTelemetryMetricsReporterTest.java
new file mode 100644
index 000000000000..28cf833f1889
--- /dev/null
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/instrumentation/kafkaclients/OpenTelemetryMetricsReporterTest.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafkaclients;
+
+import static java.util.Collections.emptyMap;
+
+import io.opentelemetry.instrumentation.kafka.internal.AbstractOpenTelemetryMetricsReporterTest;
+import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import java.util.Map;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+@EnabledIfSystemProperty(
+    named = "testLatestDeps",
+    matches = "true",
+    disabledReason =
+        "kafka-clients 0.11 emits a significantly different set of metrics; it's probably fine to just test the latest version")
+class OpenTelemetryMetricsReporterTest extends AbstractOpenTelemetryMetricsReporterTest {
+
+  @RegisterExtension
+  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
+
+  @Override
+  protected InstrumentationExtension testing() {
+    return testing;
+  }
+
+  @Override
+  protected Map<String, ?> additionalConfig() {
+    return emptyMap();
+  }
+}
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/build.gradle.kts
index b8dc4adc702b..3058fd208b2f 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/build.gradle.kts
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/build.gradle.kts
@@ -10,4 +10,8 @@ dependencies {
   implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))
 
   implementation("org.testcontainers:kafka")
+  implementation("org.testcontainers:junit-jupiter")
+
+  compileOnly("com.google.auto.value:auto-value-annotations")
+  annotationProcessor("com.google.auto.value:auto-value")
 }
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java
new file mode 100644
index 000000000000..e38f56ad86c3
--- /dev/null
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java
@@ -0,0 +1,449 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.instrumentation.kafka.internal;
+
+import static java.lang.System.lineSeparator;
+import static java.util.Comparator.comparing;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.auto.value.AutoValue;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.Set;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+@SuppressWarnings("OtelInternalJavadoc")
+@Testcontainers
+public abstract class AbstractOpenTelemetryMetricsReporterTest {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(AbstractOpenTelemetryMetricsReporterTest.class);
+
+  private static final List<String> TOPICS = Arrays.asList("foo", "bar", "baz", "qux");
+  private static final Random RANDOM = new Random();
+
+  private static KafkaContainer kafka;
+  private static KafkaProducer<byte[], byte[]> producer;
+  private static KafkaConsumer<byte[], byte[]> consumer;
+
+  @BeforeEach
+  void beforeAll() {
+    // only start the kafka container the first time this runs
+    if (kafka != null) {
+      return;
+    }
+
+    kafka =
+        new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
+            .withLogConsumer(new Slf4jLogConsumer(logger))
+            .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
+            .withStartupTimeout(Duration.ofMinutes(1));
+    kafka.start();
+    producer = new KafkaProducer<>(producerConfig());
+    consumer = new KafkaConsumer<>(consumerConfig());
+  }
+
+  @AfterAll
+  static void afterAll() {
+    kafka.stop();
+    producer.close();
+    consumer.close();
+  }
+
+  @AfterEach
+  void tearDown() {
+    OpenTelemetryMetricsReporter.resetForTest();
+  }
+
+  protected abstract InstrumentationExtension testing();
+
+  protected abstract Map<String, ?> additionalConfig();
+
+  protected Map<String, Object> producerConfig() {
+    Map<String, Object> producerConfig = new HashMap<>();
+    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-client-id");
+    producerConfig.put(
+        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    producerConfig.put(
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
+    producerConfig.putAll(additionalConfig());
+    producerConfig.merge(
+        CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+        TestMetricsReporter.class.getName(),
+        (o, o2) -> o + "," + o2);
+    return producerConfig;
+  }
+
+  protected Map<String, Object> consumerConfig() {
+    Map<String, Object> consumerConfig = new HashMap<>();
+    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
+    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
+    consumerConfig.put(
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    consumerConfig.put(
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
+    consumerConfig.putAll(additionalConfig());
+    consumerConfig.merge(
+        CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+        TestMetricsReporter.class.getName(),
+        (o, o2) -> o + "," + o2);
+    return consumerConfig;
+  }
+
+  @Test
+  void observeMetrics() {
+    produceRecords();
+    consumeRecords();
+
+    Set<String> expectedMetricNames =
+        new HashSet<>(
+            Arrays.asList(
+                "kafka.consumer.commit_latency_avg",
+                "kafka.consumer.commit_latency_max",
+                "kafka.consumer.commit_rate",
+                "kafka.consumer.commit_total",
+                "kafka.consumer.failed_rebalance_rate_per_hour",
+                "kafka.consumer.failed_rebalance_total",
+                "kafka.consumer.heartbeat_rate",
+                "kafka.consumer.heartbeat_response_time_max",
+                "kafka.consumer.heartbeat_total",
+                "kafka.consumer.join_rate",
+                "kafka.consumer.join_time_avg",
+                "kafka.consumer.join_time_max",
+                "kafka.consumer.join_total",
+                "kafka.consumer.last_heartbeat_seconds_ago",
+                "kafka.consumer.last_rebalance_seconds_ago",
+                "kafka.consumer.partition_assigned_latency_avg",
+                "kafka.consumer.partition_assigned_latency_max",
+                "kafka.consumer.partition_lost_latency_avg",
+                "kafka.consumer.partition_lost_latency_max",
+                "kafka.consumer.partition_revoked_latency_avg",
+                "kafka.consumer.partition_revoked_latency_max",
+                "kafka.consumer.rebalance_latency_avg",
+                "kafka.consumer.rebalance_latency_max",
+                "kafka.consumer.rebalance_latency_total",
+                "kafka.consumer.rebalance_rate_per_hour",
+                "kafka.consumer.rebalance_total",
+                "kafka.consumer.sync_rate",
+                "kafka.consumer.sync_time_avg",
+                "kafka.consumer.sync_time_max",
+                "kafka.consumer.sync_total",
+                "kafka.consumer.bytes_consumed_rate",
+                "kafka.consumer.bytes_consumed_total",
+                "kafka.consumer.fetch_latency_avg",
+                "kafka.consumer.fetch_latency_max",
+                "kafka.consumer.fetch_rate",
+                "kafka.consumer.fetch_size_avg",
+                "kafka.consumer.fetch_size_max",
+                "kafka.consumer.fetch_throttle_time_avg",
+                "kafka.consumer.fetch_throttle_time_max",
+                "kafka.consumer.fetch_total",
+                "kafka.consumer.records_consumed_rate",
+                "kafka.consumer.records_consumed_total",
+                "kafka.consumer.records_lag",
+                "kafka.consumer.records_lag_avg",
+                "kafka.consumer.records_lag_max",
+                "kafka.consumer.records_lead",
+                "kafka.consumer.records_lead_avg",
+                "kafka.consumer.records_lead_min",
+                "kafka.consumer.records_per_request_avg",
+                "kafka.consumer.connection_close_rate",
+                "kafka.consumer.connection_close_total",
+                "kafka.consumer.connection_count",
+                "kafka.consumer.connection_creation_rate",
+                "kafka.consumer.connection_creation_total",
+                "kafka.consumer.failed_authentication_rate",
+                "kafka.consumer.failed_authentication_total",
+                "kafka.consumer.failed_reauthentication_rate",
+                "kafka.consumer.failed_reauthentication_total",
+                "kafka.consumer.incoming_byte_rate",
+                "kafka.consumer.incoming_byte_total",
+                "kafka.consumer.io_ratio",
+                "kafka.consumer.io_time_ns_avg",
+                "kafka.consumer.io_wait_ratio",
+                "kafka.consumer.io_wait_time_ns_avg",
+                "kafka.consumer.io_waittime_total",
+                "kafka.consumer.iotime_total",
+                "kafka.consumer.last_poll_seconds_ago",
+                "kafka.consumer.network_io_rate",
+                "kafka.consumer.network_io_total",
+                "kafka.consumer.outgoing_byte_rate",
+                "kafka.consumer.outgoing_byte_total",
+                "kafka.consumer.poll_idle_ratio_avg",
+                "kafka.consumer.reauthentication_latency_avg",
+                "kafka.consumer.reauthentication_latency_max",
+                "kafka.consumer.request_rate",
+                "kafka.consumer.request_size_avg",
+                "kafka.consumer.request_size_max",
+                "kafka.consumer.request_total",
+                "kafka.consumer.response_rate",
+                "kafka.consumer.response_total",
+                "kafka.consumer.select_rate",
+                "kafka.consumer.select_total",
+                "kafka.consumer.successful_authentication_no_reauth_total",
+                "kafka.consumer.successful_authentication_rate",
+                "kafka.consumer.successful_authentication_total",
+                "kafka.consumer.successful_reauthentication_rate",
+                "kafka.consumer.successful_reauthentication_total",
+                "kafka.consumer.time_between_poll_avg",
+                "kafka.consumer.time_between_poll_max",
+                "kafka.consumer.request_latency_avg",
+                "kafka.consumer.request_latency_max",
+                "kafka.producer.batch_size_avg",
+                "kafka.producer.batch_size_max",
+                "kafka.producer.batch_split_rate",
+                "kafka.producer.batch_split_total",
+                "kafka.producer.buffer_available_bytes",
+                "kafka.producer.buffer_exhausted_rate",
+                "kafka.producer.buffer_exhausted_total",
+                "kafka.producer.buffer_total_bytes",
+                "kafka.producer.bufferpool_wait_ratio",
+                "kafka.producer.bufferpool_wait_time_total",
+                "kafka.producer.compression_rate_avg",
+                "kafka.producer.connection_close_rate",
+                "kafka.producer.connection_close_total",
+                "kafka.producer.connection_count",
+                "kafka.producer.connection_creation_rate",
+                "kafka.producer.connection_creation_total",
+                "kafka.producer.failed_authentication_rate",
+                "kafka.producer.failed_authentication_total",
+                "kafka.producer.failed_reauthentication_rate",
+                "kafka.producer.failed_reauthentication_total",
+                "kafka.producer.incoming_byte_rate",
+                "kafka.producer.incoming_byte_total",
+                "kafka.producer.io_ratio",
+                "kafka.producer.io_time_ns_avg",
+                "kafka.producer.io_wait_ratio",
+                "kafka.producer.io_wait_time_ns_avg",
+                "kafka.producer.io_waittime_total",
+                "kafka.producer.iotime_total",
+                "kafka.producer.metadata_age",
+                "kafka.producer.network_io_rate",
+                "kafka.producer.network_io_total",
+                "kafka.producer.outgoing_byte_rate",
+                "kafka.producer.outgoing_byte_total",
+                "kafka.producer.produce_throttle_time_avg",
+                "kafka.producer.produce_throttle_time_max",
+                "kafka.producer.reauthentication_latency_avg",
+                "kafka.producer.reauthentication_latency_max",
+                "kafka.producer.record_error_rate",
+                "kafka.producer.record_error_total",
+                "kafka.producer.record_queue_time_avg",
+                "kafka.producer.record_queue_time_max",
+                "kafka.producer.record_retry_rate",
+                "kafka.producer.record_retry_total",
+                "kafka.producer.record_send_rate",
+                "kafka.producer.record_send_total",
+                "kafka.producer.record_size_avg",
+                "kafka.producer.record_size_max",
+                "kafka.producer.records_per_request_avg",
+                "kafka.producer.request_latency_avg",
+                "kafka.producer.request_latency_max",
+                "kafka.producer.request_rate",
+                "kafka.producer.request_size_avg",
+                "kafka.producer.request_size_max",
+                "kafka.producer.request_total",
+                "kafka.producer.requests_in_flight",
+                "kafka.producer.response_rate",
+                "kafka.producer.response_total",
+                "kafka.producer.select_rate",
+                "kafka.producer.select_total",
+                "kafka.producer.successful_authentication_no_reauth_total",
+                "kafka.producer.successful_authentication_rate",
+                "kafka.producer.successful_authentication_total",
+                "kafka.producer.successful_reauthentication_rate",
+                "kafka.producer.successful_reauthentication_total",
+                "kafka.producer.waiting_threads",
+                "kafka.producer.byte_rate",
+                "kafka.producer.byte_total",
+                "kafka.producer.compression_rate"));
+
+    List<MetricData> metrics = testing().metrics();
+    Set<String> metricNames = metrics.stream().map(MetricData::getName).collect(toSet());
+    assertThat(metricNames).containsAll(expectedMetricNames);
+
+    assertThat(metrics)
+        .allSatisfy(
+            metricData -> {
+              Set<String> expectedKeys =
+                  metricData.getData().getPoints().stream()
+                      .findFirst()
+                      .map(
+                          point ->
+                              point.getAttributes().asMap().keySet().stream()
+                                  .map(AttributeKey::getKey)
+                                  .collect(toSet()))
+                      .orElse(Collections.emptySet());
+              assertThat(metricData.getData().getPoints())
+                  .extracting(PointData::getAttributes)
+                  .extracting(
+                      attributes ->
+                          attributes.asMap().keySet().stream()
+                              .map(AttributeKey::getKey)
+                              .collect(toSet()))
+                  .allSatisfy(attributeKeys -> assertThat(attributeKeys).isEqualTo(expectedKeys));
+            });
+
+    // Print mapping table
+    printMappingTable();
+  }
+
+  private static void produceRecords() {
+    for (int i = 0; i < 100; i++) {
+      producer.send(
+          new ProducerRecord<>(
+              TOPICS.get(RANDOM.nextInt(TOPICS.size())),
+              0,
+              System.currentTimeMillis(),
+              "key".getBytes(StandardCharsets.UTF_8),
+              "value".getBytes(StandardCharsets.UTF_8)));
+    }
+  }
+
+  private static void consumeRecords() {
+    consumer.subscribe(TOPICS);
+    Instant stopTime = Instant.now().plusSeconds(10);
+    while (Instant.now().isBefore(stopTime)) {
+      consumer.poll(1_000);
+    }
+  }
+
+  /**
+   * Print a table mapping kafka metrics to equivalent OpenTelemetry metrics, in markdown format.
+   */
+  private static void printMappingTable() {
+    StringBuilder sb = new StringBuilder();
+    // Append table headers
+    sb.append(
+            "| Metric Group | Metric Name | Attribute Keys | Instrument Name | Instrument Description | Instrument Type |")
+        .append(lineSeparator())
+        .append(
+            "|--------------|-------------|----------------|-----------------|------------------------|-----------------|")
+        .append(lineSeparator());
+    Map<String, List<KafkaMetricId>> kafkaMetricsByGroup =
+        TestMetricsReporter.seenMetrics.stream().collect(groupingBy(KafkaMetricId::getGroup));
+    List<RegisteredObservable> registeredObservables =
+        OpenTelemetryMetricsReporter.getRegisteredObservables();
+    // Iterate through groups in alpha order
+    for (String group : kafkaMetricsByGroup.keySet().stream().sorted().collect(toList())) {
+      List<KafkaMetricId> kafkaMetricIds =
+          kafkaMetricsByGroup.get(group).stream()
+              .sorted(
+                  comparing(KafkaMetricId::getName)
+                      .thenComparing(kafkaMetricId -> kafkaMetricId.getAttributeKeys().size()))
+              .collect(toList());
+      // Iterate through metrics in alpha order by name
+      for (KafkaMetricId kafkaMetricId : kafkaMetricIds) {
+        // Find first (there may be multiple) registered instrument that matches the kafkaMetricId
+        Optional<InstrumentDescriptor> descriptor =
+            registeredObservables.stream()
+                .filter(
+                    registeredObservable ->
+                        KafkaMetricId.create(registeredObservable.getKafkaMetricName())
+                            .equals(kafkaMetricId))
+                .findFirst()
+                .map(RegisteredObservable::getInstrumentDescriptor);
+        // Append table row
+        sb.append(
+            String.format(
+                "| %s | %s | %s | %s | %s | %s |%n",
+                "`" + group + "`",
+                "`" + kafkaMetricId.getName() + "`",
+                kafkaMetricId.getAttributeKeys().stream()
+                    .map(key -> "`" + key + "`")
+                    .collect(joining(",")),
+                descriptor.map(i -> "`" + i.getName() + "`").orElse(""),
+                descriptor.map(InstrumentDescriptor::getDescription).orElse(""),
+                descriptor.map(i -> "`" + i.getInstrumentType() + "`").orElse("")));
+      }
+    }
+    logger.info("Mapping table" + System.lineSeparator() + sb);
+  }
+
+  /**
+   * This class is internal and is hence not for public use. Its APIs are unstable and can change
+   * at any time.
+   */
+  public static class TestMetricsReporter implements MetricsReporter {
+
+    private static final Set<KafkaMetricId> seenMetrics = new HashSet<>();
+
+    @Override
+    public void init(List<KafkaMetric> list) {
+      list.forEach(this::metricChange);
+    }
+
+    @Override
+    public void metricChange(KafkaMetric kafkaMetric) {
+      seenMetrics.add(KafkaMetricId.create(kafkaMetric.metricName()));
+    }
+
+    @Override
+    public void metricRemoval(KafkaMetric kafkaMetric) {}
+
+    @Override
+    public void close() {}
+
+    @Override
+    public void configure(Map<String, ?> map) {}
+  }
+
+  @AutoValue
+  abstract static class KafkaMetricId {
+
+    abstract String getGroup();
+
+    abstract String getName();
+
+    abstract Set<String> getAttributeKeys();
+
+    static KafkaMetricId create(MetricName metricName) {
+      return new AutoValue_AbstractOpenTelemetryMetricsReporterTest_KafkaMetricId(
+          metricName.group(), metricName.name(), metricName.tags().keySet());
+    }
+  }
+}
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java
index 5230b4782171..1c9a7b1e16ea 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetry.java
@@ -150,6 +150,9 @@ public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
         CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
         OpenTelemetryMetricsReporter.class.getName());
     config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry);
+    config.put(
+        OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
+        KafkaTelemetryBuilder.INSTRUMENTATION_NAME);
     return Collections.unmodifiableMap(config);
   }
 
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetryBuilder.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetryBuilder.java
index e16135f6ea8c..64e5e1bd6cd1 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetryBuilder.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTelemetryBuilder.java
@@ -18,7 +18,7 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 public final class KafkaTelemetryBuilder {
-  private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-2.6";
+  static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-2.6";
 
   private final OpenTelemetry openTelemetry;
   private final List<AttributesExtractor<ProducerRecord<?, ?>, Void>> producerAttributesExtractors =
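For context on the library path touched above: the 2.6 test changes below configure clients through KafkaTelemetry.metricConfigProperties(), which with this change also carries the instrumentation name. A minimal usage sketch (the broker address and serializers are illustrative, not part of this diff):

    OpenTelemetry openTelemetry = GlobalOpenTelemetry.get(); // or a configured SDK instance
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("key.serializer", ByteArraySerializer.class.getName());
    config.put("value.serializer", ByteArraySerializer.class.getName());
    // Adds metric.reporters, opentelemetry.instance and, with this change,
    // opentelemetry.instrumentation_name to the client configuration:
    config.putAll(KafkaTelemetry.create(openTelemetry).metricConfigProperties());
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config);
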
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java
index 304eb5829524..578ab3824d75 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporterTest.java
@@ -5,132 +5,30 @@
 
 package io.opentelemetry.instrumentation.kafka.internal;
 
-import static java.lang.System.lineSeparator;
-import static java.util.Comparator.comparing;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.joining;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-import com.google.auto.value.AutoValue;
-import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
 import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
 import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
-import io.opentelemetry.sdk.metrics.data.MetricData;
-import io.opentelemetry.sdk.metrics.data.PointData;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.Set;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.KafkaMetric;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.junit.jupiter.Testcontainers;
-import org.testcontainers.utility.DockerImageName;
 
-@Testcontainers
-class OpenTelemetryMetricsReporterTest {
-
-  private static final Logger logger =
-      LoggerFactory.getLogger(OpenTelemetryMetricsReporterTest.class);
-
-  private static final List<String> TOPICS = Arrays.asList("foo", "bar", "baz", "qux");
-  private static final Random RANDOM = new Random();
+class OpenTelemetryMetricsReporterTest extends AbstractOpenTelemetryMetricsReporterTest {
 
   @RegisterExtension
   static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
 
-  private static KafkaContainer kafka;
-  private static KafkaProducer<byte[], byte[]> producer;
-  private static KafkaConsumer<byte[], byte[]> consumer;
-
-  @BeforeAll
-  static void beforeAll() {
-    kafka =
-        new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
-            .withLogConsumer(new Slf4jLogConsumer(logger))
-            .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1))
-            .withStartupTimeout(Duration.ofMinutes(1));
-    kafka.start();
-    producer = new KafkaProducer<>(producerConfig());
-    consumer = new KafkaConsumer<>(consumerConfig());
-  }
-
-  @AfterAll
-  static void afterAll() {
-    kafka.stop();
-    producer.close();
-    consumer.close();
-  }
-
-  @AfterEach
-  void tearDown() {
-    OpenTelemetryMetricsReporter.resetForTest();
-  }
-
-  private static Map<String, Object> producerConfig() {
-    Map<String, Object> producerConfig = new HashMap<>();
-    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
-    producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-client-id");
-    producerConfig.put(
-        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-    producerConfig.put(
-        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-    producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
-    producerConfig.putAll(
-        KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties());
-    producerConfig.merge(
-        CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
-        TestMetricsReporter.class.getName(),
-        (o, o2) -> o + "," + o2);
-    return producerConfig;
+  @Override
+  protected InstrumentationExtension testing() {
+    return testing;
   }
 
-  private static Map<String, Object> consumerConfig() {
-    Map<String, Object> consumerConfig = new HashMap<>();
-    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
-    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
-    consumerConfig.put(
-        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    consumerConfig.put(
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-    consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
-    consumerConfig.putAll(
-        KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties());
-    consumerConfig.merge(
-        CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
-        TestMetricsReporter.class.getName(),
-        (o, o2) -> o + "," + o2);
-    return consumerConfig;
+  @Override
+  protected Map<String, ?> additionalConfig() {
+    return KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties();
   }
 
   @Test
@@ -154,6 +52,26 @@ void badConfig() {
         .hasRootCauseInstanceOf(IllegalStateException.class)
         .hasRootCauseMessage(
             "Configuration property opentelemetry.instance is not instance of OpenTelemetry");
+    assertThatThrownBy(
+            () -> {
+              Map<String, Object> producerConfig = producerConfig();
+              producerConfig.remove(
+                  OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME);
+              new KafkaProducer<>(producerConfig).close();
+            })
+        .hasRootCauseInstanceOf(IllegalStateException.class)
+        .hasRootCauseMessage(
+            "Missing required configuration property: opentelemetry.instrumentation_name");
+    assertThatThrownBy(
+            () -> {
+              Map<String, Object> producerConfig = producerConfig();
+              producerConfig.put(
+                  OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, 42);
+              new KafkaProducer<>(producerConfig).close();
+            })
+        .hasRootCauseInstanceOf(IllegalStateException.class)
+        .hasRootCauseMessage(
+            "Configuration property opentelemetry.instrumentation_name is not instance of String");
 
     // Bad consumer config
     assertThatThrownBy(
@@ -174,318 +92,25 @@ void badConfig() {
         .hasRootCauseInstanceOf(IllegalStateException.class)
         .hasRootCauseMessage(
             "Configuration property opentelemetry.instance is not instance of OpenTelemetry");
-  }
-
-  @Test
-  void observeMetrics() {
-    produceRecords();
-    consumeRecords();
-
-    Set<String> expectedMetricNames =
-        new HashSet<>(
-            Arrays.asList(
-                "kafka.consumer.commit_latency_avg",
-                "kafka.consumer.commit_latency_max",
-                "kafka.consumer.commit_rate",
-                "kafka.consumer.commit_total",
-                "kafka.consumer.failed_rebalance_rate_per_hour",
-                "kafka.consumer.failed_rebalance_total",
-                "kafka.consumer.heartbeat_rate",
-                "kafka.consumer.heartbeat_response_time_max",
-                "kafka.consumer.heartbeat_total",
-                "kafka.consumer.join_rate",
-                "kafka.consumer.join_time_avg",
-                "kafka.consumer.join_time_max",
-                "kafka.consumer.join_total",
-                "kafka.consumer.last_heartbeat_seconds_ago",
-                "kafka.consumer.last_rebalance_seconds_ago",
-                "kafka.consumer.partition_assigned_latency_avg",
-                "kafka.consumer.partition_assigned_latency_max",
-                "kafka.consumer.partition_lost_latency_avg",
-                "kafka.consumer.partition_lost_latency_max",
-                "kafka.consumer.partition_revoked_latency_avg",
-                "kafka.consumer.partition_revoked_latency_max",
-                "kafka.consumer.rebalance_latency_avg",
-                "kafka.consumer.rebalance_latency_max",
-                "kafka.consumer.rebalance_latency_total",
-                "kafka.consumer.rebalance_rate_per_hour",
-                "kafka.consumer.rebalance_total",
-                "kafka.consumer.sync_rate",
-                "kafka.consumer.sync_time_avg",
-                "kafka.consumer.sync_time_max",
-                "kafka.consumer.sync_total",
-                "kafka.consumer.bytes_consumed_rate",
-                "kafka.consumer.bytes_consumed_total",
-                "kafka.consumer.fetch_latency_avg",
-                "kafka.consumer.fetch_latency_max",
-                "kafka.consumer.fetch_rate",
-                "kafka.consumer.fetch_size_avg",
-                "kafka.consumer.fetch_size_max",
-                "kafka.consumer.fetch_throttle_time_avg",
-                "kafka.consumer.fetch_throttle_time_max",
-                "kafka.consumer.fetch_total",
-                "kafka.consumer.records_consumed_rate",
-                "kafka.consumer.records_consumed_total",
-                "kafka.consumer.records_lag",
-                "kafka.consumer.records_lag_avg",
-                "kafka.consumer.records_lag_max",
-                "kafka.consumer.records_lead",
-                "kafka.consumer.records_lead_avg",
-                "kafka.consumer.records_lead_min",
-                "kafka.consumer.records_per_request_avg",
-                "kafka.consumer.connection_close_rate",
-                "kafka.consumer.connection_close_total",
-                "kafka.consumer.connection_count",
-                "kafka.consumer.connection_creation_rate",
-                "kafka.consumer.connection_creation_total",
-                "kafka.consumer.failed_authentication_rate",
-                "kafka.consumer.failed_authentication_total",
-                "kafka.consumer.failed_reauthentication_rate",
-                "kafka.consumer.failed_reauthentication_total",
-                "kafka.consumer.incoming_byte_rate",
-                "kafka.consumer.incoming_byte_total",
-                "kafka.consumer.io_ratio",
-                "kafka.consumer.io_time_ns_avg",
-                "kafka.consumer.io_wait_ratio",
-                "kafka.consumer.io_wait_time_ns_avg",
-                "kafka.consumer.io_waittime_total",
-                "kafka.consumer.iotime_total",
-                "kafka.consumer.last_poll_seconds_ago",
-                "kafka.consumer.network_io_rate",
-                "kafka.consumer.network_io_total",
-                "kafka.consumer.outgoing_byte_rate",
-                "kafka.consumer.outgoing_byte_total",
-                "kafka.consumer.poll_idle_ratio_avg",
-                "kafka.consumer.reauthentication_latency_avg",
-                "kafka.consumer.reauthentication_latency_max",
-                "kafka.consumer.request_rate",
-                "kafka.consumer.request_size_avg",
-                "kafka.consumer.request_size_max",
-                "kafka.consumer.request_total",
-                "kafka.consumer.response_rate",
-                "kafka.consumer.response_total",
-                "kafka.consumer.select_rate",
-                "kafka.consumer.select_total",
-                "kafka.consumer.successful_authentication_no_reauth_total",
-                "kafka.consumer.successful_authentication_rate",
-                "kafka.consumer.successful_authentication_total",
-                "kafka.consumer.successful_reauthentication_rate",
-                "kafka.consumer.successful_reauthentication_total",
-                "kafka.consumer.time_between_poll_avg",
-                "kafka.consumer.time_between_poll_max",
-                "kafka.consumer.request_latency_avg",
-                "kafka.consumer.request_latency_max",
-                "kafka.producer.batch_size_avg",
-                "kafka.producer.batch_size_max",
-                "kafka.producer.batch_split_rate",
-                "kafka.producer.batch_split_total",
-                "kafka.producer.buffer_available_bytes",
-                "kafka.producer.buffer_exhausted_rate",
-                "kafka.producer.buffer_exhausted_total",
-                "kafka.producer.buffer_total_bytes",
-                "kafka.producer.bufferpool_wait_ratio",
-                "kafka.producer.bufferpool_wait_time_total",
-                "kafka.producer.compression_rate_avg",
-                "kafka.producer.connection_close_rate",
-                "kafka.producer.connection_close_total",
-                "kafka.producer.connection_count",
-                "kafka.producer.connection_creation_rate",
-                "kafka.producer.connection_creation_total",
-                "kafka.producer.failed_authentication_rate",
-                "kafka.producer.failed_authentication_total",
-                "kafka.producer.failed_reauthentication_rate",
-                "kafka.producer.failed_reauthentication_total",
-                "kafka.producer.incoming_byte_rate",
-                "kafka.producer.incoming_byte_total",
-                "kafka.producer.io_ratio",
-                "kafka.producer.io_time_ns_avg",
-                "kafka.producer.io_wait_ratio",
-                "kafka.producer.io_wait_time_ns_avg",
-                "kafka.producer.io_waittime_total",
-                "kafka.producer.iotime_total",
-                "kafka.producer.metadata_age",
-                "kafka.producer.network_io_rate",
-                "kafka.producer.network_io_total",
-                "kafka.producer.outgoing_byte_rate",
-                "kafka.producer.outgoing_byte_total",
-                "kafka.producer.produce_throttle_time_avg",
-                "kafka.producer.produce_throttle_time_max",
-                "kafka.producer.reauthentication_latency_avg",
-                "kafka.producer.reauthentication_latency_max",
-                "kafka.producer.record_error_rate",
-                "kafka.producer.record_error_total",
-                "kafka.producer.record_queue_time_avg",
-                "kafka.producer.record_queue_time_max",
-                "kafka.producer.record_retry_rate",
-                "kafka.producer.record_retry_total",
-                "kafka.producer.record_send_rate",
-                "kafka.producer.record_send_total",
-                "kafka.producer.record_size_avg",
-                "kafka.producer.record_size_max",
-                "kafka.producer.records_per_request_avg",
-                "kafka.producer.request_latency_avg",
-                "kafka.producer.request_latency_max",
-                "kafka.producer.request_rate",
-                "kafka.producer.request_size_avg",
-                "kafka.producer.request_size_max",
-                "kafka.producer.request_total",
-                "kafka.producer.requests_in_flight",
-                "kafka.producer.response_rate",
-                "kafka.producer.response_total",
-                "kafka.producer.select_rate",
-                "kafka.producer.select_total",
-                "kafka.producer.successful_authentication_no_reauth_total",
-                "kafka.producer.successful_authentication_rate",
-                "kafka.producer.successful_authentication_total",
-                "kafka.producer.successful_reauthentication_rate",
-                "kafka.producer.successful_reauthentication_total",
-                "kafka.producer.waiting_threads",
-                "kafka.producer.byte_rate",
-                "kafka.producer.byte_total",
-                "kafka.producer.compression_rate"));
-
-    List<MetricData> metrics = testing.metrics();
-    Set<String> metricNames = metrics.stream().map(MetricData::getName).collect(toSet());
-    assertThat(metricNames).containsAll(expectedMetricNames);
-
-    assertThat(metrics)
-        .allSatisfy(
-            metricData -> {
-              Set<String> expectedKeys =
-                  metricData.getData().getPoints().stream()
-                      .findFirst()
-                      .map(
-                          point ->
-                              point.getAttributes().asMap().keySet().stream()
-                                  .map(AttributeKey::getKey)
-                                  .collect(toSet()))
-                      .orElse(Collections.emptySet());
-              assertThat(metricData.getData().getPoints())
-                  .extracting(PointData::getAttributes)
-                  .extracting(
-                      attributes ->
-                          attributes.asMap().keySet().stream()
-                              .map(AttributeKey::getKey)
-                              .collect(toSet()))
-                  .allSatisfy(attributeKeys -> assertThat(attributeKeys).isEqualTo(expectedKeys));
-            });
-
-    // Print mapping table
-    printMappingTable();
-  }
-
-  private static void produceRecords() {
-    for (int i = 0; i < 100; i++) {
-      producer.send(
-          new ProducerRecord<>(
-              TOPICS.get(RANDOM.nextInt(TOPICS.size())),
-              0,
-              System.currentTimeMillis(),
-              "key".getBytes(StandardCharsets.UTF_8),
-              "value".getBytes(StandardCharsets.UTF_8)));
-    }
-  }
-
-  private static void consumeRecords() {
-    consumer.subscribe(TOPICS);
-    Instant stopTime = Instant.now().plusSeconds(10);
-    while (Instant.now().isBefore(stopTime)) {
-      consumer.poll(Duration.ofSeconds(1));
-    }
-  }
-
-  /**
-   * Print a table mapping kafka metrics to equivalent OpenTelemetry metrics, in markdown format.
-   */
-  private static void printMappingTable() {
-    StringBuilder sb = new StringBuilder();
-    // Append table headers
-    sb.append(
-            "| Metric Group | Metric Name | Attribute Keys | Instrument Name | Instrument Description | Instrument Type |")
-        .append(lineSeparator())
-        .append(
-            "|--------------|-------------|----------------|-----------------|------------------------|-----------------|")
-        .append(lineSeparator());
-    Map<String, List<KafkaMetricId>> kafkaMetricsByGroup =
-        TestMetricsReporter.seenMetrics.stream().collect(groupingBy(KafkaMetricId::getGroup));
-    List<RegisteredObservable> registeredObservables =
-        OpenTelemetryMetricsReporter.getRegisteredObservables();
-    // Iterate through groups in alpha order
-    for (String group : kafkaMetricsByGroup.keySet().stream().sorted().collect(toList())) {
-      List<KafkaMetricId> kafkaMetricIds =
-          kafkaMetricsByGroup.get(group).stream()
-              .sorted(
-                  comparing(KafkaMetricId::getName)
-                      .thenComparing(kafkaMetricId -> kafkaMetricId.getAttributeKeys().size()))
-              .collect(toList());
-      // Iterate through metrics in alpha order by name
-      for (KafkaMetricId kafkaMetricId : kafkaMetricIds) {
-        // Find first (there may be multiple) registered instrument that matches the kafkaMetricId
-        Optional<InstrumentDescriptor> descriptor =
-            registeredObservables.stream()
-                .filter(
-                    registeredObservable ->
-                        KafkaMetricId.create(registeredObservable.getKafkaMetricName())
-                            .equals(kafkaMetricId))
-                .findFirst()
-                .map(RegisteredObservable::getInstrumentDescriptor);
-        // Append table row
-        sb.append(
-            String.format(
-                "| %s | %s | %s | %s | %s | %s |%n",
-                "`" + group + "`",
-                "`" + kafkaMetricId.getName() + "`",
-                kafkaMetricId.getAttributeKeys().stream()
-                    .map(key -> "`" + key + "`")
-                    .collect(joining(",")),
-                descriptor.map(i -> "`" + i.getName() + "`").orElse(""),
-                descriptor.map(InstrumentDescriptor::getDescription).orElse(""),
-                descriptor.map(i -> "`" + i.getInstrumentType() + "`").orElse("")));
-      }
-    }
-    logger.info("Mapping table" + System.lineSeparator() + sb);
-  }
-
-  /**
-   * This class is internal and is hence not for public use. Its APIs are unstable and can change
-   * at any time.
-   */
-  public static class TestMetricsReporter implements MetricsReporter {
-
-    private static final Set<KafkaMetricId> seenMetrics = new HashSet<>();
-
-    @Override
-    public void init(List<KafkaMetric> list) {
-      list.forEach(this::metricChange);
-    }
-
-    @Override
-    public void metricChange(KafkaMetric kafkaMetric) {
-      seenMetrics.add(KafkaMetricId.create(kafkaMetric.metricName()));
-    }
-
-    @Override
-    public void metricRemoval(KafkaMetric kafkaMetric) {}
-
-    @Override
-    public void close() {}
-
-    @Override
-    public void configure(Map<String, ?> map) {}
-  }
-
-  @AutoValue
-  abstract static class KafkaMetricId {
-
-    abstract String getGroup();
-
-    abstract String getName();
-
-    abstract Set<String> getAttributeKeys();
-
-    static KafkaMetricId create(MetricName metricName) {
-      return new AutoValue_OpenTelemetryMetricsReporterTest_KafkaMetricId(
-          metricName.group(), metricName.name(), metricName.tags().keySet());
-    }
+    assertThatThrownBy(
+            () -> {
+              Map<String, Object> consumerConfig = consumerConfig();
+              consumerConfig.remove(
+                  OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME);
+              new KafkaConsumer<>(consumerConfig).close();
+            })
+        .hasRootCauseInstanceOf(IllegalStateException.class)
+        .hasRootCauseMessage(
+            "Missing required configuration property: opentelemetry.instrumentation_name");
+    assertThatThrownBy(
+            () -> {
+              Map<String, Object> consumerConfig = consumerConfig();
+              consumerConfig.put(
+                  OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, 42);
+              new KafkaConsumer<>(consumerConfig).close();
+            })
+        .hasRootCauseInstanceOf(IllegalStateException.class)
+        .hasRootCauseMessage(
+            "Configuration property opentelemetry.instrumentation_name is not instance of String");
   }
 }
diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java
index f340db874e03..4139e614b894 100644
--- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java
+++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java
@@ -8,6 +8,8 @@
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.MeterBuilder;
+import io.opentelemetry.instrumentation.api.internal.EmbeddedInstrumentationProperties;
 import io.opentelemetry.instrumentation.api.internal.GuardedBy;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -34,6 +36,8 @@ public final class OpenTelemetryMetricsReporter implements MetricsReporter {
 
   public static final String CONFIG_KEY_OPENTELEMETRY_INSTANCE = "opentelemetry.instance";
+  public static final String CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME =
+      "opentelemetry.instrumentation_name";
 
   private static final Logger logger =
       Logger.getLogger(OpenTelemetryMetricsReporter.class.getName());
@@ -146,17 +150,30 @@ private static void closeInstrument(AutoCloseable observable) {
 
   @Override
   public void configure(Map<String, ?> configs) {
-    Object openTelemetry = configs.get(CONFIG_KEY_OPENTELEMETRY_INSTANCE);
-    if (openTelemetry == null) {
-      throw new IllegalStateException(
-          "Missing required configuration property: " + CONFIG_KEY_OPENTELEMETRY_INSTANCE);
+    OpenTelemetry openTelemetry =
+        getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTANCE, OpenTelemetry.class);
+    String instrumentationName =
+        getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, String.class);
+    String instrumentationVersion =
+        EmbeddedInstrumentationProperties.findVersion(instrumentationName);
+
+    MeterBuilder meterBuilder = openTelemetry.meterBuilder(instrumentationName);
+    if (instrumentationVersion != null) {
+      meterBuilder.setInstrumentationVersion(instrumentationVersion);
+    }
+    meter = meterBuilder.build();
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> T getProperty(Map<String, ?> configs, String key, Class<T> requiredType) {
+    Object value = configs.get(key);
+    if (value == null) {
+      throw new IllegalStateException("Missing required configuration property: " + key);
     }
-    if (!(openTelemetry instanceof OpenTelemetry)) {
+    if (!requiredType.isInstance(value)) {
       throw new IllegalStateException(
-          "Configuration property "
-              + CONFIG_KEY_OPENTELEMETRY_INSTANCE
-              + " is not instance of OpenTelemetry");
+          "Configuration property " + key + " is not instance of " + requiredType.getSimpleName());
     }
-    meter = ((OpenTelemetry) openTelemetry).getMeter("io.opentelemetry.kafka-clients");
+    return (T) value;
   }
 }
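As a closing illustration of the stricter configure(...) validation: the new badConfig assertions expect client construction to fail when the instrumentation-name property is missing or of the wrong type. A rough sketch of that failure mode (the broker address and serializers are illustrative; the Kafka client wraps the configuration failure, so the IllegalStateException surfaces as the root cause):

    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", "localhost:9092"); // illustrative
    config.put("key.serializer", ByteArraySerializer.class.getName());
    config.put("value.serializer", ByteArraySerializer.class.getName());
    config.put("metric.reporters", OpenTelemetryMetricsReporter.class.getName());
    config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, GlobalOpenTelemetry.get());
    // CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME deliberately omitted
    try {
      new KafkaProducer<byte[], byte[]>(config).close();
    } catch (RuntimeException e) {
      // root cause: IllegalStateException
      //   "Missing required configuration property: opentelemetry.instrumentation_name"
    }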