Don't poll more than needed in BSP. #2971
Changes from all commits
@@ -127,10 +127,12 @@ private static final class Worker implements Runnable {
     private long nextExportTime;

     private final BlockingQueue<ReadableSpan> queue;
+    private final BlockingQueue<Object> signal;

     private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
     private volatile boolean continueWork = true;
-    private final ArrayList<SpanData> batch;
+    private final ArrayList<ReadableSpan> batch;
+    private final ArrayList<SpanData> batchData;

     private Worker(
         SpanExporter spanExporter,
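The new signal field is a capacity-1 ArrayBlockingQueue used purely as a wake-up flag rather than as a data channel. A small, self-contained demo of why that shape works (the values and class name are only illustrative; this is not SDK code):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public final class SignalQueueDemo {
  public static void main(String[] args) throws InterruptedException {
    // Capacity 1: at most one wake-up can ever be pending.
    BlockingQueue<Object> signal = new ArrayBlockingQueue<>(1);

    // offer() never blocks; once a signal is pending, further offers are no-ops.
    System.out.println(signal.offer("GOGOGO")); // true  - signal stored
    System.out.println(signal.offer("GOGOGO")); // false - coalesced into the pending one

    // poll() with a timeout returns immediately when a signal is pending...
    System.out.println(signal.poll(1, TimeUnit.SECONDS)); // "GOGOGO"
    // ...and otherwise sleeps until the timeout expires (the worker's export deadline).
    System.out.println(signal.poll(100, TimeUnit.MILLISECONDS)); // null after ~100 ms
  }
}
```

Because the capacity is 1, offering a signal never blocks the caller, and any number of "batch is ready" notifications collapse into a single pending wake-up.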
@@ -143,6 +145,7 @@ private Worker(
       this.maxExportBatchSize = maxExportBatchSize;
       this.exporterTimeoutNanos = exporterTimeoutNanos;
       this.queue = queue;
+      this.signal = new ArrayBlockingQueue<>(1);
       Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace");
       meter
           .longValueObserverBuilder("queueSize")
@@ -170,10 +173,15 @@ private Worker(
               Labels.of(SPAN_PROCESSOR_TYPE_LABEL, SPAN_PROCESSOR_TYPE_VALUE, "dropped", "false"));

       this.batch = new ArrayList<>(this.maxExportBatchSize);
+      this.batchData = new ArrayList<>(this.maxExportBatchSize);
     }

     private void addSpan(ReadableSpan span) {
-      if (!queue.offer(span)) {
+      if (queue.offer(span)) {
+        if (queue.size() >= maxExportBatchSize) {
+          signal.offer("GOGOGO");
+        }
+      } else {
         droppedSpans.add(1);
       }
     }
@@ -188,19 +196,20 @@ public void run() {
       }

       try {
-        ReadableSpan lastElement = queue.poll(100, TimeUnit.MILLISECONDS);
-        if (lastElement != null) {
-          batch.add(lastElement.toSpanData());
+        long pollTimeout = nextExportTime - System.nanoTime();
+
+        if (pollTimeout > 0) {
+          signal.poll(pollTimeout, TimeUnit.NANOSECONDS);
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return;
       }

-      if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
-        exportCurrentBatch();
-        updateNextExportTime();
-      }
+      queue.drainTo(batch, maxExportBatchSize);
+      exportCurrentBatch();
+
+      updateNextExportTime();
     }
   }
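Putting the producer and worker sides together, here is a minimal sketch of the pattern this change moves to: the worker sleeps until either its export deadline or a "batch is full" signal, then drains up to one batch and exports it. All class, method, and field names here are invented for illustration; this is not the SDK implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a batching worker that wakes on a signal or a deadline.
final class BatchWorkerSketch<T> implements Runnable {
  private final BlockingQueue<T> queue;                              // producers -> worker
  private final BlockingQueue<Object> signal = new ArrayBlockingQueue<>(1); // wake-up flag
  private final int maxBatchSize;
  private final long scheduleDelayNanos;
  private long nextExportTime;
  private volatile boolean running = true;

  BatchWorkerSketch(BlockingQueue<T> queue, int maxBatchSize, long scheduleDelayNanos) {
    this.queue = queue;
    this.maxBatchSize = maxBatchSize;
    this.scheduleDelayNanos = scheduleDelayNanos;
  }

  // Producer side: enqueue without blocking; only wake the worker when a full batch is ready.
  void add(T item) {
    if (!queue.offer(item)) {
      // Queue full: a real implementation would count the dropped item here.
      return;
    }
    if (queue.size() >= maxBatchSize) {
      // Enough for a full batch: wake the worker early instead of waiting for the deadline.
      signal.offer(Boolean.TRUE);
    }
  }

  @Override
  public void run() {
    nextExportTime = System.nanoTime() + scheduleDelayNanos;
    List<T> batch = new ArrayList<>(maxBatchSize);
    while (running) {
      try {
        long pollTimeout = nextExportTime - System.nanoTime();
        if (pollTimeout > 0) {
          // Sleep until either a "batch is full" signal arrives or the export deadline passes.
          signal.poll(pollTimeout, TimeUnit.NANOSECONDS);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      queue.drainTo(batch, maxBatchSize);
      export(batch); // placeholder for the real export call
      batch.clear();
      nextExportTime = System.nanoTime() + scheduleDelayNanos;
    }
  }

  private void export(List<T> batch) {
    // No-op in this sketch.
  }
}
```

Compared with polling the queue every 100 ms regardless of traffic, the worker now makes at most one pass per schedule interval unless a full batch arrives sooner.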
@@ -209,7 +218,7 @@ private void flush() {
       while (spansToFlush > 0) {
         ReadableSpan span = queue.poll();
         assert span != null;
-        batch.add(span.toSpanData());
+        batch.add(span);
         spansToFlush--;
         if (batch.size() >= maxExportBatchSize) {
           exportCurrentBatch();
@@ -248,7 +257,9 @@ private CompletableResultCode shutdown() {
     private CompletableResultCode forceFlush() {
       CompletableResultCode flushResult = new CompletableResultCode();
       // we set the atomic here to trigger the worker loop to do a flush on its next iteration.
-      flushRequested.compareAndSet(null, flushResult);
+      if (flushRequested.compareAndSet(null, flushResult)) {
+        signal.offer("GOGOGO");
+      }
       CompletableResultCode possibleResult = flushRequested.get();
       // there's a race here where the flush happening in the worker loop could complete before we
       // get what's in the atomic. In that case, just return success, since we know it succeeded in
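The flush hand-off can be sketched in isolation as well. This hypothetical example substitutes a JDK CompletableFuture for the SDK's CompletableResultCode so it stays self-contained; the compare-and-set-then-signal shape is the same.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, minimal illustration of the "CAS the request, then signal" hand-off.
final class FlushSignalSketch {
  private final BlockingQueue<Object> signal = new ArrayBlockingQueue<>(1);
  private final AtomicReference<CompletableFuture<Void>> flushRequested = new AtomicReference<>();

  // Caller thread: publish at most one pending flush request, then wake the worker.
  CompletableFuture<Void> forceFlush() {
    CompletableFuture<Void> result = new CompletableFuture<>();
    if (flushRequested.compareAndSet(null, result)) {
      signal.offer("flush"); // non-blocking; extra signals are simply coalesced
    }
    CompletableFuture<Void> pending = flushRequested.get();
    CompletableFuture<Void> alreadyDone = CompletableFuture.completedFuture(null);
    // If the worker already completed and cleared the request, report success.
    return pending != null ? pending : alreadyDone;
  }

  // Worker thread: after draining and exporting, complete any pending flush request.
  void workerIteration() throws InterruptedException {
    // Sleep until signalled or until the schedule delay elapses (fixed 100 ms here for brevity).
    signal.poll(100, TimeUnit.MILLISECONDS);
    // ... drain the queue and export a batch here ...
    CompletableFuture<Void> pending = flushRequested.getAndSet(null);
    if (pending != null) {
      pending.complete(null);
    }
  }
}
```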
@@ -261,9 +272,13 @@ private void exportCurrentBatch() {
         return;
       }

+      for (ReadableSpan span : batch) {
+        batchData.add(span.toSpanData());
+      }
+
       try {
         final CompletableResultCode result =
-            spanExporter.export(Collections.unmodifiableList(batch));
+            spanExporter.export(Collections.unmodifiableList(batchData));
         result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
         if (result.isSuccess()) {
           exportedSpans.add(batch.size());

@@ -274,6 +289,7 @@ private void exportCurrentBatch() {
         logger.log(Level.WARNING, "Exporter threw an Exception", e);
       } finally {
         batch.clear();
+        batchData.clear();
       }
     }
   }

Review thread on the new toSpanData() loop:

Comment: For good or ill, moving this down here will make the CPU usage a bit more spiky, since we'll only do this conversion all at once when the queue is full or when the timer expires. The current implementation spreads the conversion out smoothly, since it happens as each span is added to the queue. Which way is better? I'm not sure, honestly, but it's something to keep in mind.

Reply: Yeah, there's some tradeoff. I guess for a thread with a defined "interval", I'd expect it to wake, work, and sleep. We have a bit of a pathological behavior right now where it's fairly conceivable for this thread to sleep and wake in a thrashing manner if a span is recorded every 101 ms, for example; sleeping and waking is very expensive, and I suspect we didn't intend that to be possible.
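The trade-off this thread describes can be shown with a stand-in conversion step (all names invented, not SDK code): before this change the conversion cost is paid per span as the worker pulls items off the queue; after it, the cost is paid in one burst per drained batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration of where the span-to-snapshot conversion cost is paid.
final class ConversionPlacementSketch {
  static final class Span { SpanSnapshot snapshot() { return new SpanSnapshot(); } }
  static final class SpanSnapshot {}

  // Before the change: convert as each span is pulled off the queue,
  // spreading the conversion cost smoothly over time.
  static void convertPerSpan(BlockingQueue<Span> queue, List<SpanSnapshot> batch) {
    Span span = queue.poll();
    if (span != null) {
      batch.add(span.snapshot());
    }
  }

  // After the change: drain a whole batch, then convert it in one burst right before export.
  // Spikier CPU per wake-up, but the worker can stay asleep between batches.
  static void convertPerBatch(BlockingQueue<Span> queue, List<SpanSnapshot> batchData, int maxBatchSize) {
    List<Span> batch = new ArrayList<>(maxBatchSize);
    queue.drainTo(batch, maxBatchSize);
    for (Span span : batch) {
      batchData.add(span.snapshot());
    }
  }
}
```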
Comment: Using ArrayDeque would be better when draining in batches, similar to https://github.com/sbandadd/opentelemetry-java/pull/1/files. But I think the most important thing is to come up with a CPU benchmark and compare the different versions.
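As a starting point on the benchmark question, a hypothetical JMH micro-benchmark could compare a bulk drainTo against a per-element poll loop for moving one batch out of the queue. It measures only the drain step, not the processor's idle CPU behavior or the ArrayDeque question, which would need a longer-running harness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

// Hypothetical JMH micro-benchmark comparing two ways to move one batch off the queue.
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class DrainBenchmark {
  private static final int QUEUE_SIZE = 2048;
  private static final int BATCH_SIZE = 512;

  private ArrayBlockingQueue<Object> queue;

  @Setup(Level.Invocation)
  public void fillQueue() {
    // Refill before every invocation so both benchmarks always drain a full queue.
    queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
    for (int i = 0; i < QUEUE_SIZE; i++) {
      queue.add(new Object());
    }
  }

  @Benchmark
  public int drainTo() {
    List<Object> batch = new ArrayList<>(BATCH_SIZE);
    queue.drainTo(batch, BATCH_SIZE);
    return batch.size();
  }

  @Benchmark
  public int pollLoop() {
    List<Object> batch = new ArrayList<>(BATCH_SIZE);
    for (int i = 0; i < BATCH_SIZE; i++) {
      Object element = queue.poll();
      if (element == null) {
        break;
      }
      batch.add(element);
    }
    return batch.size();
  }
}
```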