diff --git a/sdk/trace/build.gradle.kts b/sdk/trace/build.gradle.kts index e0658542c22..caf834573fb 100644 --- a/sdk/trace/build.gradle.kts +++ b/sdk/trace/build.gradle.kts @@ -45,6 +45,7 @@ dependencies { jmh("io.grpc:grpc-api") jmh("io.grpc:grpc-netty-shaded") jmh("org.testcontainers:testcontainers") // testContainer for OTLP collector + implementation("org.jctools:jctools-core:3.2.0") } sourceSets { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 403e3ec4dfe..7e900dc0504 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -17,13 +17,14 @@ import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; +import org.jctools.queues.MpscArrayQueue; import java.util.ArrayList; -import java.util.Collections; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.Collection; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -72,7 +73,7 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new ArrayBlockingQueue<>(maxQueueSize)); + new MpscArrayQueue(maxQueueSize)); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -113,7 +114,7 @@ public CompletableResultCode forceFlush() { // Visible for testing ArrayList getBatch() { - return worker.batch; + return new ArrayList<>(worker.batch); } // Worker is a thread that batches multiple spans and calls the registered SpanExporter to export @@ -128,26 +129,27 @@ private static final class Worker implements Runnable { private final long scheduleDelayNanos; private final int maxExportBatchSize; private final long exporterTimeoutNanos; - private long nextExportTime; - - private final BlockingQueue queue; - + private final MpscArrayQueue queue; private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; - private final ArrayList batch; + private final Collection batch; + private final ReentrantLock lock; + private final Condition needExport; private Worker( SpanExporter spanExporter, long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - BlockingQueue queue) { + MpscArrayQueue queue) { this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; + this.lock = new ReentrantLock(); + this.needExport = lock.newCondition(); Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace"); meter .longValueObserverBuilder("queueSize") @@ -181,27 +183,38 @@ private void addSpan(ReadableSpan span) { if (!queue.offer(span)) { droppedSpans.add(1); } + if (queue.size() >= maxExportBatchSize) { + lock.lock(); + try { + needExport.signal(); + } finally { + lock.unlock(); + } + } } @Override public void run() { updateNextExportTime(); - while (continueWork) { if (flushRequested.get() != null) { flush(); } - + lock.lock(); try { - ReadableSpan lastElement = queue.poll(100, TimeUnit.MILLISECONDS); - if (lastElement != null) { - batch.add(lastElement.toSpanData()); + long pollWaitTime = nextExportTime - System.nanoTime(); + if (pollWaitTime > 0) { + needExport.awaitNanos(pollWaitTime); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; + } finally { + lock.unlock(); + } + while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { + batch.add(queue.poll().toSpanData()); } - if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { exportCurrentBatch(); updateNextExportTime(); @@ -210,12 +223,8 @@ public void run() { } private void flush() { - int spansToFlush = queue.size(); - while (spansToFlush > 0) { - ReadableSpan span = queue.poll(); - assert span != null; - batch.add(span.toSpanData()); - spansToFlush--; + while (!queue.isEmpty()) { + batch.add(queue.poll().toSpanData()); if (batch.size() >= maxExportBatchSize) { exportCurrentBatch(); } @@ -252,8 +261,15 @@ private CompletableResultCode shutdown() { private CompletableResultCode forceFlush() { CompletableResultCode flushResult = new CompletableResultCode(); - // we set the atomic here to trigger the worker loop to do a flush on its next iteration. - flushRequested.compareAndSet(null, flushResult); + // we set the atomic here to trigger the worker loop to do a flush of the entire queue. + if (flushRequested.compareAndSet(null, flushResult)) { + lock.lock(); + try { + needExport.signal(); + } finally { + lock.unlock(); + } + } CompletableResultCode possibleResult = flushRequested.get(); // there's a race here where the flush happening in the worker loop could complete before we // get what's in the atomic. In that case, just return success, since we know it succeeded in @@ -267,8 +283,7 @@ private void exportCurrentBatch() { } try { - final CompletableResultCode result = - spanExporter.export(Collections.unmodifiableList(batch)); + final CompletableResultCode result = spanExporter.export(batch); result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS); if (result.isSuccess()) { exportedSpans.add(batch.size());