diff --git a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java index 07cd6457165..ad590824048 100644 --- a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java @@ -7,8 +7,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.when; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.SpanLimits; @@ -21,20 +23,29 @@ import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; // NB: We use AssertJ extracting to reflectively access implementation details to test configuration // because the use of BatchSpanProcessor makes it difficult to verify values through public means. @ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) class TracerProviderConfigurationTest { private static final ConfigProperties EMPTY = ConfigProperties.createForTest(Collections.emptyMap()); - @Mock private SpanExporter exporter; + @Mock private SpanExporter mockSpanExporter; + + @BeforeEach + void setUp() { + when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + } @Test void configureTracerProvider() { @@ -69,7 +80,7 @@ void configureTracerProvider() { @Test void configureSpanProcessor_empty() { BatchSpanProcessor processor = - TracerProviderConfiguration.configureSpanProcessor(EMPTY, exporter); + TracerProviderConfiguration.configureSpanProcessor(EMPTY, mockSpanExporter); try { assertThat(processor) @@ -88,7 +99,7 @@ void configureSpanProcessor_empty() { .isInstanceOfSatisfying( ArrayBlockingQueue.class, queue -> assertThat(queue.remainingCapacity()).isEqualTo(2048)); - assertThat(worker).extracting("spanExporter").isEqualTo(exporter); + assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter); }); } finally { processor.shutdown(); @@ -105,7 +116,7 @@ void configureSpanProcessor_configured() { BatchSpanProcessor processor = TracerProviderConfiguration.configureSpanProcessor( - ConfigProperties.createForTest(properties), exporter); + ConfigProperties.createForTest(properties), mockSpanExporter); try { assertThat(processor) @@ -124,7 +135,7 @@ void configureSpanProcessor_configured() { .isInstanceOfSatisfying( ArrayBlockingQueue.class, queue -> assertThat(queue.remainingCapacity()).isEqualTo(2)); - assertThat(worker).extracting("spanExporter").isEqualTo(exporter); + assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter); }); } finally { processor.shutdown(); diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 403e3ec4dfe..2c19b30ca9b 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -23,6 +23,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; @@ -130,9 +131,15 @@ private static final class Worker implements Runnable { private final long exporterTimeoutNanos; private long nextExportTime; - private final BlockingQueue queue; - + // When waiting on the spans queue, exporter thread sets this atomic to the number of more + // spans it needs before doing an export. Writer threads would then wait for the queue to reach + // spansNeeded size before notifying the exporter thread about new entries. + // Integer.MAX_VALUE is used to imply that exporter thread is not expecting any signal. Since + // exporter thread doesn't expect any signal initially, this value is initialized to + // Integer.MAX_VALUE. + private final AtomicInteger spansNeeded = new AtomicInteger(Integer.MAX_VALUE); + private final BlockingQueue signal; private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; private final ArrayList batch; @@ -148,6 +155,7 @@ private Worker( this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; + this.signal = new ArrayBlockingQueue<>(1); Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace"); meter .longValueObserverBuilder("queueSize") @@ -180,6 +188,10 @@ private Worker( private void addSpan(ReadableSpan span) { if (!queue.offer(span)) { droppedSpans.add(1); + } else { + if (queue.size() >= spansNeeded.get()) { + signal.offer(true); + } } } @@ -191,21 +203,26 @@ public void run() { if (flushRequested.get() != null) { flush(); } - - try { - ReadableSpan lastElement = queue.poll(100, TimeUnit.MILLISECONDS); - if (lastElement != null) { - batch.add(lastElement.toSpanData()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; + while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { + batch.add(queue.poll().toSpanData()); } - if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { exportCurrentBatch(); updateNextExportTime(); } + if (queue.isEmpty()) { + try { + long pollWaitTime = nextExportTime - System.nanoTime(); + if (pollWaitTime > 0) { + spansNeeded.set(maxExportBatchSize - batch.size()); + signal.poll(pollWaitTime, TimeUnit.NANOSECONDS); + spansNeeded.set(Integer.MAX_VALUE); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } } } @@ -252,8 +269,10 @@ private CompletableResultCode shutdown() { private CompletableResultCode forceFlush() { CompletableResultCode flushResult = new CompletableResultCode(); - // we set the atomic here to trigger the worker loop to do a flush on its next iteration. - flushRequested.compareAndSet(null, flushResult); + // we set the atomic here to trigger the worker loop to do a flush of the entire queue. + if (flushRequested.compareAndSet(null, flushResult)) { + signal.offer(true); + } CompletableResultCode possibleResult = flushRequested.get(); // there's a race here where the flush happening in the worker loop could complete before we // get what's in the atomic. In that case, just return success, since we know it succeeded in diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java index 7e6f1eb49b4..30a453fc588 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java @@ -197,12 +197,19 @@ void forceExport() { .build(); sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 50; i++) { createEndedSpan("notExported"); } List exported = waitingSpanExporter.waitForExport(); assertThat(exported).isNotNull(); - assertThat(exported.size()).isEqualTo(98); + assertThat(exported.size()).isEqualTo(49); + + for (int i = 0; i < 50; i++) { + createEndedSpan("notExported"); + } + exported = waitingSpanExporter.waitForExport(); + assertThat(exported).isNotNull(); + assertThat(exported.size()).isEqualTo(49); batchSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS); exported = waitingSpanExporter.getExported();