Skip to content

Commit

Permalink
More predictable signaling
Browse files Browse the repository at this point in the history
  • Loading branch information
sbandadd committed Mar 19, 2021
1 parent 5ac7737 commit 37d2863
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -131,7 +132,7 @@ private static final class Worker implements Runnable {

private long nextExportTime;
private final BlockingQueue<ReadableSpan> queue;
private final AtomicBoolean needSignal = new AtomicBoolean(false);
private final AtomicInteger spansNeeded = new AtomicInteger(Integer.MAX_VALUE);
private final BlockingQueue<Boolean> signal;
private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
private volatile boolean continueWork = true;
Expand Down Expand Up @@ -182,7 +183,7 @@ private void addSpan(ReadableSpan span) {
if (!queue.offer(span)) {
droppedSpans.add(1);
} else {
if (queue.size() >= maxExportBatchSize && needSignal.get()) {
if (queue.size() >= spansNeeded.get()) {
signal.offer(true);
}
}
Expand All @@ -207,9 +208,9 @@ public void run() {
try {
long pollWaitTime = nextExportTime - System.nanoTime();
if (pollWaitTime > 0) {
needSignal.set(true);
spansNeeded.set(maxExportBatchSize - batch.size());
signal.poll(pollWaitTime, TimeUnit.NANOSECONDS);
needSignal.set(false);
spansNeeded.set(Integer.MAX_VALUE);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down

0 comments on commit 37d2863

Please sign in to comment.