Skip to content

Commit

Permalink
Add batch span processor benchmarks
Browse files Browse the repository at this point in the history
Description:
This PR adds two benchmarks.

1. Current benchmark executs forceFlush() on every loop and creates a bottleneck which results in not stressing batch span processor. Current benchmark only
measures throughput which is not helpful on its own since number of spans getting exported is also important. BatchSpanProcessorMultiThreadBenchmark is created to address this issue.
2. Measuring CPU usage of exporter thread is also important, but the current benchmarks consumes as much CPU as possible which makes the measurement not meaningful.
To maintain a steady state, this PR creates a benchmark that generates 10k spans per second per thread. One would need to attach a profiler such as yourkit or JProfiler
to the benchmark run to understand the processor's CPU usage. BatchSpanProcessorCpuBenchmark is created for this purpose.
  • Loading branch information
sbandadd committed Mar 11, 2021
1 parent 0d687bd commit eab438a
Show file tree
Hide file tree
Showing 6 changed files with 443 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,9 @@
import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -33,36 +28,6 @@
@State(Scope.Benchmark)
public class BatchSpanProcessorBenchmark {

private static class DelayingSpanExporter implements SpanExporter {

private final ScheduledExecutorService executor;

private final int delayMs;

private DelayingSpanExporter(int delayMs) {
executor = Executors.newScheduledThreadPool(5);
this.delayMs = delayMs;
}

@SuppressWarnings("FutureReturnValueIgnored")
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
final CompletableResultCode result = new CompletableResultCode();
executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS);
return result;
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}

@Param({"0", "1", "5"})
private int delayMs;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.trace.export;

import io.opentelemetry.api.metrics.GlobalMetricsProvider;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

/*
* Run this along with a profiler to measure the CPU usage of BatchSpanProcessor's exporter thread.
*/
public class BatchSpanProcessorCpuBenchmark {
@State(Scope.Benchmark)
public static class BenchmarkState {
private SdkMeterProvider sdkMeterProvider;
private BatchSpanProcessor processor;
private Tracer tracer;
private int numThreads = 1;

@Param({"1"})
private int delayMs;

private long exportedSpans;
private long droppedSpans;

@Setup(Level.Iteration)
public final void setup() {
sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
GlobalMetricsProvider.set(sdkMeterProvider);
SpanExporter exporter = new DelayingSpanExporter(delayMs);
processor = BatchSpanProcessor.builder(exporter).build();
tracer =
SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer");
}

@TearDown(Level.Iteration)
public final void recordMetrics() {
BatchSpanProcessorMetrics metrics =
new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads);
exportedSpans = metrics.exportedSpans();
droppedSpans = metrics.droppedSpans();
}

@TearDown(Level.Iteration)
public final void tearDown() {
processor.shutdown().join(10, TimeUnit.SECONDS);
}
}

@State(Scope.Thread)
@AuxCounters(AuxCounters.Type.OPERATIONS)
public static class ThreadState {
BenchmarkState benchmarkState;

@TearDown(Level.Iteration)
public final void recordMetrics(BenchmarkState benchmarkState) {
this.benchmarkState = benchmarkState;
}

public long exportedSpans() {
return benchmarkState.exportedSpans;
}

public long droppedSpans() {
return benchmarkState.droppedSpans;
}
}

private static void doWork(BenchmarkState benchmarkState) {
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
// This sleep is essential to maintain a steady state of the benchmark run by generating 10k
// spans per second per thread. Without this JMH outer loop consumes as much CPU as possible
// making comparing different processor versions difficult.
LockSupport.parkNanos(100_000);
}

@Benchmark
@Fork(1)
@Threads(1)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_01Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 1;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(2)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_02Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 2;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(5)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_05Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 5;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(10)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_10Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 10;
doWork(benchmarkState);
}

@Benchmark
@Fork(1)
@Threads(20)
@Warmup(iterations = 1, time = 1)
@Measurement(iterations = 5, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public void export_20Thread(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 20;
doWork(benchmarkState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,11 @@

package io.opentelemetry.sdk.trace.export;

import io.opentelemetry.api.metrics.GlobalMetricsProvider;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricProducer;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand All @@ -31,100 +26,61 @@

public class BatchSpanProcessorDroppedSpansBenchmark {

private static class DelayingSpanExporter implements SpanExporter {
@SuppressWarnings("FutureReturnValueIgnored")
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}

@State(Scope.Benchmark)
public static class BenchmarkState {
private final MetricProducer metricProducer =
SdkMeterProvider.builder().buildAndRegisterGlobal();
private SdkMeterProvider sdkMeterProvider;
private BatchSpanProcessor processor;
private Tracer tracer;
private Collection<MetricData> allMetrics;
private double dropRatio;
private long exportedSpans;
private long droppedSpans;
private int numThreads;

@Setup(Level.Trial)
@Setup(Level.Iteration)
public final void setup() {
SpanExporter exporter = new DelayingSpanExporter();
sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
GlobalMetricsProvider.set(sdkMeterProvider);
SpanExporter exporter = new DelayingSpanExporter(0);
processor = BatchSpanProcessor.builder(exporter).build();

tracer = SdkTracerProvider.builder().build().get("benchmarkTracer");
}

@TearDown(Level.Trial)
public final void tearDown() {
processor.shutdown();
@TearDown(Level.Iteration)
public final void recordMetrics() {
BatchSpanProcessorMetrics metrics =
new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads);
dropRatio = metrics.dropRatio();
exportedSpans = metrics.exportedSpans();
droppedSpans = metrics.droppedSpans();
}

@TearDown(Level.Iteration)
public final void recordMetrics() {
allMetrics = metricProducer.collectAllMetrics();
public final void tearDown() {
processor.shutdown();
}
}

@State(Scope.Thread)
@AuxCounters(AuxCounters.Type.EVENTS)
@AuxCounters(AuxCounters.Type.OPERATIONS)
public static class ThreadState {
private Collection<MetricData> allMetrics;
BenchmarkState benchmarkState;

@TearDown(Level.Iteration)
public final void recordMetrics(BenchmarkState benchmarkState) {
allMetrics = benchmarkState.allMetrics;
this.benchmarkState = benchmarkState;
}

/** Burn, checkstyle, burn. */
public double dropRatio() {
long exported = getMetric(true);
long dropped = getMetric(false);
long total = exported + dropped;
if (total == 0) {
return 0;
} else {
// Due to peculiarities of JMH reporting we have to divide this by the number of the
// concurrent threads running the actual benchmark.
return (double) dropped / total / 5;
}
return benchmarkState.dropRatio;
}

public long exportedSpans() {
return getMetric(true);
return benchmarkState.exportedSpans;
}

public long droppedSpans() {
return getMetric(false);
}

private long getMetric(boolean dropped) {
String labelValue = String.valueOf(dropped);
for (MetricData metricData : allMetrics) {
if (metricData.getName().equals("processedSpans")) {
if (metricData.isEmpty()) {
return 0;
} else {
Collection<LongPointData> points = metricData.getLongSumData().getPoints();
for (LongPointData point : points) {
if (labelValue.equals(point.getLabels().get("dropped"))) {
return point.getValue();
}
}
}
}
}
return 0;
return benchmarkState.droppedSpans;
}
}

Expand All @@ -137,6 +93,7 @@ private long getMetric(boolean dropped) {
@BenchmarkMode(Mode.Throughput)
public void export(
BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) {
benchmarkState.numThreads = 5;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
}
Expand Down
Loading

0 comments on commit eab438a

Please sign in to comment.