From 046066d5442690ef49f35a85400a321a55a7ada7 Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Thu, 4 Mar 2021 12:35:30 -0800 Subject: [PATCH] Optimize batch span processor Description: Context and benchmarking results are in https://github.com/open-telemetry/opentelemetry-java/issues/2968. Batch span processor currently is aggressive in the sense that any new spans are sent to the exporter, this involves lots of overhead from signalling under heavy load and overhead from constant polling by exporter thread under less load. This approach uses a multi producer and single consumer bounded concurrent queue. Worker threads add the spans to the queue and signal the exporter when the queue size reaches maxExportBatchSize, exporter thread copies the queue into an array and initiates IO. --- sdk/trace/build.gradle.kts | 1 + .../sdk/trace/export/BatchSpanProcessor.java | 69 +++++++++++-------- 2 files changed, 43 insertions(+), 27 deletions(-) diff --git a/sdk/trace/build.gradle.kts b/sdk/trace/build.gradle.kts index e0658542c22..caf834573fb 100644 --- a/sdk/trace/build.gradle.kts +++ b/sdk/trace/build.gradle.kts @@ -45,6 +45,7 @@ dependencies { jmh("io.grpc:grpc-api") jmh("io.grpc:grpc-netty-shaded") jmh("org.testcontainers:testcontainers") // testContainer for OTLP collector + implementation("org.jctools:jctools-core:3.2.0") } sourceSets { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 403e3ec4dfe..7e900dc0504 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -17,13 +17,14 @@ import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; +import org.jctools.queues.MpscArrayQueue; import java.util.ArrayList; -import java.util.Collections; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -72,7 +73,7 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new ArrayBlockingQueue<>(maxQueueSize)); + new MpscArrayQueue(maxQueueSize)); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -113,7 +114,7 @@ public CompletableResultCode forceFlush() { // Visible for testing ArrayList getBatch() { - return worker.batch; + return new ArrayList<>(worker.batch); } // Worker is a thread that batches multiple spans and calls the registered SpanExporter to export @@ -128,26 +129,27 @@ private static final class Worker implements Runnable { private final long scheduleDelayNanos; private final int maxExportBatchSize; private final long exporterTimeoutNanos; - private long nextExportTime; - - private final BlockingQueue queue; - + private final MpscArrayQueue queue; private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; - private final ArrayList batch; + private final Collection batch; + private final ReentrantLock lock; + private final Condition needExport; private Worker( SpanExporter spanExporter, long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - BlockingQueue queue) { + MpscArrayQueue queue) { this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; + this.lock = new ReentrantLock(); + this.needExport = lock.newCondition(); Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace"); meter .longValueObserverBuilder("queueSize") @@ -181,27 +183,38 @@ private void addSpan(ReadableSpan span) { if (!queue.offer(span)) { droppedSpans.add(1); } + if (queue.size() >= maxExportBatchSize) { + lock.lock(); + try { + needExport.signal(); + } finally { + lock.unlock(); + } + } } @Override public void run() { updateNextExportTime(); - while (continueWork) { if (flushRequested.get() != null) { flush(); } - + lock.lock(); try { - ReadableSpan lastElement = queue.poll(100, TimeUnit.MILLISECONDS); - if (lastElement != null) { - batch.add(lastElement.toSpanData()); + long pollWaitTime = nextExportTime - System.nanoTime(); + if (pollWaitTime > 0) { + needExport.awaitNanos(pollWaitTime); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; + } finally { + lock.unlock(); + } + while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { + batch.add(queue.poll().toSpanData()); } - if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { exportCurrentBatch(); updateNextExportTime(); @@ -210,12 +223,8 @@ public void run() { } private void flush() { - int spansToFlush = queue.size(); - while (spansToFlush > 0) { - ReadableSpan span = queue.poll(); - assert span != null; - batch.add(span.toSpanData()); - spansToFlush--; + while (!queue.isEmpty()) { + batch.add(queue.poll().toSpanData()); if (batch.size() >= maxExportBatchSize) { exportCurrentBatch(); } @@ -252,8 +261,15 @@ private CompletableResultCode shutdown() { private CompletableResultCode forceFlush() { CompletableResultCode flushResult = new CompletableResultCode(); - // we set the atomic here to trigger the worker loop to do a flush on its next iteration. - flushRequested.compareAndSet(null, flushResult); + // we set the atomic here to trigger the worker loop to do a flush of the entire queue. + if (flushRequested.compareAndSet(null, flushResult)) { + lock.lock(); + try { + needExport.signal(); + } finally { + lock.unlock(); + } + } CompletableResultCode possibleResult = flushRequested.get(); // there's a race here where the flush happening in the worker loop could complete before we // get what's in the atomic. In that case, just return success, since we know it succeeded in @@ -267,8 +283,7 @@ private void exportCurrentBatch() { } try { - final CompletableResultCode result = - spanExporter.export(Collections.unmodifiableList(batch)); + final CompletableResultCode result = spanExporter.export(batch); result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); if (result.isSuccess()) { exportedSpans.add(batch.size());