Skip to content

Commit

Permalink
Add OTel Metrics plus refactor for generic senders
Browse files Browse the repository at this point in the history
  • Loading branch information
brunobat committed Mar 13, 2024
1 parent 9d46911 commit 079d8d0
Show file tree
Hide file tree
Showing 45 changed files with 1,593 additions and 669 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ AdditionalBeanBuildItem ensureProducerIsRetained() {
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.ResourceCustomizer.class,
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.SamplerCustomizer.class,
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.TracerProviderCustomizer.class,
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.MetricProviderCustomizer.class,
AutoConfiguredOpenTelemetrySdkBuilderCustomizer.TextMapPropagatorCustomizers.class)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.jboss.jandex.ParameterizedType;
import org.jboss.jandex.Type;

import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
Expand All @@ -26,9 +29,9 @@
import io.quarkus.opentelemetry.runtime.config.build.exporter.OtlpExporterBuildConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.OTelRuntimeConfig;
import io.quarkus.opentelemetry.runtime.config.runtime.exporter.OtlpExporterRuntimeConfig;
import io.quarkus.opentelemetry.runtime.exporter.otlp.EndUserSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.OTelExporterRecorder;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.EndUserSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.runtime.TlsConfig;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;

Expand Down Expand Up @@ -84,4 +87,32 @@ void createBatchSpanProcessor(OTelExporterRecorder recorder,
vertxBuildItem.getVertx()))
.done());
}

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
void createMetricsExporterProcessor(
OTelExporterRecorder recorder,
List<ExternalOtelExporterBuildItem> externalOtelExporterBuildItem,
OTelRuntimeConfig otelRuntimeConfig,
OtlpExporterRuntimeConfig exporterRuntimeConfig,
TlsConfig tlsConfig,
CoreVertxBuildItem vertxBuildItem,
BuildProducer<SyntheticBeanBuildItem> syntheticBeanBuildItemBuildProducer) {
if (!externalOtelExporterBuildItem.isEmpty()) {
// if there is an external exporter, we don't want to create the default one
return;
}

syntheticBeanBuildItemBuildProducer.produce(SyntheticBeanBuildItem
.configure(PeriodicMetricReader.class)// FIXME concrete class?
.types(MetricReader.class)
.setRuntimeInit()
.scope(Singleton.class)
.unremovable()
.addInjectionPoint(ParameterizedType.create(DotName.createSimple(Instance.class),
new Type[] { ClassType.create(DotName.createSimple(MetricExporter.class.getName())) }, null))
.createWith(recorder.createMetricReader(otelRuntimeConfig, exporterRuntimeConfig, tlsConfig,
vertxBuildItem.getVertx()))
.done());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.quarkus.opentelemetry.deployment.metric;

import java.util.function.BooleanSupplier;
import java.util.function.Function;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.BuildSteps;
import io.quarkus.opentelemetry.runtime.config.build.OTelBuildConfig;
import io.quarkus.opentelemetry.runtime.metrics.cdi.MetricsProducer;

@BuildSteps(onlyIf = MetricProcessor.MetricEnabled.class)
public class MetricProcessor {

@BuildStep
void ensureProducersAreRetained(
BuildProducer<AdditionalBeanBuildItem> additionalBeans) {

additionalBeans.produce(AdditionalBeanBuildItem.builder()
.setUnremovable()
.addBeanClass(MetricsProducer.class)
.build());
}

public static class MetricEnabled implements BooleanSupplier {
OTelBuildConfig otelBuildConfig;

public boolean getAsBoolean() {
return otelBuildConfig.metrics().enabled()
.map(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean enabled) {
return otelBuildConfig.enabled() && enabled;
}
})
.orElseGet(() -> otelBuildConfig.enabled());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ public class TracerEnabled implements BooleanSupplier {
OTelBuildConfig otelConfig;

public boolean getAsBoolean() {
return otelConfig.traces().enabled().map(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean tracerEnabled) {
return otelConfig.enabled() && tracerEnabled;
}
})
return otelConfig.traces().enabled()
.map(new Function<Boolean, Boolean>() {
@Override
public Boolean apply(Boolean tracerEnabled) {
return otelConfig.enabled() && tracerEnabled;
}
})
.orElseGet(() -> otelConfig.enabled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.opentelemetry.api.OpenTelemetry;
import io.quarkus.opentelemetry.runtime.exporter.otlp.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.test.QuarkusUnitTest;

@Disabled("Not implemented")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package io.quarkus.opentelemetry.deployment.common;

import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

import jakarta.enterprise.context.ApplicationScoped;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.semconv.SemanticAttributes;
import io.quarkus.arc.Unremovable;

@Unremovable
@ApplicationScoped
public class InMemoryMetricExporter implements MetricExporter {

private static final List<String> LEGACY_KEY_COMPONENTS = List.of(SemanticAttributes.HTTP_METHOD.getKey(),
SemanticAttributes.HTTP_ROUTE.getKey(),
SemanticAttributes.HTTP_STATUS_CODE.getKey());
private static final List<String> KEY_COMPONENTS = List.of(SemanticAttributes.HTTP_REQUEST_METHOD.getKey(),
SemanticAttributes.HTTP_ROUTE.getKey(),
SemanticAttributes.HTTP_RESPONSE_STATUS_CODE.getKey());

private final Queue<MetricData> finishedMetricItems = new ConcurrentLinkedQueue<>();
private final AggregationTemporality aggregationTemporality = AggregationTemporality.CUMULATIVE;
private boolean isStopped = false;

public static Map<String, String> getPointAttributes(final MetricData metricData, final String path) {
try {
return metricData.getData().getPoints().stream()
.filter(point -> isPathFound(path, point.getAttributes()))
.map(point -> point.getAttributes())
.map(attributes1 -> attributes1.asMap())
.flatMap(map -> map.entrySet().stream())
.collect(toMap(map -> map.getKey().toString(), map -> map.getValue().toString()));
} catch (Exception e) {
System.out.println("Error getting point attributes for " + metricData.getName());
metricData.getData().getPoints().stream()
.filter(point -> isPathFound(path, point.getAttributes()))
.map(point -> point.getAttributes())
.map(attributes1 -> attributes1.asMap())
.flatMap(map -> map.entrySet().stream())
.forEach(attributeKeyObjectEntry -> System.out
.println(attributeKeyObjectEntry.getKey() + " " + attributeKeyObjectEntry.getValue()));
throw e;
}
}

public static Map<String, PointData> getMostRecentPointsMap(List<MetricData> finishedMetricItems) {
return finishedMetricItems.stream()
.flatMap(metricData -> metricData.getData().getPoints().stream())
// exclude data from /export endpoint
.filter(InMemoryMetricExporter::notExporterPointData)
// newer first
.sorted(Comparator.comparingLong(PointData::getEpochNanos).reversed())
.collect(toMap(
pointData -> pointData.getAttributes().asMap().entrySet().stream()
//valid attributes for the resulting map key
.filter(entry -> {
if (SemconvStability.emitOldHttpSemconv()) {
return LEGACY_KEY_COMPONENTS.contains(entry.getKey().getKey());
} else {
return KEY_COMPONENTS.contains(entry.getKey().getKey());
}
})
// ensure order
.sorted(Comparator.comparing(o -> o.getKey().getKey()))
// build key
.map(entry -> entry.getKey().getKey() + ":" + entry.getValue().toString())
.collect(joining(",")),
pointData -> pointData,
// most recent points will surface
(older, newer) -> newer));
}

/*
* ignore points with /export in the route
*/
private static boolean notExporterPointData(PointData pointData) {
return pointData.getAttributes().asMap().entrySet().stream()
.noneMatch(entry -> entry.getKey().getKey().equals(SemanticAttributes.HTTP_ROUTE.getKey()) &&
entry.getValue().toString().contains("/export"));
}

private static boolean isPathFound(String path, Attributes attributes) {
if (path == null) {
return true;// any match
}
Object value = attributes.asMap().get(AttributeKey.stringKey(SemanticAttributes.HTTP_ROUTE.getKey()));
if (value == null) {
return false;
}
return value.toString().equals(path);
}

public void assertCount(final int count) {
Awaitility.await().atMost(5, SECONDS)
.untilAsserted(() -> Assertions.assertEquals(count, getFinishedMetricItems().size()));
}

public void assertCount(final String name, final String target, final int count) {
Awaitility.await().atMost(5, SECONDS)
.untilAsserted(() -> Assertions.assertEquals(count, getFinishedMetricItems(name, target).size()));
}

public void assertCountAtLeast(final int count) {
Awaitility.await().atMost(5, SECONDS)
.untilAsserted(() -> Assertions.assertTrue(count < getFinishedMetricItems().size()));
}

public void assertCountAtLeast(final String name, final String target, final int count) {
Awaitility.await().atMost(5, SECONDS)
.untilAsserted(() -> Assertions.assertTrue(count < getFinishedMetricItems(name, target).size()));
}

/**
* Returns a {@code List} of the finished {@code Metric}s, represented by {@code MetricData}.
*
* @return a {@code List} of the finished {@code Metric}s.
*/
public List<MetricData> getFinishedMetricItems() {
return Collections.unmodifiableList(new ArrayList<>(finishedMetricItems));
}

public List<MetricData> getFinishedMetricItems(final String name, final String target) {
return Collections.unmodifiableList(new ArrayList<>(
finishedMetricItems.stream()
.filter(metricData -> metricData.getName().equals(name))
.filter(metricData -> metricData.getData().getPoints().stream()
.anyMatch(point -> isPathFound(target, point.getAttributes())))
.collect(Collectors.toList())));
}

/**
* Clears the internal {@code List} of finished {@code Metric}s.
*
* <p>
* Does not reset the state of this exporter if already shutdown.
*/
public void reset() {
finishedMetricItems.clear();
}

@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return aggregationTemporality;
}

/**
* Exports the collection of {@code Metric}s into the inmemory queue.
*
* <p>
* If this is called after {@code shutdown}, this will return {@code ResultCode.FAILURE}.
*/
@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (isStopped) {
return CompletableResultCode.ofFailure();
}
finishedMetricItems.addAll(metrics);
return CompletableResultCode.ofSuccess();
}

/**
* The InMemory exporter does not batch metrics, so this method will immediately return with
* success.
*
* @return always Success
*/
@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

/**
* Clears the internal {@code List} of finished {@code Metric}s.
*
* <p>
* Any subsequent call to export() function on this MetricExporter, will return {@code
* CompletableResultCode.ofFailure()}
*/
@Override
public CompletableResultCode shutdown() {
isStopped = true;
finishedMetricItems.clear();
return CompletableResultCode.ofSuccess();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.opentelemetry.deployment.common;

import jakarta.enterprise.inject.spi.CDI;

import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider;
import io.opentelemetry.sdk.metrics.export.MetricExporter;

public class InMemoryMetricExporterProvider implements ConfigurableMetricExporterProvider {
@Override
public MetricExporter createExporter(ConfigProperties configProperties) {
return CDI.current().select(InMemoryMetricExporter.class).get();
}

@Override
public String getName() {
return "in-memory";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import io.opentelemetry.api.OpenTelemetry;
import io.quarkus.opentelemetry.runtime.exporter.otlp.LateBoundBatchSpanProcessor;
import io.quarkus.opentelemetry.runtime.exporter.otlp.tracing.LateBoundBatchSpanProcessor;
import io.quarkus.test.QuarkusUnitTest;

public class OtlpExporterDisabledTest {
Expand Down
Loading

0 comments on commit 079d8d0

Please sign in to comment.