diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java index 03c7e073aa3..736f9612a74 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java @@ -8,14 +8,9 @@ import com.google.common.collect.ImmutableList; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.util.Collection; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Fork; @@ -33,36 +28,6 @@ @State(Scope.Benchmark) public class BatchSpanProcessorBenchmark { - private static class DelayingSpanExporter implements SpanExporter { - - private final ScheduledExecutorService executor; - - private final int delayMs; - - private DelayingSpanExporter(int delayMs) { - executor = Executors.newScheduledThreadPool(5); - this.delayMs = delayMs; - } - - @SuppressWarnings("FutureReturnValueIgnored") - @Override - public CompletableResultCode export(Collection spans) { - final CompletableResultCode result = new CompletableResultCode(); - executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS); - return result; - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode shutdown() { - return CompletableResultCode.ofSuccess(); - } - } - @Param({"0", "1", "5"}) private int delayMs; diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java new file mode 100644 index 00000000000..f18b7fb24bf --- /dev/null +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java @@ -0,0 +1,162 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/* + * Run this along with a profiler to measure the CPU usage of BatchSpanProcessor's exporter thread. + */ +public class BatchSpanProcessorCpuBenchmark { + @State(Scope.Benchmark) + public static class BenchmarkState { + private SdkMeterProvider sdkMeterProvider; + private BatchSpanProcessor processor; + private Tracer tracer; + private int numThreads = 1; + + @Param({"1"}) + private int delayMs; + + private long exportedSpans; + private long droppedSpans; + + @Setup(Level.Iteration) + public final void setup() { + sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + SpanExporter exporter = new DelayingSpanExporter(delayMs); + processor = BatchSpanProcessor.builder(exporter).build(); + tracer = + SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer"); + } + + @TearDown(Level.Iteration) + public final void recordMetrics() { + BatchSpanProcessorMetrics metrics = + new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads); + exportedSpans = metrics.exportedSpans(); + droppedSpans = metrics.droppedSpans(); + } + + @TearDown(Level.Iteration) + public final void tearDown() { + processor.shutdown().join(10, TimeUnit.SECONDS); + } + } + + @State(Scope.Thread) + @AuxCounters(AuxCounters.Type.OPERATIONS) + public static class ThreadState { + BenchmarkState benchmarkState; + + @TearDown(Level.Iteration) + public final void recordMetrics(BenchmarkState benchmarkState) { + this.benchmarkState = benchmarkState; + } + + public long exportedSpans() { + return benchmarkState.exportedSpans; + } + + public long droppedSpans() { + return benchmarkState.droppedSpans; + } + } + + private static void doWork(BenchmarkState benchmarkState) { + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + // This sleep is essential to maintain a steady state of the benchmark run by generating 10k + // spans per second per thread. Without this JMH outer loop consumes as much CPU as possible + // making comparing different processor versions difficult. + LockSupport.parkNanos(100_000); + } + + @Benchmark + @Fork(1) + @Threads(1) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_01Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 1; + doWork(benchmarkState); + } + + @Benchmark + @Fork(1) + @Threads(2) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_02Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 2; + doWork(benchmarkState); + } + + @Benchmark + @Fork(1) + @Threads(5) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_05Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 5; + doWork(benchmarkState); + } + + @Benchmark + @Fork(1) + @Threads(10) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_10Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 10; + doWork(benchmarkState); + } + + @Benchmark + @Fork(1) + @Threads(20) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_20Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 20; + doWork(benchmarkState); + } +} diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java index 284595b2093..ef0775dd1f8 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java @@ -6,15 +6,9 @@ package io.opentelemetry.sdk.trace.export; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.SdkMeterProvider; -import io.opentelemetry.sdk.metrics.data.LongPointData; -import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.export.MetricProducer; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.util.Collection; import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -31,100 +25,60 @@ public class BatchSpanProcessorDroppedSpansBenchmark { - private static class DelayingSpanExporter implements SpanExporter { - @SuppressWarnings("FutureReturnValueIgnored") - @Override - public CompletableResultCode export(Collection spans) { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode shutdown() { - return CompletableResultCode.ofSuccess(); - } - } - @State(Scope.Benchmark) public static class BenchmarkState { - private final MetricProducer metricProducer = - SdkMeterProvider.builder().buildAndRegisterGlobal(); + private SdkMeterProvider sdkMeterProvider; private BatchSpanProcessor processor; private Tracer tracer; - private Collection allMetrics; + private double dropRatio; + private long exportedSpans; + private long droppedSpans; + private int numThreads; - @Setup(Level.Trial) + @Setup(Level.Iteration) public final void setup() { - SpanExporter exporter = new DelayingSpanExporter(); + sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + SpanExporter exporter = new DelayingSpanExporter(0); processor = BatchSpanProcessor.builder(exporter).build(); tracer = SdkTracerProvider.builder().build().get("benchmarkTracer"); } - @TearDown(Level.Trial) - public final void tearDown() { - processor.shutdown(); + @TearDown(Level.Iteration) + public final void recordMetrics() { + BatchSpanProcessorMetrics metrics = + new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads); + dropRatio = metrics.dropRatio(); + exportedSpans = metrics.exportedSpans(); + droppedSpans = metrics.droppedSpans(); } @TearDown(Level.Iteration) - public final void recordMetrics() { - allMetrics = metricProducer.collectAllMetrics(); + public final void tearDown() { + processor.shutdown(); } } @State(Scope.Thread) - @AuxCounters(AuxCounters.Type.EVENTS) + @AuxCounters(AuxCounters.Type.OPERATIONS) public static class ThreadState { - private Collection allMetrics; + BenchmarkState benchmarkState; @TearDown(Level.Iteration) public final void recordMetrics(BenchmarkState benchmarkState) { - allMetrics = benchmarkState.allMetrics; + this.benchmarkState = benchmarkState; } - /** Burn, checkstyle, burn. */ public double dropRatio() { - long exported = getMetric(true); - long dropped = getMetric(false); - long total = exported + dropped; - if (total == 0) { - return 0; - } else { - // Due to peculiarities of JMH reporting we have to divide this by the number of the - // concurrent threads running the actual benchmark. - return (double) dropped / total / 5; - } + return benchmarkState.dropRatio; } public long exportedSpans() { - return getMetric(true); + return benchmarkState.exportedSpans; } public long droppedSpans() { - return getMetric(false); - } - - private long getMetric(boolean dropped) { - String labelValue = String.valueOf(dropped); - for (MetricData metricData : allMetrics) { - if (metricData.getName().equals("processedSpans")) { - if (metricData.isEmpty()) { - return 0; - } else { - Collection points = metricData.getLongSumData().getPoints(); - for (LongPointData point : points) { - if (labelValue.equals(point.getLabels().get("dropped"))) { - return point.getValue(); - } - } - } - } - } - return 0; + return benchmarkState.droppedSpans; } } @@ -137,6 +91,7 @@ private long getMetric(boolean dropped) { @BenchmarkMode(Mode.Throughput) public void export( BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 5; benchmarkState.processor.onEnd( (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); } diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMetrics.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMetrics.java new file mode 100644 index 00000000000..21030b17e4e --- /dev/null +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMetrics.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.Collection; +import java.util.OptionalLong; + +public class BatchSpanProcessorMetrics { + private final Collection allMetrics; + private final int numThreads; + + public BatchSpanProcessorMetrics(Collection allMetrics, int numThreads) { + this.allMetrics = allMetrics; + this.numThreads = numThreads; + } + + public double dropRatio() { + long exported = getMetric(false); + long dropped = getMetric(true); + long total = exported + dropped; + // Due to peculiarities of JMH reporting we have to divide this by the number of the + // concurrent threads running the actual benchmark. + return total == 0 ? 0 : (double) dropped / total / numThreads; + } + + public long exportedSpans() { + return getMetric(false) / numThreads; + } + + public long droppedSpans() { + return getMetric(true) / numThreads; + } + + private long getMetric(boolean dropped) { + String labelValue = String.valueOf(dropped); + OptionalLong value = + allMetrics + .stream() + .filter(metricData -> metricData.getName().equals("processedSpans")) + .filter(metricData -> !metricData.isEmpty()) + .map(metricData -> metricData.getLongSumData().getPoints()) + .flatMap(Collection::stream) + .filter(point -> labelValue.equals(point.getLabels().get("dropped"))) + .mapToLong(LongPointData::getValue) + .findFirst(); + return value.isPresent() ? value.getAsLong() : 0; + } +} diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java new file mode 100644 index 00000000000..69fa730fe75 --- /dev/null +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java @@ -0,0 +1,156 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +public class BatchSpanProcessorMultiThreadBenchmark { + + @State(Scope.Benchmark) + public static class BenchmarkState { + private SdkMeterProvider sdkMeterProvider; + private BatchSpanProcessor processor; + private Tracer tracer; + private int numThreads = 1; + + @Param({"0"}) + private int delayMs; + + private long exportedSpans; + private long droppedSpans; + + @Setup(Level.Iteration) + public final void setup() { + sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + SpanExporter exporter = new DelayingSpanExporter(delayMs); + processor = BatchSpanProcessor.builder(exporter).build(); + tracer = + SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer"); + } + + @TearDown(Level.Iteration) + public final void recordMetrics() { + BatchSpanProcessorMetrics metrics = + new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads); + exportedSpans = metrics.exportedSpans(); + droppedSpans = metrics.droppedSpans(); + } + + @TearDown(Level.Trial) + public final void tearDown() { + processor.shutdown().join(10, TimeUnit.SECONDS); + } + } + + @State(Scope.Thread) + @AuxCounters(AuxCounters.Type.OPERATIONS) + public static class ThreadState { + BenchmarkState benchmarkState; + + @TearDown(Level.Iteration) + public final void recordMetrics(BenchmarkState benchmarkState) { + this.benchmarkState = benchmarkState; + } + + public long exportedSpans() { + return benchmarkState.exportedSpans; + } + + public long droppedSpans() { + return benchmarkState.droppedSpans; + } + } + + @Benchmark + @Fork(1) + @Threads(1) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_01Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 1; + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + + @Benchmark + @Fork(1) + @Threads(2) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_02Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 2; + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + + @Benchmark + @Fork(1) + @Threads(5) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_05Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 5; + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + + @Benchmark + @Fork(1) + @Threads(10) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_10Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 10; + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + + @Benchmark + @Fork(1) + @Threads(20) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_20Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 20; + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } +} diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java new file mode 100644 index 00000000000..ff0b2815959 --- /dev/null +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.Collection; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class DelayingSpanExporter implements SpanExporter { + + private final ScheduledExecutorService executor; + + private final int delayMs; + + public DelayingSpanExporter(int delayMs) { + executor = Executors.newScheduledThreadPool(5); + this.delayMs = delayMs; + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public CompletableResultCode export(Collection spans) { + final CompletableResultCode result = new CompletableResultCode(); + executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS); + return result; + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } +}