From eab438a9adb9d869484bddce3bc67bbc73e17ed6 Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Thu, 4 Mar 2021 12:35:30 -0800 Subject: [PATCH] Add batch span processor benchmarks Description: This PR adds two benchmarks. 1. Current benchmark executs forceFlush() on every loop and creates a bottleneck which results in not stressing batch span processor. Current benchmark only measures throughput which is not helpful on its own since number of spans getting exported is also important. BatchSpanProcessorMultiThreadBenchmark is created to address this issue. 2. Measuring CPU usage of exporter thread is also important, but the current benchmarks consumes as much CPU as possible which makes the measurement not meaningful. To maintain a steady state, this PR creates a benchmark that generates 10k spans per second per thread. One would need to attach a profiler such as yourkit or JProfiler to the benchmark run to understand the processor's CPU usage. BatchSpanProcessorCpuBenchmark is created for this purpose. --- .../export/BatchSpanProcessorBenchmark.java | 35 ---- .../BatchSpanProcessorCpuBenchmark.java | 164 ++++++++++++++++++ ...tchSpanProcessorDroppedSpansBenchmark.java | 95 +++------- .../export/BatchSpanProcessorMetrics.java | 52 ++++++ ...atchSpanProcessorMultiThreadBenchmark.java | 158 +++++++++++++++++ .../trace/export/DelayingSpanExporter.java | 43 +++++ 6 files changed, 443 insertions(+), 104 deletions(-) create mode 100644 sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java create mode 100644 sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMetrics.java create mode 100644 sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java create mode 100644 sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java index 03c7e073aa3..736f9612a74 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java @@ -8,14 +8,9 @@ import com.google.common.collect.ImmutableList; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.util.Collection; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Fork; @@ -33,36 +28,6 @@ @State(Scope.Benchmark) public class BatchSpanProcessorBenchmark { - private static class DelayingSpanExporter implements SpanExporter { - - private final ScheduledExecutorService executor; - - private final int delayMs; - - private DelayingSpanExporter(int delayMs) { - executor = Executors.newScheduledThreadPool(5); - this.delayMs = delayMs; - } - - @SuppressWarnings("FutureReturnValueIgnored") - @Override - public CompletableResultCode export(Collection spans) { - final CompletableResultCode result = new CompletableResultCode(); - executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS); - return result; - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode shutdown() { - return CompletableResultCode.ofSuccess(); - } - } - @Param({"0", "1", "5"}) private int delayMs; diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java new file mode 100644 index 00000000000..40e8a85221a --- /dev/null +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java @@ -0,0 +1,164 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.api.metrics.GlobalMetricsProvider; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/* + * Run this along with a profiler to measure the CPU usage of BatchSpanProcessor's exporter thread. + */ +public class BatchSpanProcessorCpuBenchmark { + @State(Scope.Benchmark) + public static class BenchmarkState { + private SdkMeterProvider sdkMeterProvider; + private BatchSpanProcessor processor; + private Tracer tracer; + private int numThreads = 1; + + @Param({"1"}) + private int delayMs; + + private long exportedSpans; + private long droppedSpans; + + @Setup(Level.Iteration) + public final void setup() { + sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + GlobalMetricsProvider.set(sdkMeterProvider); + SpanExporter exporter = new DelayingSpanExporter(delayMs); + processor = BatchSpanProcessor.builder(exporter).build(); + tracer = + SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer"); + } + + @TearDown(Level.Iteration) + public final void recordMetrics() { + BatchSpanProcessorMetrics metrics = + new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads); + exportedSpans = metrics.exportedSpans(); + droppedSpans = metrics.droppedSpans(); + } + + @TearDown(Level.Iteration) + public final void tearDown() { + processor.shutdown().join(10, TimeUnit.SECONDS); + } + } + + @State(Scope.Thread) + @AuxCounters(AuxCounters.Type.OPERATIONS) + public static class ThreadState { + BenchmarkState benchmarkState; + + @TearDown(Level.Iteration) + public final void recordMetrics(BenchmarkState benchmarkState) { + this.benchmarkState = benchmarkState; + } + + public long exportedSpans() { + return benchmarkState.exportedSpans; + } + + public long droppedSpans() { + return benchmarkState.droppedSpans; + } + } + + private static void doWork(BenchmarkState benchmarkState) { + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + // This sleep is essential to maintain a steady state of the benchmark run by generating 10k + // spans per second per thread. Without this JMH outer loop consumes as much CPU as possible + // making comparing different processor versions difficult. + LockSupport.parkNanos(100_000); + } + + @Benchmark + @Fork(1) + @Threads(1) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_01Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 1; + doWork(benchmarkState); + } + + @Benchmark + @Fork(1) + @Threads(2) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_02Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 2; + doWork(benchmarkState); + } + + @Benchmark + @Fork(1) + @Threads(5) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_05Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 5; + doWork(benchmarkState); + } + + @Benchmark + @Fork(1) + @Threads(10) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_10Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 10; + doWork(benchmarkState); + } + + @Benchmark + @Fork(1) + @Threads(20) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_20Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 20; + doWork(benchmarkState); + } +} diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java index 284595b2093..a9da31f28f7 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java @@ -5,16 +5,11 @@ package io.opentelemetry.sdk.trace.export; +import io.opentelemetry.api.metrics.GlobalMetricsProvider; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.metrics.SdkMeterProvider; -import io.opentelemetry.sdk.metrics.data.LongPointData; -import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.export.MetricProducer; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.util.Collection; import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -31,100 +26,61 @@ public class BatchSpanProcessorDroppedSpansBenchmark { - private static class DelayingSpanExporter implements SpanExporter { - @SuppressWarnings("FutureReturnValueIgnored") - @Override - public CompletableResultCode export(Collection spans) { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode shutdown() { - return CompletableResultCode.ofSuccess(); - } - } - @State(Scope.Benchmark) public static class BenchmarkState { - private final MetricProducer metricProducer = - SdkMeterProvider.builder().buildAndRegisterGlobal(); + private SdkMeterProvider sdkMeterProvider; private BatchSpanProcessor processor; private Tracer tracer; - private Collection allMetrics; + private double dropRatio; + private long exportedSpans; + private long droppedSpans; + private int numThreads; - @Setup(Level.Trial) + @Setup(Level.Iteration) public final void setup() { - SpanExporter exporter = new DelayingSpanExporter(); + sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + GlobalMetricsProvider.set(sdkMeterProvider); + SpanExporter exporter = new DelayingSpanExporter(0); processor = BatchSpanProcessor.builder(exporter).build(); tracer = SdkTracerProvider.builder().build().get("benchmarkTracer"); } - @TearDown(Level.Trial) - public final void tearDown() { - processor.shutdown(); + @TearDown(Level.Iteration) + public final void recordMetrics() { + BatchSpanProcessorMetrics metrics = + new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads); + dropRatio = metrics.dropRatio(); + exportedSpans = metrics.exportedSpans(); + droppedSpans = metrics.droppedSpans(); } @TearDown(Level.Iteration) - public final void recordMetrics() { - allMetrics = metricProducer.collectAllMetrics(); + public final void tearDown() { + processor.shutdown(); } } @State(Scope.Thread) - @AuxCounters(AuxCounters.Type.EVENTS) + @AuxCounters(AuxCounters.Type.OPERATIONS) public static class ThreadState { - private Collection allMetrics; + BenchmarkState benchmarkState; @TearDown(Level.Iteration) public final void recordMetrics(BenchmarkState benchmarkState) { - allMetrics = benchmarkState.allMetrics; + this.benchmarkState = benchmarkState; } - /** Burn, checkstyle, burn. */ public double dropRatio() { - long exported = getMetric(true); - long dropped = getMetric(false); - long total = exported + dropped; - if (total == 0) { - return 0; - } else { - // Due to peculiarities of JMH reporting we have to divide this by the number of the - // concurrent threads running the actual benchmark. - return (double) dropped / total / 5; - } + return benchmarkState.dropRatio; } public long exportedSpans() { - return getMetric(true); + return benchmarkState.exportedSpans; } public long droppedSpans() { - return getMetric(false); - } - - private long getMetric(boolean dropped) { - String labelValue = String.valueOf(dropped); - for (MetricData metricData : allMetrics) { - if (metricData.getName().equals("processedSpans")) { - if (metricData.isEmpty()) { - return 0; - } else { - Collection points = metricData.getLongSumData().getPoints(); - for (LongPointData point : points) { - if (labelValue.equals(point.getLabels().get("dropped"))) { - return point.getValue(); - } - } - } - } - } - return 0; + return benchmarkState.droppedSpans; } } @@ -137,6 +93,7 @@ private long getMetric(boolean dropped) { @BenchmarkMode(Mode.Throughput) public void export( BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 5; benchmarkState.processor.onEnd( (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); } diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMetrics.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMetrics.java new file mode 100644 index 00000000000..e990b44e12d --- /dev/null +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMetrics.java @@ -0,0 +1,52 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import java.util.Collection; +import java.util.Optional; + +public class BatchSpanProcessorMetrics { + private final Collection allMetrics; + private final int numThreads; + + public BatchSpanProcessorMetrics(Collection allMetrics, int numThreads) { + this.allMetrics = allMetrics; + this.numThreads = numThreads; + } + + public double dropRatio() { + long exported = getMetric(true); + long dropped = getMetric(false); + long total = exported + dropped; + // Due to peculiarities of JMH reporting we have to divide this by the number of the + // concurrent threads running the actual benchmark. + return total == 0 ? 0 : (double) dropped / total / numThreads; + } + + public long exportedSpans() { + return getMetric(false) / numThreads; + } + + public long droppedSpans() { + return getMetric(true) / numThreads; + } + + private long getMetric(boolean dropped) { + String labelValue = String.valueOf(dropped); + Optional value = + allMetrics.stream() + .filter(metricData -> metricData.getName().equals("processedSpans")) + .filter(metricData -> !metricData.isEmpty()) + .map(metricData -> metricData.getLongSumData().getPoints()) + .flatMap(Collection::stream) + .filter(point -> labelValue.equals(point.getLabels().get("dropped"))) + .map(LongPointData::getValue) + .findFirst(); + return value.isPresent() ? value.get() : 0; + } +} diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java new file mode 100644 index 00000000000..4c256714804 --- /dev/null +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java @@ -0,0 +1,158 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.api.metrics.GlobalMetricsProvider; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.trace.ReadableSpan; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +@State(Scope.Benchmark) +public class BatchSpanProcessorMultiThreadBenchmark { + + @State(Scope.Benchmark) + public static class BenchmarkState { + private SdkMeterProvider sdkMeterProvider; + private BatchSpanProcessor processor; + private Tracer tracer; + private int numThreads = 1; + + @Param({"0"}) + private int delayMs; + + private long exportedSpans; + private long droppedSpans; + + @Setup(Level.Iteration) + public final void setup() { + sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); + GlobalMetricsProvider.set(sdkMeterProvider); + SpanExporter exporter = new DelayingSpanExporter(delayMs); + processor = BatchSpanProcessor.builder(exporter).build(); + tracer = + SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer"); + } + + @TearDown(Level.Iteration) + public final void recordMetrics() { + BatchSpanProcessorMetrics metrics = + new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads); + exportedSpans = metrics.exportedSpans(); + droppedSpans = metrics.droppedSpans(); + } + + @TearDown(Level.Trial) + public final void tearDown() { + processor.shutdown().join(10, TimeUnit.SECONDS); + } + } + + @State(Scope.Thread) + @AuxCounters(AuxCounters.Type.OPERATIONS) + public static class ThreadState { + BenchmarkState benchmarkState; + + @TearDown(Level.Iteration) + public final void recordMetrics(BenchmarkState benchmarkState) { + this.benchmarkState = benchmarkState; + } + + public long exportedSpans() { + return benchmarkState.exportedSpans; + } + + public long droppedSpans() { + return benchmarkState.droppedSpans; + } + } + + @Benchmark + @Fork(1) + @Threads(1) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_01Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 1; + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + + @Benchmark + @Fork(1) + @Threads(2) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_02Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 2; + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + + @Benchmark + @Fork(1) + @Threads(5) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_05Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 5; + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + + @Benchmark + @Fork(1) + @Threads(10) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_10Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 10; + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + + @Benchmark + @Fork(1) + @Threads(20) + @Warmup(iterations = 1, time = 1) + @Measurement(iterations = 5, time = 5) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.SECONDS) + public void export_20Thread( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.numThreads = 20; + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } +} diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java new file mode 100644 index 00000000000..ff0b2815959 --- /dev/null +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.Collection; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class DelayingSpanExporter implements SpanExporter { + + private final ScheduledExecutorService executor; + + private final int delayMs; + + public DelayingSpanExporter(int delayMs) { + executor = Executors.newScheduledThreadPool(5); + this.delayMs = delayMs; + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public CompletableResultCode export(Collection spans) { + final CompletableResultCode result = new CompletableResultCode(); + executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS); + return result; + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } +}