From 37d28636d97caafa6d3a81f830f5512e14ec6aaf Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Thu, 18 Mar 2021 15:46:58 -0700 Subject: [PATCH] More predictable signaling --- .../sdk/trace/export/BatchSpanProcessor.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index a649998b41d..6facce78e88 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -23,6 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -131,7 +132,7 @@ private static final class Worker implements Runnable { private long nextExportTime; private final BlockingQueue queue; - private final AtomicBoolean needSignal = new AtomicBoolean(false); + private final AtomicInteger spansNeeded = new AtomicInteger(Integer.MAX_VALUE); private final BlockingQueue signal; private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; @@ -182,7 +183,7 @@ private void addSpan(ReadableSpan span) { if (!queue.offer(span)) { droppedSpans.add(1); } else { - if (queue.size() >= maxExportBatchSize && needSignal.get()) { + if (queue.size() >= spansNeeded.get()) { signal.offer(true); } } @@ -207,9 +208,9 @@ public void run() { try { long pollWaitTime = nextExportTime - System.nanoTime(); if (pollWaitTime > 0) { - needSignal.set(true); + spansNeeded.set(maxExportBatchSize - batch.size()); signal.poll(pollWaitTime, TimeUnit.NANOSECONDS); - needSignal.set(false); + spansNeeded.set(Integer.MAX_VALUE); } } catch (InterruptedException e) { Thread.currentThread().interrupt();