Add kafka client metrics #6138

Merged: 17 commits, Jul 6, 2022
Changes from 1 commit
@@ -10,8 +10,4 @@ dependencies {
implementation(project(":instrumentation:kafka:kafka-clients:kafka-clients-common:library"))

implementation("org.testcontainers:kafka")
implementation("org.testcontainers:junit-jupiter")

compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
}
@@ -10,6 +10,10 @@ dependencies {
testImplementation("com.fasterxml.jackson.core:jackson-databind:2.10.2")

testImplementation("org.testcontainers:kafka")
testImplementation("org.testcontainers:junit-jupiter")

testCompileOnly("com.google.auto.value:auto-value-annotations")
testAnnotationProcessor("com.google.auto.value:auto-value")
}

tasks {
@@ -17,17 +17,25 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.logging.Logger;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.MetricsReporter;

public final class KafkaTelemetry {
private static final Logger logger = Logger.getLogger(KafkaTelemetry.class.getName());
@@ -76,6 +84,41 @@ public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
return new TracingConsumer<>(consumer, this);
}

/**
* Produces a set of kafka client config properties (consumer or producer) to register a {@link
* MetricsReporter} that records metrics to an {@code openTelemetry} instance. Add these resulting
* properties to the configuration map used to initialize a {@link KafkaConsumer} or {@link
* KafkaProducer}.
*
* <p>For producers:
*
* <pre>{@code
* // Map<String, Object> config = new HashMap<>();
* // config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
* // config.putAll(kafkaTelemetry.metricConfigProperties());
* // try (KafkaProducer<?, ?> producer = new KafkaProducer<>(config)) { ... }
* }</pre>
*
* <p>For consumers:
*
* <pre>{@code
* // Map<String, Object> config = new HashMap<>();
* // config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
* // config.putAll(kafkaTelemetry.metricConfigProperties());
* // try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(config)) { ... }
* }</pre>
*
* @return the kafka client properties
*/
public Map<String, ?> metricConfigProperties() {
Map<String, Object> config = new HashMap<>();
config.put(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName());
config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry);
return Collections.unmodifiableMap(config);
}
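// Usage sketch (an illustration, not part of this diff): when a client config already sets
// METRIC_REPORTER_CLASSES_CONFIG, the reporter class lists can be merged rather than
// overwritten, as the updated test does below with a hypothetical extra reporter:
//   config.merge(
//       CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
//       MyOtherReporter.class.getName(),
//       (existing, added) -> existing + "," + added);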

/**
* Build and inject span into record.
*

This file was deleted.

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.internal;
package io.opentelemetry.instrumentation.kafka.internal;

import static java.lang.System.lineSeparator;
import static java.util.Comparator.comparing;
@@ -16,7 +16,7 @@

import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.kafkaclients.OpenTelemetryKafkaMetrics;
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.sdk.metrics.data.MetricData;
@@ -57,12 +57,8 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
@Testcontainers
public abstract class OpenTelemetryMetricsReporterTest {
class OpenTelemetryMetricsReporterTest {

private static final Logger logger =
LoggerFactory.getLogger(OpenTelemetryMetricsReporterTest.class);
@@ -111,7 +107,11 @@ private static Map<String, Object> producerConfig() {
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
producerConfig.putAll(
OpenTelemetryKafkaMetrics.getConfigProperties(testing.getOpenTelemetry()));
KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties());
producerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
(o, o2) -> o + "," + o2);
return producerConfig;
}

@@ -125,7 +125,7 @@ private static Map<String, Object> consumerConfig() {
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
consumerConfig.putAll(
OpenTelemetryKafkaMetrics.getConfigProperties(testing.getOpenTelemetry()));
KafkaTelemetry.create(testing.getOpenTelemetry()).metricConfigProperties());
consumerConfig.merge(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
TestMetricsReporter.class.getName(),
@@ -418,7 +418,7 @@ private static void consumeRecords() {
consumer.subscribe(TOPICS);
Instant stopTime = Instant.now().plusSeconds(10);
while (Instant.now().isBefore(stopTime)) {
consumer.poll(1000);
consumer.poll(Duration.ofSeconds(1));
}
}

@@ -3,23 +3,15 @@
The Kafka client exposes metrics via the `org.apache.kafka.common.metrics.MetricsReporter` interface.
OpenTelemetry provides an implementation that bridges the metrics into OpenTelemetry.

To use, configure `GlobalOpenTelemetry` with an OpenTelemetry instance
via `GlobalOpenTelemetry#set(OpenTelemetry)`, and include a reference to this
class in kafka producer or consumer configuration, i.e.:

```java
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, OpenTelemetryKafkaMetrics.class.getName());
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getKafkaConnectString());
...
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config)) { ... }
```
To use, merge the config properties
from `KafkaTelemetry.create(OpenTelemetry).metricConfigProperties()`
with the configuration used when creating your producer or consumer.
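
For example, a producer might be configured as follows (a sketch: `openTelemetry` is assumed to be
an `io.opentelemetry.api.OpenTelemetry` instance in scope, and the bootstrap server address and
serializers are placeholder values, not ones required by this instrumentation):

```java
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
config.putAll(KafkaTelemetry.create(openTelemetry).metricConfigProperties());
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(config)) {
  // produce records as usual; the client's metrics are reported to the OpenTelemetry instance
}
```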

Note: Kafka reports several metrics at multiple attribute granularities. For
example, `records-consumed-total` is reported with attribute key `[client-id]`
and `[client-id, topic]`. If you analyze the sum of records consumed, ignoring dimensions, backends
are likely to double count. To alleviate this, `OpenTelemetryKafkaMetrics` detects this
scenario and only records the most granular set of attributes available. In the case
are likely to double count. The implementation detects this scenario and only records the most
granular set of attributes available. In the case
of `records-consumed-total`, it reports `[client-id, topic]` and ignores `[client-id]`.
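
A minimal sketch of that selection rule (an illustration, not the PR's actual implementation): for
a given metric name, an attribute key set is dropped when another reported key set is a strict
superset of it.

```java
import java.util.HashSet;
import java.util.Set;

class MostGranularAttributeSets {
  // Keeps only the key sets that are not strict subsets of another key set for the same metric.
  static Set<Set<String>> select(Set<Set<String>> keySets) {
    Set<Set<String>> result = new HashSet<>();
    for (Set<String> candidate : keySets) {
      boolean subsumed =
          keySets.stream()
              .anyMatch(other -> other.size() > candidate.size() && other.containsAll(candidate));
      if (!subsumed) {
        result.add(candidate);
      }
    }
    return result;
  }
}
```

For `records-consumed-total`, this keeps `[client-id, topic]` and drops `[client-id]`.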
Member (Author): I just added a commit with new logic that does what's described in this comment. I think it's a fairly important improvement.


The following table shows the full set of metrics exposed by the kafka client, and the corresponding
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.internal;
package io.opentelemetry.instrumentation.kafka.internal;

import com.google.auto.value.AutoValue;

@@ -3,10 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.internal;
package io.opentelemetry.instrumentation.kafka.internal;

import static io.opentelemetry.instrumentation.kafkaclients.internal.InstrumentDescriptor.INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER;
import static io.opentelemetry.instrumentation.kafkaclients.internal.InstrumentDescriptor.INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE;
import static io.opentelemetry.instrumentation.kafka.internal.InstrumentDescriptor.INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_COUNTER;
import static io.opentelemetry.instrumentation.kafka.internal.InstrumentDescriptor.INSTRUMENT_TYPE_DOUBLE_OBSERVABLE_GAUGE;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
@@ -26,7 +26,7 @@
import org.apache.kafka.common.metrics.Measurable;

/** A registry mapping kafka metrics to corresponding OpenTelemetry metric definitions. */
class KafkaMetricRegistry {
final class KafkaMetricRegistry {

private static final Set<String> groups = new HashSet<>(Arrays.asList("consumer", "producer"));
private static final Map<Class<?>, String> measureableToInstrumentType = new HashMap<>();
@@ -3,13 +3,12 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.internal;
package io.opentelemetry.instrumentation.kafka.internal;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.instrumentation.api.internal.GuardedBy;
import io.opentelemetry.instrumentation.kafkaclients.OpenTelemetryKafkaMetrics;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -23,12 +22,16 @@
/**
* A {@link MetricsReporter} which bridges Kafka metrics to OpenTelemetry metrics.
*
* <p>To configure, use {@link OpenTelemetryKafkaMetrics#getConfigProperties(OpenTelemetry)}.
* <p>To configure, use:
*
* <pre>{@code
* // KafkaTelemetry.create(OpenTelemetry).metricConfigProperties()
* }</pre>
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class OpenTelemetryMetricsReporter implements MetricsReporter {
public final class OpenTelemetryMetricsReporter implements MetricsReporter {

public static final String CONFIG_KEY_OPENTELEMETRY_INSTANCE = "opentelemetry.instance";

@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients.internal;
package io.opentelemetry.instrumentation.kafka.internal;

import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.Attributes;
Expand Down

This file was deleted.