diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java index 03c7e073aa3..41f60d027ef 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java @@ -33,13 +33,13 @@ @State(Scope.Benchmark) public class BatchSpanProcessorBenchmark { - private static class DelayingSpanExporter implements SpanExporter { + public static class DelayingSpanExporter implements SpanExporter { private final ScheduledExecutorService executor; private final int delayMs; - private DelayingSpanExporter(int delayMs) { + public DelayingSpanExporter(int delayMs) { executor = Executors.newScheduledThreadPool(5); this.delayMs = delayMs; } diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCPUBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java similarity index 80% rename from sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCPUBenchmark.java rename to sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java index 3d99959f79e..280fb6bcc3c 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCPUBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java @@ -1,8 +1,17 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + package io.opentelemetry.sdk.trace.export; +import io.opentelemetry.api.metrics.GlobalMetricsProvider; import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -18,18 +27,17 @@ import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; /* * Run this along with a profiler to measure the CPU usage of BatchSpanProcessor's exporter thread. */ -public class BatchSpanProcessorCPUBenchmark { +public class BatchSpanProcessorCpuBenchmark { @State(Scope.Benchmark) public static class BenchmarkState { + private SdkMeterProvider sdkMeterProvider; private BatchSpanProcessor processor; - Tracer tracer; - int numThreads = 1; + private Tracer tracer; + private int numThreads = 1; @Param({"1"}) private int delayMs; @@ -37,24 +45,25 @@ public static class BenchmarkState { private long exportedSpans; private long droppedSpans; - @Setup(Level.Trial) + @Setup(Level.Iteration) public final void setup() { - SpanExporter exporter = - new BatchSpanProcessorMultiThreadBenchmark.DelayingSpanExporter(delayMs); + sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + GlobalMetricsProvider.set(sdkMeterProvider); + SpanExporter exporter = new BatchSpanProcessorBenchmark.DelayingSpanExporter(delayMs); processor = BatchSpanProcessor.builder(exporter).build(); tracer = SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer"); } @TearDown(Level.Iteration) - public final void metrics() { - exportedSpans = processor.exportedSpans() / numThreads; - droppedSpans = processor.droppedSpans() / numThreads; - processor.resetExportedSpans(); - processor.resetDroppedSpans(); + public final void recordMetrics() { + BatchSpanProcessorMetrics metrics = + new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads); + exportedSpans = metrics.exportedSpans(); + droppedSpans = metrics.droppedSpans(); } - @TearDown(Level.Trial) + @TearDown(Level.Iteration) public final void tearDown() { processor.shutdown().join(10, TimeUnit.SECONDS); } @@ -63,21 +72,19 @@ public final void tearDown() { @State(Scope.Thread) @AuxCounters(AuxCounters.Type.OPERATIONS) public static class ThreadState { - private long exportedSpans; - private long droppedSpans; + BenchmarkState benchmarkState; @TearDown(Level.Iteration) public final void recordMetrics(BenchmarkState benchmarkState) { - exportedSpans = benchmarkState.exportedSpans; - droppedSpans = benchmarkState.droppedSpans; + this.benchmarkState = benchmarkState; } public long exportedSpans() { - return exportedSpans; + return benchmarkState.exportedSpans; } public long droppedSpans() { - return droppedSpans; + return benchmarkState.droppedSpans; } } diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java index 284595b2093..ef34fa03486 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java @@ -5,16 +5,11 @@ package io.opentelemetry.sdk.trace.export; +import io.opentelemetry.api.metrics.GlobalMetricsProvider; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.SdkMeterProvider; -import io.opentelemetry.sdk.metrics.data.LongPointData; -import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.export.MetricProducer; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.util.Collection; import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -31,100 +26,61 @@ public class BatchSpanProcessorDroppedSpansBenchmark { - private static class DelayingSpanExporter implements SpanExporter { - @SuppressWarnings("FutureReturnValueIgnored") - @Override - public CompletableResultCode export(Collection spans) { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode shutdown() { - return CompletableResultCode.ofSuccess(); - } - } - @State(Scope.Benchmark) public static class BenchmarkState { - private final MetricProducer metricProducer = - SdkMeterProvider.builder().buildAndRegisterGlobal(); + private SdkMeterProvider sdkMeterProvider; private BatchSpanProcessor processor; private Tracer tracer; - private Collection allMetrics; + private double dropRatio; + private long exportedSpans; + private long droppedSpans; + private int numThreads; - @Setup(Level.Trial) + @Setup(Level.Iteration) public final void setup() { - SpanExporter exporter = new DelayingSpanExporter(); + sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + GlobalMetricsProvider.set(sdkMeterProvider); + SpanExporter exporter = new BatchSpanProcessorBenchmark.DelayingSpanExporter(0); processor = BatchSpanProcessor.builder(exporter).build(); tracer = SdkTracerProvider.builder().build().get("benchmarkTracer"); } - @TearDown(Level.Trial) - public final void tearDown() { - processor.shutdown(); + @TearDown(Level.Iteration) + public final void recordMetrics() { + BatchSpanProcessorMetrics metrics = + new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads); + dropRatio = metrics.dropRatio(); + exportedSpans = metrics.exportedSpans(); + droppedSpans = metrics.droppedSpans(); } @TearDown(Level.Iteration) - public final void recordMetrics() { - allMetrics = metricProducer.collectAllMetrics(); + public final void tearDown() { + processor.shutdown(); } } @State(Scope.Thread) - @AuxCounters(AuxCounters.Type.EVENTS) + @AuxCounters(AuxCounters.Type.OPERATIONS) public static class ThreadState { - private Collection allMetrics; + BenchmarkState benchmarkState; @TearDown(Level.Iteration) public final void recordMetrics(BenchmarkState benchmarkState) { - allMetrics = benchmarkState.allMetrics; + this.benchmarkState = benchmarkState; } - /** Burn, checkstyle, burn. */ public double dropRatio() { - long exported = getMetric(true); - long dropped = getMetric(false); - long total = exported + dropped; - if (total == 0) { - return 0; - } else { - // Due to peculiarities of JMH reporting we have to divide this by the number of the - // concurrent threads running the actual benchmark. - return (double) dropped / total / 5; - } + return benchmarkState.dropRatio; } public long exportedSpans() { - return getMetric(true); + return benchmarkState.exportedSpans; } public long droppedSpans() { - return getMetric(false); - } - - private long getMetric(boolean dropped) { - String labelValue = String.valueOf(dropped); - for (MetricData metricData : allMetrics) { - if (metricData.getName().equals("processedSpans")) { - if (metricData.isEmpty()) { - return 0; - } else { - Collection points = metricData.getLongSumData().getPoints(); - for (LongPointData point : points) { - if (labelValue.equals(point.getLabels().get("dropped"))) { - return point.getValue(); - } - } - } - } - } - return 0; + return benchmarkState.droppedSpans; } } @@ -137,6 +93,7 @@ private long getMetric(boolean dropped) { @BenchmarkMode(Mode.Throughput) public void export( BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 5; benchmarkState.processor.onEnd( (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); } diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMetrics.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMetrics.java new file mode 100644 index 00000000000..e990b44e12d --- /dev/null +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMetrics.java @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.Collection; +import java.util.Optional; + +public class BatchSpanProcessorMetrics { + private final Collection allMetrics; + private final int numThreads; + + public BatchSpanProcessorMetrics(Collection allMetrics, int numThreads) { + this.allMetrics = allMetrics; + this.numThreads = numThreads; + } + + public double dropRatio() { + long exported = getMetric(true); + long dropped = getMetric(false); + long total = exported + dropped; + // Due to peculiarities of JMH reporting we have to divide this by the number of the + // concurrent threads running the actual benchmark. + return total == 0 ? 0 : (double) dropped / total / numThreads; + } + + public long exportedSpans() { + return getMetric(false) / numThreads; + } + + public long droppedSpans() { + return getMetric(true) / numThreads; + } + + private long getMetric(boolean dropped) { + String labelValue = String.valueOf(dropped); + Optional value = + allMetrics.stream() + .filter(metricData -> metricData.getName().equals("processedSpans")) + .filter(metricData -> !metricData.isEmpty()) + .map(metricData -> metricData.getLongSumData().getPoints()) + .flatMap(Collection::stream) + .filter(point -> labelValue.equals(point.getLabels().get("dropped"))) + .map(LongPointData::getValue) + .findFirst(); + return value.isPresent() ? value.get() : 0; + } +} diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java index c81f8916c53..033bb793626 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java @@ -5,14 +5,11 @@ package io.opentelemetry.sdk.trace.export; +import io.opentelemetry.api.metrics.GlobalMetricsProvider; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.util.Collection; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; @@ -33,41 +30,12 @@ @State(Scope.Benchmark) public class BatchSpanProcessorMultiThreadBenchmark { - public static class DelayingSpanExporter implements SpanExporter { - - private final ScheduledExecutorService executor; - - private final int delayMs; - - public DelayingSpanExporter(int delayMs) { - executor = Executors.newScheduledThreadPool(5); - this.delayMs = delayMs; - } - - @SuppressWarnings("FutureReturnValueIgnored") - @Override - public CompletableResultCode export(Collection spans) { - final CompletableResultCode result = new CompletableResultCode(); - executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS); - return result; - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode shutdown() { - return CompletableResultCode.ofSuccess(); - } - } - @State(Scope.Benchmark) public static class BenchmarkState { + private SdkMeterProvider sdkMeterProvider; private BatchSpanProcessor processor; - Tracer tracer; - int numThreads = 1; + private Tracer tracer; + private int numThreads = 1; @Param({"0"}) private int delayMs; @@ -75,20 +43,22 @@ public static class BenchmarkState { private long exportedSpans; private long droppedSpans; - @Setup(Level.Trial) + @Setup(Level.Iteration) public final void setup() { - SpanExporter exporter = new DelayingSpanExporter(delayMs); + sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + GlobalMetricsProvider.set(sdkMeterProvider); + SpanExporter exporter = new BatchSpanProcessorBenchmark.DelayingSpanExporter(delayMs); processor = BatchSpanProcessor.builder(exporter).build(); tracer = SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer"); } @TearDown(Level.Iteration) - public final void metrics() { - exportedSpans = processor.exportedSpans() / numThreads; - droppedSpans = processor.droppedSpans() / numThreads; - processor.resetExportedSpans(); - processor.resetDroppedSpans(); + public final void recordMetrics() { + BatchSpanProcessorMetrics metrics = + new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads); + exportedSpans = metrics.exportedSpans(); + droppedSpans = metrics.droppedSpans(); } @TearDown(Level.Trial) @@ -100,21 +70,19 @@ public final void tearDown() { @State(Scope.Thread) @AuxCounters(AuxCounters.Type.OPERATIONS) public static class ThreadState { - private long exportedSpans; - private long droppedSpans; + BenchmarkState benchmarkState; @TearDown(Level.Iteration) public final void recordMetrics(BenchmarkState benchmarkState) { - exportedSpans = benchmarkState.exportedSpans; - droppedSpans = benchmarkState.droppedSpans; + this.benchmarkState = benchmarkState; } public long exportedSpans() { - return exportedSpans; + return benchmarkState.exportedSpans; } public long droppedSpans() { - return droppedSpans; + return benchmarkState.droppedSpans; } } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index dff51add558..44ef27b449e 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -121,22 +121,6 @@ ArrayList getBatch() { return new ArrayList<>(worker.batch); } - long exportedSpans() { - return worker.exportedSpansCounter.get(); - } - - long droppedSpans() { - return worker.droppedSpansCounter.get(); - } - - void resetExportedSpans() { - worker.exportedSpansCounter.set(0); - } - - void resetDroppedSpans() { - worker.droppedSpansCounter.set(0); - } - // Worker is a thread that batches multiple spans and calls the registered SpanExporter to export // the data. private static final class Worker implements Runnable { @@ -158,8 +142,6 @@ private static final class Worker implements Runnable { private volatile boolean continueWork = true; private final Collection batch; private final ReentrantLock lock; - public AtomicLong droppedSpansCounter = new AtomicLong(0); - public AtomicLong exportedSpansCounter = new AtomicLong(0); @GuardedBy("lock") private final Condition needExport; @@ -210,7 +192,6 @@ private Worker( private void addSpan(ReadableSpan span) { if (queueSize.get() >= maxQueueSize) { - droppedSpansCounter.incrementAndGet(); droppedSpans.add(1); } else { queue.offer(span); @@ -323,7 +304,6 @@ private void exportCurrentBatch() { final CompletableResultCode result = spanExporter.export(batch); result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); if (result.isSuccess()) { - exportedSpansCounter.addAndGet(batch.size()); exportedSpans.add(batch.size()); } else { logger.log(Level.FINE, "Exporter failed");