From 8c2471bac5481cc5943ddef0bed487473796aeb6 Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Thu, 4 Mar 2021 12:35:30 -0800 Subject: [PATCH] Optimize batch span processor Description: Batch span processor currently is aggressive in the sense that any new spans are sent to the exporter, this involves lots of overhead from signalling under heavy load and overhead from constant polling by exporter thread under less load. This approach uses a multi producer and single consumer bounded concurrent queue. Worker threads add the spans to the queue and signal the exporter when the queue size reaches maxExportBatchSize, exporter thread copies the queue into an array and initiates IO. ![image](https://user-images.githubusercontent.com/62265954/110030822-03551b00-7ceb-11eb-89e8-4c5107fe434e.png) ![image](https://user-images.githubusercontent.com/62265954/110030844-094afc00-7ceb-11eb-8973-886f9982e8fe.png) Context and more benchmarking results are in https://github.com/open-telemetry/opentelemetry-java/issues/2968. --- dependencyManagement/build.gradle.kts | 1 + sdk-extensions/autoconfigure/build.gradle.kts | 1 + .../TracerProviderConfigurationTest.java | 9 ++- sdk/trace/build.gradle.kts | 1 + .../sdk/trace/export/BatchSpanProcessor.java | 69 +++++++++++-------- 5 files changed, 49 insertions(+), 32 deletions(-) diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index b328435b1c2..6d638c69b69 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -92,6 +92,7 @@ val DEPENDENCIES = listOf( "org.awaitility:awaitility:4.0.3", "org.codehaus.mojo:animal-sniffer-annotations:1.20", "org.curioswitch.curiostack:protobuf-jackson:1.2.0", + "org.jctools:jctools-core:3.2.0", "org.junit-pioneer:junit-pioneer:1.3.8", "org.skyscreamer:jsonassert:1.5.0", "org.slf4j:slf4j-simple:1.7.30" diff --git a/sdk-extensions/autoconfigure/build.gradle.kts b/sdk-extensions/autoconfigure/build.gradle.kts index d660ed39a87..e32d69016ec 100644 --- a/sdk-extensions/autoconfigure/build.gradle.kts +++ b/sdk-extensions/autoconfigure/build.gradle.kts @@ -39,6 +39,7 @@ dependencies { testImplementation(project(":sdk:testing")) testImplementation("com.linecorp.armeria:armeria-junit5") testImplementation("com.linecorp.armeria:armeria-grpc") + testImplementation("org.jctools:jctools-core") testRuntimeOnly("io.grpc:grpc-netty-shaded") testRuntimeOnly("org.slf4j:slf4j-simple") diff --git a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java index 07cd6457165..32ba58c48a5 100644 --- a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java @@ -19,8 +19,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; +import org.jctools.queues.MpscArrayQueue; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -86,8 +86,8 @@ void configureSpanProcessor_empty() { assertThat(worker) .extracting("queue") .isInstanceOfSatisfying( - ArrayBlockingQueue.class, - queue -> assertThat(queue.remainingCapacity()).isEqualTo(2048)); + MpscArrayQueue.class, + queue -> assertThat(queue.capacity()).isEqualTo(2048)); assertThat(worker).extracting("spanExporter").isEqualTo(exporter); }); } finally { @@ -122,8 +122,7 @@ void configureSpanProcessor_configured() { assertThat(worker) .extracting("queue") .isInstanceOfSatisfying( - ArrayBlockingQueue.class, - queue -> assertThat(queue.remainingCapacity()).isEqualTo(2)); + MpscArrayQueue.class, queue -> assertThat(queue.capacity()).isEqualTo(2)); assertThat(worker).extracting("spanExporter").isEqualTo(exporter); }); } finally { diff --git a/sdk/trace/build.gradle.kts b/sdk/trace/build.gradle.kts index e0658542c22..d3d4dc79aa9 100644 --- a/sdk/trace/build.gradle.kts +++ b/sdk/trace/build.gradle.kts @@ -45,6 +45,7 @@ dependencies { jmh("io.grpc:grpc-api") jmh("io.grpc:grpc-netty-shaded") jmh("org.testcontainers:testcontainers") // testContainer for OTLP collector + implementation("org.jctools:jctools-core") } sourceSets { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 403e3ec4dfe..903ab0984b3 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -18,14 +18,15 @@ import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayList; -import java.util.Collections; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; +import org.jctools.queues.MpscArrayQueue; /** * Implementation of the {@link SpanProcessor} that batches spans exported by the SDK then pushes @@ -72,7 +73,7 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new ArrayBlockingQueue<>(maxQueueSize)); + new MpscArrayQueue(maxQueueSize)); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -113,7 +114,7 @@ public CompletableResultCode forceFlush() { // Visible for testing ArrayList getBatch() { - return worker.batch; + return new ArrayList<>(worker.batch); } // Worker is a thread that batches multiple spans and calls the registered SpanExporter to export @@ -128,26 +129,27 @@ private static final class Worker implements Runnable { private final long scheduleDelayNanos; private final int maxExportBatchSize; private final long exporterTimeoutNanos; - private long nextExportTime; - - private final BlockingQueue queue; - + private final MpscArrayQueue queue; private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; - private final ArrayList batch; + private final Collection batch; + private final ReentrantLock lock; + private final Condition needExport; private Worker( SpanExporter spanExporter, long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - BlockingQueue queue) { + MpscArrayQueue queue) { this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; + this.lock = new ReentrantLock(); + this.needExport = lock.newCondition(); Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace"); meter .longValueObserverBuilder("queueSize") @@ -181,27 +183,38 @@ private void addSpan(ReadableSpan span) { if (!queue.offer(span)) { droppedSpans.add(1); } + if (queue.size() >= maxExportBatchSize) { + lock.lock(); + try { + needExport.signal(); + } finally { + lock.unlock(); + } + } } @Override public void run() { updateNextExportTime(); - while (continueWork) { if (flushRequested.get() != null) { flush(); } - + lock.lock(); try { - ReadableSpan lastElement = queue.poll(100, TimeUnit.MILLISECONDS); - if (lastElement != null) { - batch.add(lastElement.toSpanData()); + long pollWaitTime = nextExportTime - System.nanoTime(); + if (pollWaitTime > 0) { + needExport.awaitNanos(pollWaitTime); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; + } finally { + lock.unlock(); + } + while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { + batch.add(queue.poll().toSpanData()); } - if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { exportCurrentBatch(); updateNextExportTime(); @@ -210,12 +223,8 @@ public void run() { } private void flush() { - int spansToFlush = queue.size(); - while (spansToFlush > 0) { - ReadableSpan span = queue.poll(); - assert span != null; - batch.add(span.toSpanData()); - spansToFlush--; + while (!queue.isEmpty()) { + batch.add(queue.poll().toSpanData()); if (batch.size() >= maxExportBatchSize) { exportCurrentBatch(); } @@ -252,8 +261,15 @@ private CompletableResultCode shutdown() { private CompletableResultCode forceFlush() { CompletableResultCode flushResult = new CompletableResultCode(); - // we set the atomic here to trigger the worker loop to do a flush on its next iteration. - flushRequested.compareAndSet(null, flushResult); + // we set the atomic here to trigger the worker loop to do a flush of the entire queue. + if (flushRequested.compareAndSet(null, flushResult)) { + lock.lock(); + try { + needExport.signal(); + } finally { + lock.unlock(); + } + } CompletableResultCode possibleResult = flushRequested.get(); // there's a race here where the flush happening in the worker loop could complete before we // get what's in the atomic. In that case, just return success, since we know it succeeded in @@ -267,8 +283,7 @@ private void exportCurrentBatch() { } try { - final CompletableResultCode result = - spanExporter.export(Collections.unmodifiableList(batch)); + final CompletableResultCode result = spanExporter.export(batch); result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); if (result.isSuccess()) { exportedSpans.add(batch.size());