Skip to content

Commit

Permalink
More predictable signaling
Browse files Browse the repository at this point in the history
  • Loading branch information
sbandadd committed Mar 19, 2021
1 parent 5ac7737 commit 556210e
Showing 1 changed file with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -131,7 +132,13 @@ private static final class Worker implements Runnable {

private long nextExportTime;
private final BlockingQueue<ReadableSpan> queue;
private final AtomicBoolean needSignal = new AtomicBoolean(false);
// When waiting on the spans queue, exporter thread sets this atomic to the number of more
// spans it need before doing an export. Writer threads would then wait for the queue to reach
// spansNeeded size before notifying the exporter thread about new entries.
// Integer.MAX_VALUE is used to imply that exporter thread is not expecting any signal. Since
// exporter thread doesn't expect any signal initially, this value is initialized to
// Integer.MAX_VALUE.
private final AtomicInteger spansNeeded = new AtomicInteger(Integer.MAX_VALUE);
private final BlockingQueue<Boolean> signal;
private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
private volatile boolean continueWork = true;
Expand Down Expand Up @@ -182,7 +189,7 @@ private void addSpan(ReadableSpan span) {
if (!queue.offer(span)) {
droppedSpans.add(1);
} else {
if (queue.size() >= maxExportBatchSize && needSignal.get()) {
if (queue.size() >= spansNeeded.get()) {
signal.offer(true);
}
}
Expand All @@ -207,9 +214,9 @@ public void run() {
try {
long pollWaitTime = nextExportTime - System.nanoTime();
if (pollWaitTime > 0) {
needSignal.set(true);
spansNeeded.set(maxExportBatchSize - batch.size());
signal.poll(pollWaitTime, TimeUnit.NANOSECONDS);
needSignal.set(false);
spansNeeded.set(Integer.MAX_VALUE);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down

0 comments on commit 556210e

Please sign in to comment.