From 9997dee68ed29d09e4891befbbd12bd05377f434 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Wed, 3 Mar 2021 13:42:33 +0900 Subject: [PATCH 1/7] Don't use singleton in resource test (#2970) --- .../sdk/extension/resources/ProcessRuntimeResourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk-extensions/resources/src/test/java/io/opentelemetry/sdk/extension/resources/ProcessRuntimeResourceTest.java b/sdk-extensions/resources/src/test/java/io/opentelemetry/sdk/extension/resources/ProcessRuntimeResourceTest.java index 74806773eae..7f0e656a253 100644 --- a/sdk-extensions/resources/src/test/java/io/opentelemetry/sdk/extension/resources/ProcessRuntimeResourceTest.java +++ b/sdk-extensions/resources/src/test/java/io/opentelemetry/sdk/extension/resources/ProcessRuntimeResourceTest.java @@ -19,7 +19,7 @@ class ProcessRuntimeResourceTest { @Test void shouldCreateRuntimeAttributes() { // when - Attributes attributes = ProcessRuntimeResource.get().getAttributes(); + Attributes attributes = ProcessRuntimeResource.buildResource().getAttributes(); // then assertThat(attributes.get(ResourceAttributes.PROCESS_RUNTIME_NAME)).isNotBlank(); From 62cfc060d398254646f0fe22fa0f52e517e73818 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Thu, 4 Mar 2021 00:52:07 +0900 Subject: [PATCH 2/7] Keep reference to leaked scope to make sure GC detection doesn't kick in first. (#2972) --- .../io/opentelemetry/context/StrictContextStorageTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/context/src/strictContextEnabledTest/java/io/opentelemetry/context/StrictContextStorageTest.java b/context/src/strictContextEnabledTest/java/io/opentelemetry/context/StrictContextStorageTest.java index b885edf8af4..0ad903a74fe 100644 --- a/context/src/strictContextEnabledTest/java/io/opentelemetry/context/StrictContextStorageTest.java +++ b/context/src/strictContextEnabledTest/java/io/opentelemetry/context/StrictContextStorageTest.java @@ -141,7 +141,8 @@ void scope_close_onWrongThread(Supplier method, String methodName) throws @SuppressWarnings("ReturnValueIgnored") void decorator_close_withLeakedScope(Supplier method, String methodName) throws Exception { - Thread thread = new Thread(method::get); + AtomicReference scope = new AtomicReference<>(); + Thread thread = new Thread(() -> scope.set(method.get())); thread.setName("t1"); thread.start(); thread.join(); From 47c96b5dd9af237f1e96f82293c32b986163641d Mon Sep 17 00:00:00 2001 From: jason plumb <75337021+breedx-splk@users.noreply.github.com> Date: Wed, 3 Mar 2021 10:55:41 -0800 Subject: [PATCH 3/7] Fix trace benchmark (#2974) * fix address passed to span processor, which was causing benchmark to not run. * started some docs on how to run jmh --- docs/jmh.md | 21 +++++++++++++++++++ .../sdk/trace/SpanPipelineBenchmark.java | 17 +++++++++++++-- 2 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 docs/jmh.md diff --git a/docs/jmh.md b/docs/jmh.md new file mode 100644 index 00000000000..fed02923d3a --- /dev/null +++ b/docs/jmh.md @@ -0,0 +1,21 @@ + +# how to jmh + +[jmh] (Java Benchmark Harness) is a tool for running benchmarks and reporting results. + +opentelemetry-java has a lot of micro benchmarks. They live inside +`jmh` directories in the appropriate module. + +The benchmarks are run with a gradle plugin. + +To run an entire suite for a module, you can run the jmh gradle task. +As an example, here's how you can run the benchmarks for all of +the sdk trace module. + +``` +`./gradlew :sdk:trace:jmh` +``` + +If you just want to run a single benchmark and not the entire suite: + +`./gradlew -PjmhIncludeSingleClass=BatchSpanProcessorBenchmark :sdk:trace:jmh` \ No newline at end of file diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java index 4f471a3fce8..551c59c2550 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/SpanPipelineBenchmark.java @@ -11,6 +11,8 @@ import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.samplers.Sampler; +import java.net.MalformedURLException; +import java.net.URL; import java.time.Duration; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; @@ -66,18 +68,29 @@ public void setup() { collector.start(); - String address = collector.getHost() + ":" + collector.getMappedPort(EXPOSED_PORT); + SpanProcessor spanProcessor = makeSpanProcessor(collector); SdkTracerProvider tracerProvider = SdkTracerProvider.builder() .setSampler(Sampler.alwaysOn()) - .addSpanProcessor(getSpanProcessor(address)) + .addSpanProcessor(spanProcessor) .build(); Tracer tracerSdk = tracerProvider.get("PipelineBenchmarkTracer"); sdkSpanBuilder = (SdkSpanBuilder) tracerSdk.spanBuilder("PipelineBenchmarkSpan"); } + private SpanProcessor makeSpanProcessor(GenericContainer collector) { + try { + String host = collector.getHost(); + Integer port = collector.getMappedPort(EXPOSED_PORT); + String address = new URL("http", host, port, "").toString(); + return getSpanProcessor(address); + } catch (MalformedURLException e) { + throw new IllegalStateException("can't make a url", e); + } + } + @Benchmark @BenchmarkMode(Mode.Throughput) @Warmup(iterations = 5, time = 1) From 3b4f4cf26c53737913c6b6e67ab02ac80d219c01 Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Wed, 3 Mar 2021 21:50:55 -0800 Subject: [PATCH 4/7] Dropped batch microbenchmark --- .../sdk/trace/SpanBenchmark.java | 8 ++-- ...tchSpanProcessorDroppedSpansBenchmark.java | 43 +++++++++++++++++-- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/SpanBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/SpanBenchmark.java index c8063e1bb07..bcdbf8c062f 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/SpanBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/SpanBenchmark.java @@ -51,7 +51,7 @@ public final void setup() { @Threads(value = 1) @Fork(1) @Warmup(iterations = 5, time = 1) - @Measurement(iterations = 10, time = 1) + @Measurement(iterations = 50, time = 1) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void simpleSpanStartAddEventEnd_01Thread() { doSpanWork(); @@ -61,7 +61,7 @@ public void simpleSpanStartAddEventEnd_01Thread() { @Threads(value = 5) @Fork(1) @Warmup(iterations = 5, time = 1) - @Measurement(iterations = 10, time = 1) + @Measurement(iterations = 50, time = 1) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void simpleSpanStartAddEventEnd_05Threads() { doSpanWork(); @@ -71,7 +71,7 @@ public void simpleSpanStartAddEventEnd_05Threads() { @Threads(value = 2) @Fork(1) @Warmup(iterations = 5, time = 1) - @Measurement(iterations = 10, time = 1) + @Measurement(iterations = 50, time = 1) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void simpleSpanStartAddEventEnd_02Threads() { doSpanWork(); @@ -81,7 +81,7 @@ public void simpleSpanStartAddEventEnd_02Threads() { @Threads(value = 10) @Fork(1) @Warmup(iterations = 5, time = 1) - @Measurement(iterations = 10, time = 1) + @Measurement(iterations = 50, time = 1) @OutputTimeUnit(TimeUnit.MILLISECONDS) public void simpleSpanStartAddEventEnd_10Threads() { doSpanWork(); diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java index cfef1aecd3a..ecfd56c898b 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorDroppedSpansBenchmark.java @@ -11,7 +11,6 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.export.MetricProducer; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.data.SpanData; @@ -52,7 +51,6 @@ public CompletableResultCode shutdown() { @State(Scope.Benchmark) public static class BenchmarkState { - private final MetricProducer metricProducer = ((SdkMeterProvider) GlobalMetricsProvider.get()); private BatchSpanProcessor processor; private Tracer tracer; private Collection allMetrics; @@ -61,6 +59,7 @@ public static class BenchmarkState { public final void setup() { SpanExporter exporter = new DelayingSpanExporter(); processor = BatchSpanProcessor.builder(exporter).build(); + GlobalMetricsProvider.set(SdkMeterProvider.builder().build()); tracer = SdkTracerProvider.builder().build().get("benchmarkTracer"); } @@ -72,7 +71,7 @@ public final void tearDown() { @TearDown(Level.Iteration) public final void recordMetrics() { - allMetrics = metricProducer.collectAllMetrics(); + allMetrics = ((SdkMeterProvider) GlobalMetricsProvider.get()).collectAllMetrics(); } } @@ -128,6 +127,30 @@ private long getMetric(boolean dropped) { } } + @Benchmark + @Fork(1) + @Threads(1) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 5, time = 20) + @BenchmarkMode(Mode.Throughput) + public void export_01Threads( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + + @Benchmark + @Fork(1) + @Threads(2) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 5, time = 20) + @BenchmarkMode(Mode.Throughput) + public void export_02Threads( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + /** Export spans through {@link BatchSpanProcessor}. */ @Benchmark @Fork(1) @@ -135,7 +158,19 @@ private long getMetric(boolean dropped) { @Warmup(iterations = 5, time = 1) @Measurement(iterations = 5, time = 20) @BenchmarkMode(Mode.Throughput) - public void export( + public void export_05Threads( + BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { + benchmarkState.processor.onEnd( + (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); + } + + @Benchmark + @Fork(1) + @Threads(10) + @Warmup(iterations = 5, time = 1) + @Measurement(iterations = 5, time = 20) + @BenchmarkMode(Mode.Throughput) + public void export_10Threads( BenchmarkState benchmarkState, @SuppressWarnings("unused") ThreadState threadState) { benchmarkState.processor.onEnd( (ReadableSpan) benchmarkState.tracer.spanBuilder("span").startSpan()); From d37612885c7c1c8dc50b0e4e3a7e58cbf210fd6e Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Tue, 2 Mar 2021 22:44:26 -0800 Subject: [PATCH 5/7] Optimize bsp --- .../TracerProviderConfigurationTest.java | 11 --- .../sdk/trace/export/BatchSpanProcessor.java | 88 ++++++++++++------- .../trace/export/BatchSpanProcessorTest.java | 4 +- 3 files changed, 58 insertions(+), 45 deletions(-) diff --git a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java index 07cd6457165..864e3d6f250 100644 --- a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java @@ -19,7 +19,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -83,11 +82,6 @@ void configureSpanProcessor_empty() { .extracting("exporterTimeoutNanos") .isEqualTo(TimeUnit.MILLISECONDS.toNanos(30000)); assertThat(worker).extracting("maxExportBatchSize").isEqualTo(512); - assertThat(worker) - .extracting("queue") - .isInstanceOfSatisfying( - ArrayBlockingQueue.class, - queue -> assertThat(queue.remainingCapacity()).isEqualTo(2048)); assertThat(worker).extracting("spanExporter").isEqualTo(exporter); }); } finally { @@ -119,11 +113,6 @@ void configureSpanProcessor_configured() { .extracting("exporterTimeoutNanos") .isEqualTo(TimeUnit.MILLISECONDS.toNanos(4)); assertThat(worker).extracting("maxExportBatchSize").isEqualTo(3); - assertThat(worker) - .extracting("queue") - .isInstanceOfSatisfying( - ArrayBlockingQueue.class, - queue -> assertThat(queue.remainingCapacity()).isEqualTo(2)); assertThat(worker).extracting("spanExporter").isEqualTo(exporter); }); } finally { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index d114a9b72c4..0ebfaada914 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -17,13 +17,14 @@ import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Collections; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -72,7 +73,8 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new ArrayBlockingQueue<>(maxQueueSize)); + new ArrayDeque(maxQueueSize), + maxQueueSize); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -123,26 +125,31 @@ private static final class Worker implements Runnable { private final long scheduleDelayNanos; private final int maxExportBatchSize; private final long exporterTimeoutNanos; - - private long nextExportTime; - - private final BlockingQueue queue; - - private final AtomicReference flushRequested = new AtomicReference<>(); + private final ArrayDeque queue; private volatile boolean continueWork = true; - private final ArrayList batch; + private final Collection batch; + private final int maxQueueSize; + private final ReentrantLock lock; + private final Condition needExport; + private final AtomicReference flushRequested = new AtomicReference<>(); + private int spanBatchCounter = 0; + private long nextExportTime; private Worker( SpanExporter spanExporter, long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - BlockingQueue queue) { + ArrayDeque queue, + int maxQueueSize) { this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; + this.maxQueueSize = maxQueueSize; + this.lock = new ReentrantLock(); + this.needExport = lock.newCondition(); Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace"); meter .longValueObserverBuilder("queueSize") @@ -173,30 +180,41 @@ private Worker( } private void addSpan(ReadableSpan span) { - if (!queue.offer(span)) { - droppedSpans.add(1); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (queue.size() == maxQueueSize) { + droppedSpans.add(1); + } else { + queue.offer(span.toSpanData()); + if (++spanBatchCounter == maxExportBatchSize) { + spanBatchCounter = 0; + needExport.signal(); + } + } + } finally { + lock.unlock(); } } @Override public void run() { updateNextExportTime(); - while (continueWork) { if (flushRequested.get() != null) { flush(); } - + lock.lock(); try { - ReadableSpan lastElement = queue.poll(100, TimeUnit.MILLISECONDS); - if (lastElement != null) { - batch.add(lastElement.toSpanData()); + needExport.awaitNanos(scheduleDelayNanos); + while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { + batch.add(queue.poll()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); - return; + } finally { + lock.unlock(); } - if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { exportCurrentBatch(); updateNextExportTime(); @@ -205,15 +223,16 @@ public void run() { } private void flush() { - int spansToFlush = queue.size(); - while (spansToFlush > 0) { - ReadableSpan span = queue.poll(); - assert span != null; - batch.add(span.toSpanData()); - spansToFlush--; - if (batch.size() >= maxExportBatchSize) { - exportCurrentBatch(); + lock.lock(); + try { + while (!queue.isEmpty()) { + batch.add(queue.poll()); + if (batch.size() >= maxExportBatchSize) { + exportCurrentBatch(); + } } + } finally { + lock.unlock(); } exportCurrentBatch(); flushRequested.get().succeed(); @@ -247,8 +266,14 @@ private CompletableResultCode shutdown() { private CompletableResultCode forceFlush() { CompletableResultCode flushResult = new CompletableResultCode(); - // we set the atomic here to trigger the worker loop to do a flush on its next iteration. + // we set the atomic here to trigger the worker loop to do a flush of the entire queue. flushRequested.compareAndSet(null, flushResult); + lock.lock(); + try { + needExport.signal(); + } finally { + lock.unlock(); + } CompletableResultCode possibleResult = flushRequested.get(); // there's a race here where the flush happening in the worker loop could complete before we // get what's in the atomic. In that case, just return success, since we know it succeeded in @@ -262,8 +287,7 @@ private void exportCurrentBatch() { } try { - final CompletableResultCode result = - spanExporter.export(Collections.unmodifiableList(batch)); + final CompletableResultCode result = spanExporter.export(batch); result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); if (result.isSuccess()) { exportedSpans.add(batch.size()); diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java index 1c27a55a730..10ec0700d47 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java @@ -457,7 +457,7 @@ void shutdownPropagatesSuccess() { when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); BatchSpanProcessor processor = BatchSpanProcessor.builder(mockSpanExporter).build(); CompletableResultCode result = processor.shutdown(); - result.join(1, TimeUnit.SECONDS); + result.join(10, TimeUnit.SECONDS); assertThat(result.isSuccess()).isTrue(); } @@ -467,7 +467,7 @@ void shutdownPropagatesFailure() { when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofFailure()); BatchSpanProcessor processor = BatchSpanProcessor.builder(mockSpanExporter).build(); CompletableResultCode result = processor.shutdown(); - result.join(1, TimeUnit.SECONDS); + result.join(10, TimeUnit.SECONDS); assertThat(result.isSuccess()).isFalse(); } From 042cafbc36e2db19a11c365120ad085d2a9ac505 Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Thu, 4 Mar 2021 02:42:23 -0800 Subject: [PATCH 6/7] Try concurrent queue --- .../sdk/trace/export/BatchSpanProcessor.java | 73 ++++++++++--------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 0ebfaada914..b5042564f8d 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -17,11 +17,12 @@ import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -73,7 +74,7 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new ArrayDeque(maxQueueSize), + new ConcurrentLinkedQueue(), maxQueueSize); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); @@ -125,14 +126,14 @@ private static final class Worker implements Runnable { private final long scheduleDelayNanos; private final int maxExportBatchSize; private final long exporterTimeoutNanos; - private final ArrayDeque queue; + private final ConcurrentLinkedQueue queue; + private final AtomicInteger queueSize; private volatile boolean continueWork = true; private final Collection batch; private final int maxQueueSize; private final ReentrantLock lock; private final Condition needExport; private final AtomicReference flushRequested = new AtomicReference<>(); - private int spanBatchCounter = 0; private long nextExportTime; private Worker( @@ -140,13 +141,14 @@ private Worker( long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - ArrayDeque queue, + ConcurrentLinkedQueue queue, int maxQueueSize) { this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; + this.queueSize = new AtomicInteger(0); this.maxQueueSize = maxQueueSize; this.lock = new ReentrantLock(); this.needExport = lock.newCondition(); @@ -158,7 +160,7 @@ private Worker( .setUpdater( result -> result.observe( - queue.size(), + queueSize.get(), Labels.of(SPAN_PROCESSOR_TYPE_LABEL, SPAN_PROCESSOR_TYPE_VALUE))) .build(); LongCounter processedSpansCounter = @@ -180,20 +182,19 @@ private Worker( } private void addSpan(ReadableSpan span) { - final ReentrantLock lock = this.lock; - lock.lock(); - try { - if (queue.size() == maxQueueSize) { - droppedSpans.add(1); - } else { - queue.offer(span.toSpanData()); - if (++spanBatchCounter == maxExportBatchSize) { - spanBatchCounter = 0; + if (queueSize.get() == maxQueueSize) { + droppedSpans.add(1); + } else { + queue.offer(span.toSpanData()); + int newQueueSize = queueSize.incrementAndGet(); + if (newQueueSize >= maxExportBatchSize) { + lock.lock(); + try { needExport.signal(); + } finally { + lock.unlock(); } } - } finally { - lock.unlock(); } } @@ -206,15 +207,20 @@ public void run() { } lock.lock(); try { - needExport.awaitNanos(scheduleDelayNanos); - while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { - batch.add(queue.poll()); + long pollWaitTime = nextExportTime - System.nanoTime(); + if (pollWaitTime > 0) { + needExport.awaitNanos(pollWaitTime); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); + return; } finally { lock.unlock(); } + while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { + batch.add(queue.poll()); + queueSize.decrementAndGet(); + } if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { exportCurrentBatch(); updateNextExportTime(); @@ -223,16 +229,12 @@ public void run() { } private void flush() { - lock.lock(); - try { - while (!queue.isEmpty()) { - batch.add(queue.poll()); - if (batch.size() >= maxExportBatchSize) { - exportCurrentBatch(); - } + while (!queue.isEmpty()) { + batch.add(queue.poll()); + queueSize.decrementAndGet(); + if (batch.size() >= maxExportBatchSize) { + exportCurrentBatch(); } - } finally { - lock.unlock(); } exportCurrentBatch(); flushRequested.get().succeed(); @@ -267,12 +269,13 @@ private CompletableResultCode shutdown() { private CompletableResultCode forceFlush() { CompletableResultCode flushResult = new CompletableResultCode(); // we set the atomic here to trigger the worker loop to do a flush of the entire queue. - flushRequested.compareAndSet(null, flushResult); - lock.lock(); - try { - needExport.signal(); - } finally { - lock.unlock(); + if (flushRequested.compareAndSet(null, flushResult)) { + lock.lock(); + try { + needExport.signal(); + } finally { + lock.unlock(); + } } CompletableResultCode possibleResult = flushRequested.get(); // there's a race here where the flush happening in the worker loop could complete before we From 810cb4b0542ce536908fa7e3e25b88f5b5b9c281 Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Thu, 4 Mar 2021 11:03:43 -0800 Subject: [PATCH 7/7] Try array based concurrent queue --- sdk/trace/build.gradle.kts | 1 + .../sdk/trace/export/BatchSpanProcessor.java | 46 +++++++------------ 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/sdk/trace/build.gradle.kts b/sdk/trace/build.gradle.kts index e0658542c22..caf834573fb 100644 --- a/sdk/trace/build.gradle.kts +++ b/sdk/trace/build.gradle.kts @@ -45,6 +45,7 @@ dependencies { jmh("io.grpc:grpc-api") jmh("io.grpc:grpc-netty-shaded") jmh("org.testcontainers:testcontainers") // testContainer for OTLP collector + implementation("org.jctools:jctools-core:3.2.0") } sourceSets { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index b5042564f8d..4125c6e3699 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -17,12 +17,11 @@ import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; +import org.jctools.queues.MpscArrayQueue; import java.util.ArrayList; import java.util.Collection; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -74,8 +73,7 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new ConcurrentLinkedQueue(), - maxQueueSize); + new MpscArrayQueue(maxQueueSize)); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -126,30 +124,25 @@ private static final class Worker implements Runnable { private final long scheduleDelayNanos; private final int maxExportBatchSize; private final long exporterTimeoutNanos; - private final ConcurrentLinkedQueue queue; - private final AtomicInteger queueSize; + private long nextExportTime; + private final MpscArrayQueue queue; + private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; private final Collection batch; - private final int maxQueueSize; private final ReentrantLock lock; private final Condition needExport; - private final AtomicReference flushRequested = new AtomicReference<>(); - private long nextExportTime; private Worker( SpanExporter spanExporter, long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - ConcurrentLinkedQueue queue, - int maxQueueSize) { + MpscArrayQueue queue) { this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; - this.queueSize = new AtomicInteger(0); - this.maxQueueSize = maxQueueSize; this.lock = new ReentrantLock(); this.needExport = lock.newCondition(); Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace"); @@ -160,7 +153,7 @@ private Worker( .setUpdater( result -> result.observe( - queueSize.get(), + queue.size(), Labels.of(SPAN_PROCESSOR_TYPE_LABEL, SPAN_PROCESSOR_TYPE_VALUE))) .build(); LongCounter processedSpansCounter = @@ -182,18 +175,15 @@ private Worker( } private void addSpan(ReadableSpan span) { - if (queueSize.get() == maxQueueSize) { + if (!queue.offer(span)) { droppedSpans.add(1); - } else { - queue.offer(span.toSpanData()); - int newQueueSize = queueSize.incrementAndGet(); - if (newQueueSize >= maxExportBatchSize) { - lock.lock(); - try { - needExport.signal(); - } finally { - lock.unlock(); - } + } + if (queue.size() >= maxExportBatchSize) { + lock.lock(); + try { + needExport.signal(); + } finally { + lock.unlock(); } } } @@ -218,8 +208,7 @@ public void run() { lock.unlock(); } while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { - batch.add(queue.poll()); - queueSize.decrementAndGet(); + batch.add(queue.poll().toSpanData()); } if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { exportCurrentBatch(); @@ -230,8 +219,7 @@ public void run() { private void flush() { while (!queue.isEmpty()) { - batch.add(queue.poll()); - queueSize.decrementAndGet(); + batch.add(queue.poll().toSpanData()); if (batch.size() >= maxExportBatchSize) { exportCurrentBatch(); }