From f47c1dcd2147c31e225b2562e226681ce931f211 Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Thu, 4 Mar 2021 12:35:30 -0800 Subject: [PATCH] Optimize batch span processor Description: The batch span processor is currently aggressive in the sense that every new span is handed to the exporter thread as soon as it is queued. This incurs a lot of signalling overhead under heavy load, and overhead from constant polling by the exporter thread under light load. This PR makes the exporter thread wait until maxExportBatchSize spans have been added (or until the next scheduled export time is reached) to avoid busy polling of the queue. Benchmark results are here. --- .../TracerProviderConfigurationTest.java | 21 +++++++--- .../sdk/trace/export/BatchSpanProcessor.java | 39 ++++++++++++------- .../trace/export/BatchSpanProcessorTest.java | 11 +++++- 3 files changed, 50 insertions(+), 21 deletions(-) diff --git a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java index 07cd6457165..ad590824048 100644 --- a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java @@ -7,8 +7,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.when; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.SpanLimits; @@ -21,20 +23,29 @@ import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import 
org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; // NB: We use AssertJ extracting to reflectively access implementation details to test configuration // because the use of BatchSpanProcessor makes it difficult to verify values through public means. @ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) class TracerProviderConfigurationTest { private static final ConfigProperties EMPTY = ConfigProperties.createForTest(Collections.emptyMap()); - @Mock private SpanExporter exporter; + @Mock private SpanExporter mockSpanExporter; + + @BeforeEach + void setUp() { + when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + } @Test void configureTracerProvider() { @@ -69,7 +80,7 @@ void configureTracerProvider() { @Test void configureSpanProcessor_empty() { BatchSpanProcessor processor = - TracerProviderConfiguration.configureSpanProcessor(EMPTY, exporter); + TracerProviderConfiguration.configureSpanProcessor(EMPTY, mockSpanExporter); try { assertThat(processor) @@ -88,7 +99,7 @@ void configureSpanProcessor_empty() { .isInstanceOfSatisfying( ArrayBlockingQueue.class, queue -> assertThat(queue.remainingCapacity()).isEqualTo(2048)); - assertThat(worker).extracting("spanExporter").isEqualTo(exporter); + assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter); }); } finally { processor.shutdown(); @@ -105,7 +116,7 @@ void configureSpanProcessor_configured() { BatchSpanProcessor processor = TracerProviderConfiguration.configureSpanProcessor( - ConfigProperties.createForTest(properties), exporter); + ConfigProperties.createForTest(properties), mockSpanExporter); try { assertThat(processor) @@ -124,7 +135,7 @@ void configureSpanProcessor_configured() { .isInstanceOfSatisfying( ArrayBlockingQueue.class, queue -> 
assertThat(queue.remainingCapacity()).isEqualTo(2)); - assertThat(worker).extracting("spanExporter").isEqualTo(exporter); + assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter); }); } finally { processor.shutdown(); diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 403e3ec4dfe..2d91c891f2d 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -23,6 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -130,9 +131,9 @@ private static final class Worker implements Runnable { private final long exporterTimeoutNanos; private long nextExportTime; - private final BlockingQueue queue; - + private final AtomicLong addedSpansCounter = new AtomicLong(0); + private final BlockingQueue signal; private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; private final ArrayList batch; @@ -148,6 +149,7 @@ private Worker( this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; + this.signal = new ArrayBlockingQueue<>(1); Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace"); meter .longValueObserverBuilder("queueSize") @@ -180,6 +182,10 @@ private Worker( private void addSpan(ReadableSpan span) { if (!queue.offer(span)) { droppedSpans.add(1); + } else { + if (addedSpansCounter.incrementAndGet() % maxExportBatchSize == 0) { + signal.offer(true); + } } } @@ -191,21 +197,24 @@ public void run() { if 
(flushRequested.get() != null) { flush(); } - - try { - ReadableSpan lastElement = queue.poll(100, TimeUnit.MILLISECONDS); - if (lastElement != null) { - batch.add(lastElement.toSpanData()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; + while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { + batch.add(queue.poll().toSpanData()); } - if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { exportCurrentBatch(); updateNextExportTime(); } + if (queue.isEmpty()) { + try { + long pollWaitTime = nextExportTime - System.nanoTime(); + if (pollWaitTime > 0) { + signal.poll(pollWaitTime, TimeUnit.NANOSECONDS); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } } } @@ -252,8 +261,10 @@ private CompletableResultCode shutdown() { private CompletableResultCode forceFlush() { CompletableResultCode flushResult = new CompletableResultCode(); - // we set the atomic here to trigger the worker loop to do a flush on its next iteration. - flushRequested.compareAndSet(null, flushResult); + // we set the atomic here to trigger the worker loop to do a flush of the entire queue. + if (flushRequested.compareAndSet(null, flushResult)) { + signal.offer(true); + } CompletableResultCode possibleResult = flushRequested.get(); // there's a race here where the flush happening in the worker loop could complete before we // get what's in the atomic. 
In that case, just return success, since we know it succeeded in diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java index 7e6f1eb49b4..30a453fc588 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java @@ -197,12 +197,19 @@ void forceExport() { .build(); sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 50; i++) { createEndedSpan("notExported"); } List exported = waitingSpanExporter.waitForExport(); assertThat(exported).isNotNull(); - assertThat(exported.size()).isEqualTo(98); + assertThat(exported.size()).isEqualTo(49); + + for (int i = 0; i < 50; i++) { + createEndedSpan("notExported"); + } + exported = waitingSpanExporter.waitForExport(); + assertThat(exported).isNotNull(); + assertThat(exported.size()).isEqualTo(49); batchSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS); exported = waitingSpanExporter.getExported();