Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch span processor benchmarks #3017

Merged
merged 1 commit into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,9 @@
import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -33,36 +28,6 @@
@State(Scope.Benchmark)
public class BatchSpanProcessorBenchmark {

private static class DelayingSpanExporter implements SpanExporter {

private final ScheduledExecutorService executor;

private final int delayMs;

private DelayingSpanExporter(int delayMs) {
executor = Executors.newScheduledThreadPool(5);
this.delayMs = delayMs;
}

@SuppressWarnings("FutureReturnValueIgnored")
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
final CompletableResultCode result = new CompletableResultCode();
executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS);
return result;
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}

@Param({"0", "1", "5"})
private int delayMs;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.trace.export;

import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

/*
* Run this along with a profiler to measure the CPU usage of BatchSpanProcessor's exporter thread.
*/
public class BatchSpanProcessorCpuBenchmark {
@State(Scope.Benchmark)
public static class BenchmarkState {
private SdkMeterProvider sdkMeterProvider;
private BatchSpanProcessor processor;
private Tracer tracer;
private int numThreads = 1;

@Param({"1"})
private int delayMs;

private long exportedSpans;
private long droppedSpans;

@Setup(Level.Iteration)
public final void setup() {
sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
SpanExporter exporter = new DelayingSpanExporter(delayMs);
processor = BatchSpanProcessor.builder(exporter).build();
tracer =
SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer");
}

@TearDown(Level.Iteration)
public final void recordMetrics() {
BatchSpanProcessorMetrics metrics =
new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads);
exportedSpans = metrics.exportedSpans();
droppedSpans = metrics.droppedSpans();
}

@TearDown(Level.Iteration)
public final void tearDown() {
processor.shutdown().join(10, TimeUnit.SECONDS);
}
}

@State(Scope.Thread)
@AuxCounters(AuxCounters.Type.OPERATIONS)
public static class ThreadState {
BenchmarkState benchmarkState;

@TearDown(Level.Iteration)
public final void recordMetrics(BenchmarkState benchmarkState) {
this.benchmarkState = benchmarkState;
}

public long exportedSpans() {
return benchmarkState.exportedSpans;
}

public long droppedSpans() {
return benchmarkState.droppedSpans;
}
}

private static void doWork(BenchmarkState benchmarkState) {
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
// This sleep is essential to maintain a steady state of the benchmark run by generating 10k
sbandadd marked this conversation as resolved.
Show resolved Hide resolved
// spans per second per thread. Without this JMH outer loop consumes as much CPU as possible
// making comparing different processor versions difficult.
// Note that time spent outside of the sleep is negligible allowing this sleep to control
// span generation rate. Here we get 1 / 100_000 = 10K spans generated per second.
LockSupport.parkNanos(100_000);
}

@Benchmark
@Fork(1)
@Threads(1)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_01Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 1;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(2)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_02Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 2;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(5)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_05Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 5;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(10)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_10Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 10;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(20)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_20Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 20;
doWork(benchmarkState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,9 @@
package io.opentelemetry.sdk.trace.export;

import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -31,100 +25,60 @@

public class BatchSpanProcessorDroppedSpansBenchmark {

private static class DelayingSpanExporter implements SpanExporter {
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}

@State(Scope.Benchmark)
public static class BenchmarkState {
private final MetricProducer metricProducer =
SdkMeterProvider.builder().buildAndRegisterGlobal();
private SdkMeterProvider sdkMeterProvider;
private BatchSpanProcessor processor;
private Tracer tracer;
private Collection<MetricData> allMetrics;
private double dropRatio;
private long exportedSpans;
private long droppedSpans;
private int numThreads;

@Setup(Level.Trial)
@Setup(Level.Iteration)
public final void setup() {
SpanExporter exporter = new DelayingSpanExporter();
sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
SpanExporter exporter = new DelayingSpanExporter(0);
processor = BatchSpanProcessor.builder(exporter).build();

tracer = SdkTracerProvider.builder().build().get("benchmarkTracer");
}

@TearDown(Level.Trial)
public final void tearDown() {
processor.shutdown();
@TearDown(Level.Iteration)
public final void recordMetrics() {
BatchSpanProcessorMetrics metrics =
new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads);
dropRatio = metrics.dropRatio();
exportedSpans = metrics.exportedSpans();
droppedSpans = metrics.droppedSpans();
}

@TearDown(Level.Iteration)
public final void recordMetrics() {
allMetrics = metricProducer.collectAllMetrics();
public final void tearDown() {
processor.shutdown();
}
}

@State(Scope.Thread)
@AuxCounters(AuxCounters.Type.EVENTS)
@AuxCounters(AuxCounters.Type.OPERATIONS)
public static class ThreadState {
private Collection<MetricData> allMetrics;
BenchmarkState benchmarkState;

@TearDown(Level.Iteration)
public final void recordMetrics(BenchmarkState benchmarkState) {
allMetrics = benchmarkState.allMetrics;
this.benchmarkState = benchmarkState;
}

/** Burn, checkstyle, burn. */
public double dropRatio() {
long exported = getMetric(true);
long dropped = getMetric(false);
long total = exported + dropped;
if (total == 0) {
return 0;
} else {
// Due to peculiarities of JMH reporting we have to divide this by the number of the
// concurrent threads running the actual benchmark.
return (double) dropped / total / 5;
}
return benchmarkState.dropRatio;
}

public long exportedSpans() {
return getMetric(true);
return benchmarkState.exportedSpans;
}

public long droppedSpans() {
return getMetric(false);
}

private long getMetric(boolean dropped) {
String labelValue = String.valueOf(dropped);
for (MetricData metricData : allMetrics) {
if (metricData.getName().equals("processedSpans")) {
if (metricData.isEmpty()) {
return 0;
} else {
Collection<LongPointData> points = metricData.getLongSumData().getPoints();
for (LongPointData point : points) {
if (labelValue.equals(point.getLabels().get("dropped"))) {
return point.getValue();
}
}
}
}
}
return 0;
return benchmarkState.droppedSpans;
}
}

Expand All @@ -137,6 +91,7 @@ private long getMetric(boolean dropped) {
@BenchmarkMode(Mode.Throughput)
public void export(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 5;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
}
Expand Down
Loading