From 4e949559ba2c11ca9428878c1a61d5f469a6c5d7 Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Thu, 11 Mar 2021 09:22:58 -0800 Subject: [PATCH] Address comments --- .../export/BatchSpanProcessorBenchmark.java | 35 --------------- .../BatchSpanProcessorCpuBenchmark.java | 12 +++--- ...tchSpanProcessorDroppedSpansBenchmark.java | 2 +- ...atchSpanProcessorMultiThreadBenchmark.java | 2 +- .../trace/export/DelayingSpanExporter.java | 43 +++++++++++++++++++ .../sdk/trace/export/BatchSpanProcessor.java | 9 ++-- 6 files changed, 56 insertions(+), 47 deletions(-) create mode 100644 sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java index 41f60d027ef..736f9612a74 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBenchmark.java @@ -8,14 +8,9 @@ import com.google.common.collect.ImmutableList; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; -import io.opentelemetry.sdk.trace.data.SpanData; -import java.util.Collection; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Fork; @@ -33,36 +28,6 @@ @State(Scope.Benchmark) public class BatchSpanProcessorBenchmark { - public static class DelayingSpanExporter implements SpanExporter { - - private final ScheduledExecutorService executor; - - private final int delayMs; - - public DelayingSpanExporter(int delayMs) { - executor = Executors.newScheduledThreadPool(5); - this.delayMs = delayMs; - } - - @SuppressWarnings("FutureReturnValueIgnored") - @Override - public CompletableResultCode export(Collection spans) { - final CompletableResultCode result = new CompletableResultCode(); - executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS); - return result; - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode shutdown() { - return CompletableResultCode.ofSuccess(); - } - } - @Param({"0", "1", "5"}) private int delayMs; diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java index 280fb6bcc3c..0c99ab9a34f 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorCpuBenchmark.java @@ -49,7 +49,7 @@ public static class BenchmarkState { public final void setup() { sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); GlobalMetricsProvider.set(sdkMeterProvider); - SpanExporter exporter = new BatchSpanProcessorBenchmark.DelayingSpanExporter(delayMs); + SpanExporter exporter = new DelayingSpanExporter(delayMs); processor = BatchSpanProcessor.builder(exporter).build(); tracer = SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer"); @@ -100,7 +100,7 @@ public void export_01Thread( benchmarkState.numThreads = 1; benchmarkState.processor.onEnd( (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); - LockSupport.parkNanos(1000_00); + LockSupport.parkNanos(100_000); } @Benchmark @@ -115,7 +115,7 @@ public void export_02Thread( benchmarkState.numThreads = 2; benchmarkState.processor.onEnd( (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); - LockSupport.parkNanos(1000_00); + LockSupport.parkNanos(100_000); } @Benchmark @@ -130,7 +130,7 @@ public void export_05Thread( benchmarkState.numThreads = 5; benchmarkState.processor.onEnd( (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); - LockSupport.parkNanos(1000_00); + LockSupport.parkNanos(100_000); } @Benchmark @@ -145,7 +145,7 @@ public void export_10Thread( benchmarkState.numThreads = 10; benchmarkState.processor.onEnd( (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); - LockSupport.parkNanos(1000_00); + LockSupport.parkNanos(100_000); } @Benchmark @@ -160,6 +160,6 @@ public void export_20Thread( benchmarkState.numThreads = 20; benchmarkState.processor.onEnd( (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); - LockSupport.parkNanos(1000_00); + LockSupport.parkNanos(100_000); } } diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java index ef34fa03486..a9da31f28f7 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java @@ -40,7 +40,7 @@ public static class BenchmarkState { public final void setup() { sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); GlobalMetricsProvider.set(sdkMeterProvider); - SpanExporter exporter = new BatchSpanProcessorBenchmark.DelayingSpanExporter(0); + SpanExporter exporter = new DelayingSpanExporter(0); processor = BatchSpanProcessor.builder(exporter).build(); tracer = SdkTracerProvider.builder().build().get("benchmarkTracer"); diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java index 033bb793626..4c256714804 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java @@ -47,7 +47,7 @@ public static class BenchmarkState { public final void setup() { sdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal(); GlobalMetricsProvider.set(sdkMeterProvider); - SpanExporter exporter = new BatchSpanProcessorBenchmark.DelayingSpanExporter(delayMs); + SpanExporter exporter = new DelayingSpanExporter(delayMs); processor = BatchSpanProcessor.builder(exporter).build(); tracer = SdkTracerProvider.builder().addSpanProcessor(processor).build().get("benchmarkTracer"); diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java new file mode 100644 index 00000000000..ff0b2815959 --- /dev/null +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.export; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.Collection; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class DelayingSpanExporter implements SpanExporter { + + private final ScheduledExecutorService executor; + + private final int delayMs; + + public DelayingSpanExporter(int delayMs) { + executor = Executors.newScheduledThreadPool(5); + this.delayMs = delayMs; + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public CompletableResultCode export(Collection spans) { + final CompletableResultCode result = new CompletableResultCode(); + executor.schedule((Runnable) result::succeed, delayMs, TimeUnit.MILLISECONDS); + return result; + } + + @Override + public CompletableResultCode flush() { + return CompletableResultCode.ofSuccess(); + } + + @Override + public CompletableResultCode shutdown() { + return CompletableResultCode.ofSuccess(); + } +} diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 44ef27b449e..e017a6006fc 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -18,7 +18,7 @@ import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayList; -import java.util.Collection; +import java.util.Collections; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -140,7 +140,7 @@ private static final class Worker implements Runnable { private final AtomicLong addedSpansCounter = new AtomicLong(0); private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; - private final Collection batch; + private final ArrayList batch; private final ReentrantLock lock; @GuardedBy("lock") @@ -215,8 +215,8 @@ public void run() { flush(); } while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { - queueSize.decrementAndGet(); batch.add(queue.poll().toSpanData()); + queueSize.decrementAndGet(); } if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { exportCurrentBatch(); @@ -301,7 +301,8 @@ private void exportCurrentBatch() { } try { - final CompletableResultCode result = spanExporter.export(batch); + final CompletableResultCode result = + spanExporter.export(Collections.unmodifiableList(batch)); result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); if (result.isSuccess()) { exportedSpans.add(batch.size());