Don't poll more than needed in BSP. #2971

Closed · wants to merge 1 commit
@@ -127,10 +127,12 @@ private static final class Worker implements Runnable {
private long nextExportTime;

private final BlockingQueue<ReadableSpan> queue;
Contributor commented:

Using ArrayDeque would be better when draining in batches, similar to https://github.com/sbandadd/opentelemetry-java/pull/1/files. But I think the most important thing is to come up with a CPU benchmark and compare different versions.
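A minimal sketch of the drain-in-batches idea the reviewer is pointing at (class and method names here are illustrative, not from the PR): `drainTo` moves a whole batch out of the queue in one call instead of polling elements one at a time.

```java
import java.util.ArrayDeque;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: drain pending items in one call instead of polling one at a time.
class DrainSketch {
    static int drainBatch(BlockingQueue<String> queue, int maxBatchSize) {
        ArrayDeque<String> batch = new ArrayDeque<>(maxBatchSize);
        // drainTo moves up to maxBatchSize elements in a single operation,
        // avoiding one lock acquisition per element.
        queue.drainTo(batch, maxBatchSize);
        return batch.size();
    }

    public static void main(String[] args) {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(16);
        q.offer("a");
        q.offer("b");
        q.offer("c");
        System.out.println(drainBatch(q, 2)); // prints 2 (capped at maxBatchSize)
        System.out.println(q.size());         // prints 1 (one element left queued)
    }
}
```

Whether ArrayDeque beats a reused ArrayList here is exactly the kind of question the reviewer's proposed CPU benchmark would settle.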

+ private final BlockingQueue<Object> signal;

private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
private volatile boolean continueWork = true;
- private final ArrayList<SpanData> batch;
+ private final ArrayList<ReadableSpan> batch;
+ private final ArrayList<SpanData> batchData;

private Worker(
SpanExporter spanExporter,
Expand All @@ -143,6 +145,7 @@ private Worker(
this.maxExportBatchSize = maxExportBatchSize;
this.exporterTimeoutNanos = exporterTimeoutNanos;
this.queue = queue;
+ this.signal = new ArrayBlockingQueue<>(1);
Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace");
meter
.longValueObserverBuilder("queueSize")
@@ -170,10 +173,15 @@ private Worker(
Labels.of(SPAN_PROCESSOR_TYPE_LABEL, SPAN_PROCESSOR_TYPE_VALUE, "dropped", "false"));

this.batch = new ArrayList<>(this.maxExportBatchSize);
+ this.batchData = new ArrayList<>(this.maxExportBatchSize);
}

private void addSpan(ReadableSpan span) {
- if (!queue.offer(span)) {
+ if (queue.offer(span)) {
+ if (queue.size() >= maxExportBatchSize) {
+ signal.offer("GOGOGO");
+ }
+ } else {
droppedSpans.add(1);
}
}
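The signal mechanism in this hunk can be sketched in isolation (a standalone reduction, not the PR's actual class): a queue of capacity 1 acts as a saturating wake-up flag, so repeated offers while a wake-up is already pending simply coalesce.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the capacity-1 signal queue: offer() past capacity returns
// false, so duplicate wake-ups collapse into one pending signal.
class SignalSketch {
    static final BlockingQueue<Object> signal = new ArrayBlockingQueue<>(1);

    static boolean wake() {
        return signal.offer("GOGOGO"); // false if a wake-up is already pending
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(wake()); // prints true  - signal stored
        System.out.println(wake()); // prints false - coalesced with pending signal
        Object s = signal.poll(10, TimeUnit.MILLISECONDS);
        System.out.println(s != null); // prints true - worker consumed the wake-up
        System.out.println(wake()); // prints true  - queue is empty again
    }
}
```

This is why `addSpan` can call `signal.offer` on every span past the batch threshold without flooding the worker.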
@@ -188,19 +196,20 @@ public void run() {
}

try {
- ReadableSpan lastElement = queue.poll(100, TimeUnit.MILLISECONDS);
- if (lastElement != null) {
- batch.add(lastElement.toSpanData());
+ long pollTimeout = nextExportTime - System.nanoTime();
+
+ if (pollTimeout > 0) {
+ signal.poll(pollTimeout, TimeUnit.NANOSECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}

- if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
- exportCurrentBatch();
- updateNextExportTime();
- }
+ queue.drainTo(batch, maxExportBatchSize);
+ exportCurrentBatch();
+
+ updateNextExportTime();
}
}
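The reworked loop reads as wake, drain, export, sleep. A simplified standalone version of one iteration (timing constant and names are illustrative, not the SDK's):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified worker iteration: sleep until either a full batch is
// signalled or the schedule interval elapses, then drain and "export".
class WorkerLoopSketch {
    static final long SCHEDULE_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(50);

    static List<String> runOnce(BlockingQueue<String> queue,
                                BlockingQueue<Object> signal,
                                int maxBatch) throws InterruptedException {
        long nextExportTime = System.nanoTime() + SCHEDULE_DELAY_NANOS;
        long pollTimeout = nextExportTime - System.nanoTime();
        if (pollTimeout > 0) {
            // Wakes early if a producer signals a full batch; otherwise
            // sleeps until the next scheduled export is due.
            signal.poll(pollTimeout, TimeUnit.NANOSECONDS);
        }
        List<String> batch = new ArrayList<>(maxBatch);
        queue.drainTo(batch, maxBatch);
        return batch; // the export step would consume this list
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
        BlockingQueue<Object> signal = new ArrayBlockingQueue<>(1);
        queue.offer("span-1");
        queue.offer("span-2");
        signal.offer("GOGOGO"); // producer says a batch is ready
        System.out.println(runOnce(queue, signal, 8).size()); // prints 2
    }
}
```

The key difference from the removed code: the timed wait is against the export schedule, not a fixed 100 ms poll, so an idle worker wakes once per interval instead of ten times per second.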

Expand All @@ -209,7 +218,7 @@ private void flush() {
while (spansToFlush > 0) {
ReadableSpan span = queue.poll();
assert span != null;
- batch.add(span.toSpanData());
+ batch.add(span);
spansToFlush--;
if (batch.size() >= maxExportBatchSize) {
exportCurrentBatch();
@@ -248,7 +257,9 @@ private CompletableResultCode shutdown() {
private CompletableResultCode forceFlush() {
CompletableResultCode flushResult = new CompletableResultCode();
// we set the atomic here to trigger the worker loop to do a flush on its next iteration.
- flushRequested.compareAndSet(null, flushResult);
+ if (flushRequested.compareAndSet(null, flushResult)) {
+ signal.offer("GOGOGO");
+ }
CompletableResultCode possibleResult = flushRequested.get();
// there's a race here where the flush happening in the worker loop could complete before we
// get what's in the atomic. In that case, just return success, since we know it succeeded in
@@ -261,9 +272,13 @@ private void exportCurrentBatch() {
return;
}

+ for (ReadableSpan span : batch) {
+ batchData.add(span.toSpanData());
Contributor commented:
For good or ill, moving this down here will make the CPU usage a bit more spiky, since we'll only do this conversion all at once when the queue is full, or when the timer expires. The current implementation will spread out the conversion smoothly, since it happens as each span is added to the queue. Which way is better? I'm not sure, honestly, but it's something to keep in mind.

Contributor (author) replied:

Yeah there's some tradeoff - I guess for a thread with a defined "interval", I'd sort of expect it to wake, work, sleep. We have a bit of a pathological behavior right now where it's fairly conceivable for this thread to be sleeping and waking in a thrashing manner if a span is recorded every 101ms for example - sleeping/waking is very expensive and I suspect we didn't intend that to be possible.
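A back-of-envelope illustration of the thrashing described above (the numbers are assumed for illustration, not measured): with a fixed 100 ms poll and a span recorded every 101 ms, the old loop wakes roughly once per poll timeout, while the signal-based loop only wakes on the schedule delay when no batch fills up.

```java
// Rough wake-up counts over a fixed window; pure arithmetic, no timing.
class WakeupMath {
    static long oldWakeups(long windowMs, long spanIntervalMs, long pollTimeoutMs) {
        // Old loop: every poll either returns a span or times out, so it
        // wakes about once per min(span interval, poll timeout).
        return windowMs / Math.min(spanIntervalMs, pollTimeoutMs);
    }

    static long newWakeups(long windowMs, long scheduleDelayMs) {
        // New loop: when no full batch is signalled, it sleeps until the
        // next scheduled export, so idle wake-ups track the schedule delay.
        return windowMs / scheduleDelayMs;
    }

    public static void main(String[] args) {
        System.out.println(oldWakeups(10_000, 101, 100)); // prints 100
        System.out.println(newWakeups(10_000, 5_000));    // prints 2
    }
}
```

Roughly two orders of magnitude fewer sleep/wake transitions in this scenario, which is the pathological case the author is calling out.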

+ }

try {
final CompletableResultCode result =
- spanExporter.export(Collections.unmodifiableList(batch));
+ spanExporter.export(Collections.unmodifiableList(batchData));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
exportedSpans.add(batch.size());
@@ -274,6 +289,7 @@ private void exportCurrentBatch() {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
} finally {
batch.clear();
+ batchData.clear();
}
}
}