Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sbandadd committed Mar 11, 2021
1 parent 81cda49 commit 4e94955
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,9 @@
import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -33,36 +28,6 @@
@State(Scope.Benchmark)
public class BatchSpanProcessorBenchmark {

public static class DelayingSpanExporter implements SpanExporter {

private final ScheduledExecutorService executor;

private final int delayMs;

public DelayingSpanExporter(int delayMs) {
executor = Executors.newScheduledThreadPool(5);
this.delayMs = delayMs;
}

@SuppressWarnings("FutureReturnValueIgnored")
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
final CompletableResultCode result = new CompletableResultCode();
executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS);
return result;
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}

@Param({"0", "1", "5"})
private int delayMs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static class BenchmarkState {
public final void setup() {
sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
GlobalMetricsProvider.set(sdkMeterProvider);
SpanExporter exporter = new BatchSpanProcessorBenchmark.DelayingSpanExporter(delayMs);
SpanExporter exporter = new DelayingSpanExporter(delayMs);
processor = BatchSpanProcessor.builder(exporter).build();
tracer =
SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer");
Expand Down Expand Up @@ -100,7 +100,7 @@ public void export_01Thread(
benchmarkState.numThreads = 1;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
LockSupport.parkNanos(1000_00);
LockSupport.parkNanos(100_000);
}

@Benchmark
Expand All @@ -115,7 +115,7 @@ public void export_02Thread(
benchmarkState.numThreads = 2;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
LockSupport.parkNanos(1000_00);
LockSupport.parkNanos(100_000);
}

@Benchmark
Expand All @@ -130,7 +130,7 @@ public void export_05Thread(
benchmarkState.numThreads = 5;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
LockSupport.parkNanos(1000_00);
LockSupport.parkNanos(100_000);
}

@Benchmark
Expand All @@ -145,7 +145,7 @@ public void export_10Thread(
benchmarkState.numThreads = 10;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
LockSupport.parkNanos(1000_00);
LockSupport.parkNanos(100_000);
}

@Benchmark
Expand All @@ -160,6 +160,6 @@ public void export_20Thread(
benchmarkState.numThreads = 20;
benchmarkState.processor.onEnd(
(ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan());
LockSupport.parkNanos(1000_00);
LockSupport.parkNanos(100_000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static class BenchmarkState {
public final void setup() {
sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
GlobalMetricsProvider.set(sdkMeterProvider);
SpanExporter exporter = new BatchSpanProcessorBenchmark.DelayingSpanExporter(0);
SpanExporter exporter = new DelayingSpanExporter(0);
processor = BatchSpanProcessor.builder(exporter).build();

tracer = SdkTracerProvider.builder().build().get("benchmarkTracer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static class BenchmarkState {
public final void setup() {
sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal();
GlobalMetricsProvider.set(sdkMeterProvider);
SpanExporter exporter = new BatchSpanProcessorBenchmark.DelayingSpanExporter(delayMs);
SpanExporter exporter = new DelayingSpanExporter(delayMs);
processor = BatchSpanProcessor.builder(exporter).build();
tracer =
SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.trace.export;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.Collection;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayingSpanExporter implements SpanExporter {

private final ScheduledExecutorService executor;

private final int delayMs;

public DelayingSpanExporter(int delayMs) {
executor = Executors.newScheduledThreadPool(5);
this.delayMs = delayMs;
}

@SuppressWarnings("FutureReturnValueIgnored")
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
final CompletableResultCode result = new CompletableResultCode();
executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS);
return result;
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
return CompletableResultCode.ofSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -140,7 +140,7 @@ private static final class Worker implements Runnable {
private final AtomicLong addedSpansCounter = new AtomicLong(0);
private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
private volatile boolean continueWork = true;
private final Collection<SpanData> batch;
private final ArrayList<SpanData> batch;
private final ReentrantLock lock;

@GuardedBy("lock")
Expand Down Expand Up @@ -215,8 +215,8 @@ public void run() {
flush();
}
while (!queue.isEmpty() && batch.size() < maxExportBatchSize) {
queueSize.decrementAndGet();
batch.add(queue.poll().toSpanData());
queueSize.decrementAndGet();
}
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
exportCurrentBatch();
Expand Down Expand Up @@ -301,7 +301,8 @@ private void exportCurrentBatch() {
}

try {
final CompletableResultCode result = spanExporter.export(batch);
final CompletableResultCode result =
spanExporter.export(Collections.unmodifiableList(batch));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
exportedSpans.add(batch.size());
Expand Down

0 comments on commit 4e94955

Please sign in to comment.