From 556210e2d01a8a94f1eef54f0d3693d5e3a5edf1 Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Thu, 18 Mar 2021 15:46:58 -0700 Subject: [PATCH] More predictable signaling --- .../sdk/trace/export/BatchSpanProcessor.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index a649998b41d..c07b65bb18e 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -23,6 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -131,7 +132,13 @@ private static final class Worker implements Runnable { private long nextExportTime; private final BlockingQueue queue; - private final AtomicBoolean needSignal = new AtomicBoolean(false); + // When waiting on the spans queue, exporter thread sets this atomic to the number of more + // spans it need before doing an export. Writer threads would then wait for the queue to reach + // spansNeeded size before notifying the exporter thread about new entries. + // Integer.MAX_VALUE is used to imply that exporter thread is not expecting any signal. Since + // exporter thread doesn't expect any signal initially, this value is initialized to + // Integer.MAX_VALUE. + private final AtomicInteger spansNeeded = new AtomicInteger(Integer.MAX_VALUE); private final BlockingQueue signal; private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; @@ -182,7 +189,7 @@ private void addSpan(ReadableSpan span) { if (!queue.offer(span)) { droppedSpans.add(1); } else { - if (queue.size() >= maxExportBatchSize && needSignal.get()) { + if (queue.size() >= spansNeeded.get()) { signal.offer(true); } } @@ -207,9 +214,9 @@ public void run() { try { long pollWaitTime = nextExportTime - System.nanoTime(); if (pollWaitTime > 0) { - needSignal.set(true); + spansNeeded.set(maxExportBatchSize - batch.size()); signal.poll(pollWaitTime, TimeUnit.NANOSECONDS); - needSignal.set(false); + spansNeeded.set(Integer.MAX_VALUE); } } catch (InterruptedException e) { Thread.currentThread().interrupt();