From 1816f36284a4251dd271bbd05bfe270bc9a95792 Mon Sep 17 00:00:00 2001 From: Santosh Banda Date: Wed, 10 Mar 2021 16:31:13 -0800 Subject: [PATCH] Use java's inbuilt concurrent queue --- dependencyManagement/build.gradle.kts | 1 - sdk-extensions/autoconfigure/build.gradle.kts | 1 - .../TracerProviderConfigurationTest.java | 33 ++++++++++--------- sdk/trace/build.gradle.kts | 1 - .../sdk/trace/export/BatchSpanProcessor.java | 22 +++++++++---- 5 files changed, 34 insertions(+), 24 deletions(-) diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index 6d638c69b69..b328435b1c2 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -92,7 +92,6 @@ val DEPENDENCIES = listOf( "org.awaitility:awaitility:4.0.3", "org.codehaus.mojo:animal-sniffer-annotations:1.20", "org.curioswitch.curiostack:protobuf-jackson:1.2.0", - "org.jctools:jctools-core:3.2.0", "org.junit-pioneer:junit-pioneer:1.3.8", "org.skyscreamer:jsonassert:1.5.0", "org.slf4j:slf4j-simple:1.7.30" diff --git a/sdk-extensions/autoconfigure/build.gradle.kts b/sdk-extensions/autoconfigure/build.gradle.kts index e32d69016ec..d660ed39a87 100644 --- a/sdk-extensions/autoconfigure/build.gradle.kts +++ b/sdk-extensions/autoconfigure/build.gradle.kts @@ -39,7 +39,6 @@ dependencies { testImplementation(project(":sdk:testing")) testImplementation("com.linecorp.armeria:armeria-junit5") testImplementation("com.linecorp.armeria:armeria-grpc") - testImplementation("org.jctools:jctools-core") testRuntimeOnly("io.grpc:grpc-netty-shaded") testRuntimeOnly("org.slf4j:slf4j-simple") diff --git a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java index 32ba58c48a5..5cf5e9304ac 100644 --- a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java @@ -7,8 +7,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.when; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.SpanLimits; @@ -20,21 +22,29 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.jctools.queues.MpscArrayQueue; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; // NB: We use AssertJ extracting to reflectively access implementation details to test configuration // because the use of BatchSpanProcessor makes it difficult to verify values through public means. @ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) class TracerProviderConfigurationTest { private static final ConfigProperties EMPTY = ConfigProperties.createForTest(Collections.emptyMap()); - @Mock private SpanExporter exporter; + @Mock private SpanExporter mockSpanExporter; + + @BeforeEach + void setUp() { + when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + } @Test void configureTracerProvider() { @@ -69,7 +79,7 @@ void configureTracerProvider() { @Test void configureSpanProcessor_empty() { BatchSpanProcessor processor = - TracerProviderConfiguration.configureSpanProcessor(EMPTY, exporter); + TracerProviderConfiguration.configureSpanProcessor(EMPTY, mockSpanExporter); try { assertThat(processor) @@ -83,12 +93,8 @@ void configureSpanProcessor_empty() { .extracting("exporterTimeoutNanos") .isEqualTo(TimeUnit.MILLISECONDS.toNanos(30000)); assertThat(worker).extracting("maxExportBatchSize").isEqualTo(512); - assertThat(worker) - .extracting("queue") - .isInstanceOfSatisfying( - MpscArrayQueue.class, - queue -> assertThat(queue.capacity()).isEqualTo(2048)); - assertThat(worker).extracting("spanExporter").isEqualTo(exporter); + assertThat(worker).extracting("maxQueueSize").isEqualTo(2048); + assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter); }); } finally { processor.shutdown(); @@ -105,7 +111,7 @@ void configureSpanProcessor_configured() { BatchSpanProcessor processor = TracerProviderConfiguration.configureSpanProcessor( - ConfigProperties.createForTest(properties), exporter); + ConfigProperties.createForTest(properties), mockSpanExporter); try { assertThat(processor) @@ -119,11 +125,8 @@ void configureSpanProcessor_configured() { .extracting("exporterTimeoutNanos") .isEqualTo(TimeUnit.MILLISECONDS.toNanos(4)); assertThat(worker).extracting("maxExportBatchSize").isEqualTo(3); - assertThat(worker) - .extracting("queue") - .isInstanceOfSatisfying( - MpscArrayQueue.class, queue -> assertThat(queue.capacity()).isEqualTo(2)); - assertThat(worker).extracting("spanExporter").isEqualTo(exporter); + assertThat(worker).extracting("maxQueueSize").isEqualTo(2); + assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter); }); } finally { processor.shutdown(); diff --git a/sdk/trace/build.gradle.kts b/sdk/trace/build.gradle.kts index d3d4dc79aa9..e0658542c22 100644 --- a/sdk/trace/build.gradle.kts +++ b/sdk/trace/build.gradle.kts @@ -45,7 +45,6 @@ dependencies { jmh("io.grpc:grpc-api") jmh("io.grpc:grpc-netty-shaded") jmh("org.testcontainers:testcontainers") // testContainer for OTLP collector - implementation("org.jctools:jctools-core") } sourceSets { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 6fe1a74f70b..dff51add558 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -19,8 +19,10 @@ import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; @@ -28,7 +30,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; -import org.jctools.queues.MpscArrayQueue; /** * Implementation of the {@link SpanProcessor} that batches spans exported by the SDK then pushes @@ -75,7 +76,8 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new MpscArrayQueue(maxQueueSize)); + new ConcurrentLinkedQueue(), + maxQueueSize); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -148,7 +150,9 @@ private static final class Worker implements Runnable { private final int maxExportBatchSize; private final long exporterTimeoutNanos; private long nextExportTime; - private final MpscArrayQueue queue; + private final ConcurrentLinkedQueue queue; + private final int maxQueueSize; + private final AtomicInteger queueSize = new AtomicInteger(0); private final AtomicLong addedSpansCounter = new AtomicLong(0); private final AtomicReference flushRequested = new AtomicReference<>(); private volatile boolean continueWork = true; @@ -165,12 +169,14 @@ private Worker( long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - MpscArrayQueue queue) { + ConcurrentLinkedQueue queue, + int maxQueueSize) { this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; this.exporterTimeoutNanos = exporterTimeoutNanos; this.queue = queue; + this.maxQueueSize = maxQueueSize; this.lock = new ReentrantLock(); this.needExport = lock.newCondition(); Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace"); @@ -181,7 +187,7 @@ private Worker( .setUpdater( result -> result.observe( - queue.size(), + queueSize.get(), Labels.of(SPAN_PROCESSOR_TYPE_LABEL, SPAN_PROCESSOR_TYPE_VALUE))) .build(); LongCounter processedSpansCounter = @@ -203,10 +209,12 @@ private Worker( } private void addSpan(ReadableSpan span) { - if (!queue.offer(span)) { + if (queueSize.get() >= maxQueueSize) { droppedSpansCounter.incrementAndGet(); droppedSpans.add(1); } else { + queue.offer(span); + queueSize.incrementAndGet(); if (addedSpansCounter.incrementAndGet() % maxExportBatchSize == 0) { lock.lock(); try { @@ -226,6 +234,7 @@ public void run() { flush(); } while (!queue.isEmpty() && batch.size() < maxExportBatchSize) { + queueSize.decrementAndGet(); batch.add(queue.poll().toSpanData()); } if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) { @@ -252,6 +261,7 @@ public void run() { private void flush() { while (!queue.isEmpty()) { batch.add(queue.poll().toSpanData()); + queueSize.decrementAndGet(); if (batch.size() >= maxExportBatchSize) { exportCurrentBatch(); }