diff --git a/dependencyManagement/build.gradle.kts b/dependencyManagement/build.gradle.kts index b328435b1c2..3cc3efba984 100644 --- a/dependencyManagement/build.gradle.kts +++ b/dependencyManagement/build.gradle.kts @@ -92,6 +92,7 @@ val DEPENDENCIES = listOf( "org.awaitility:awaitility:4.0.3", "org.codehaus.mojo:animal-sniffer-annotations:1.20", "org.curioswitch.curiostack:protobuf-jackson:1.2.0", + "org.jctools:jctools-core:3.3.0", "org.junit-pioneer:junit-pioneer:1.3.8", "org.skyscreamer:jsonassert:1.5.0", "org.slf4j:slf4j-simple:1.7.30" diff --git a/sdk-extensions/autoconfigure/build.gradle.kts b/sdk-extensions/autoconfigure/build.gradle.kts index d660ed39a87..e104346eeb1 100644 --- a/sdk-extensions/autoconfigure/build.gradle.kts +++ b/sdk-extensions/autoconfigure/build.gradle.kts @@ -35,6 +35,8 @@ dependencies { compileOnly("io.prometheus:simpleclient_httpserver") compileOnly(project(":exporters:zipkin")) + testImplementation(project(path=":sdk:trace-shaded-deps")) + testImplementation(project(":proto")) testImplementation(project(":sdk:testing")) testImplementation("com.linecorp.armeria:armeria-junit5") diff --git a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java index ad590824048..04dfb55dc8d 100644 --- a/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java +++ b/sdk-extensions/autoconfigure/src/test/java/io/opentelemetry/sdk/autoconfigure/TracerProviderConfigurationTest.java @@ -17,11 +17,12 @@ import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.export.BatchSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; +import io.opentelemetry.sdk.trace.internal.JcTools; import io.opentelemetry.sdk.trace.samplers.Sampler; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.Queue; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -97,8 +98,7 @@ void configureSpanProcessor_empty() { assertThat(worker) .extracting("queue") .isInstanceOfSatisfying( - ArrayBlockingQueue.class, - queue -> assertThat(queue.remainingCapacity()).isEqualTo(2048)); + Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2048)); assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter); }); } finally { @@ -133,8 +133,7 @@ void configureSpanProcessor_configured() { assertThat(worker) .extracting("queue") .isInstanceOfSatisfying( - ArrayBlockingQueue.class, - queue -> assertThat(queue.remainingCapacity()).isEqualTo(2)); + Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2)); assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter); }); } finally { diff --git a/sdk/trace-shaded-deps/build.gradle.kts b/sdk/trace-shaded-deps/build.gradle.kts new file mode 100644 index 00000000000..6ca21397386 --- /dev/null +++ b/sdk/trace-shaded-deps/build.gradle.kts @@ -0,0 +1,22 @@ +plugins { + `java-library` + + id("com.github.johnrengelman.shadow") +} + +// This project is not published, it is bundled into :sdk:trace + +description = "Internal use only - shaded dependencies of OpenTelemetry SDK for Tracing" +extra["moduleName"] = "io.opentelemetry.sdk.trace.internal" + +dependencies { + implementation("org.jctools:jctools-core") +} + +tasks { + shadowJar { + minimize() + + relocate("org.jctools", "io.opentelemetry.internal.shaded.jctools") + } +} diff --git a/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java b/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java new file mode 100644 index 00000000000..255d04138c8 --- /dev/null +++ b/sdk/trace-shaded-deps/src/main/java/io/opentelemetry/sdk/trace/internal/JcTools.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.trace.internal; + +import java.util.Queue; +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpscArrayQueue; + +/** Internal accessor of JCTools package for fast queues. */ +public final class JcTools { + + /** + * Returns a new {@link Queue} appropriate for use with multiple producers and a single consumer. + */ + public static Queue newMpscArrayQueue(int capacity) { + return new MpscArrayQueue<>(capacity); + } + + /** + * Returns the capacity of the {@link Queue}, which must be a JcTools queue. We cast to the + * implementation so callers do not need to use the shaded classes. + */ + public static long capacity(Queue queue) { + return ((MessagePassingQueue) queue).capacity(); + } + + private JcTools() {} +} diff --git a/sdk/trace/build.gradle.kts b/sdk/trace/build.gradle.kts index e0658542c22..476eaeeddd1 100644 --- a/sdk/trace/build.gradle.kts +++ b/sdk/trace/build.gradle.kts @@ -9,10 +9,14 @@ plugins { description = "OpenTelemetry SDK For Tracing" extra["moduleName"] = "io.opentelemetry.sdk.trace" +evaluationDependsOn(":sdk:trace-shaded-deps") + dependencies { api(project(":api:all")) api(project(":sdk:common")) + compileOnly(project(":sdk:trace-shaded-deps")) + implementation(project(":api:metrics")) implementation(project(":semconv")) @@ -24,6 +28,7 @@ dependencies { testImplementation("com.google.guava:guava") jmh(project(":sdk:metrics")) + jmh(project(":sdk:trace-shaded-deps")) jmh(project(":sdk:testing")) { // JMH doesn"t handle dependencies that are duplicated between the main and jmh // configurations properly, but luckily here it"s simple enough to just exclude transitive @@ -62,4 +67,11 @@ tasks { File(propertiesDir, "version.properties").writeText("sdk.version=${project.version}") } } + + jar { + inputs.files(project(":sdk:trace-shaded-deps").file("src")) + val shadowJar = project(":sdk:trace-shaded-deps").tasks.named("shadowJar") + from(zipTree(shadowJar.get().archiveFile)) + dependsOn(shadowJar) + } } diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java index 69fa730fe75..ba6a05b2092 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorMultiThreadBenchmark.java @@ -57,10 +57,6 @@ public final void recordMetrics() { new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads); exportedSpans = metrics.exportedSpans(); droppedSpans = metrics.droppedSpans(); - } - - @TearDown(Level.Trial) - public final void tearDown() { processor.shutdown().join(10, TimeUnit.SECONDS); } } diff --git a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java index ff0b2815959..40998ac42af 100644 --- a/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java +++ b/sdk/trace/src/jmh/java/io/opentelemetry/sdk/trace/export/DelayingSpanExporter.java @@ -38,6 +38,7 @@ public CompletableResultCode flush() { @Override public CompletableResultCode shutdown() { + executor.shutdown(); return CompletableResultCode.ofSuccess(); } } diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index 81dad6fdde7..965e9fcb89d 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -17,8 +17,10 @@ import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.internal.JcTools; import java.util.ArrayList; import java.util.Collections; +import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -36,9 +38,6 @@ * {@code maxQueueSize} maximum size, if queue is full spans are dropped). Spans are exported either * when there are {@code maxExportBatchSize} pending spans or {@code scheduleDelayNanos} has passed * since the last export finished. - * - *

This batch {@link SpanProcessor} can cause high contention in a very high traffic service. - * TODO: Add a link to the SpanProcessor that uses Disruptor as alternative with low contention. */ public final class BatchSpanProcessor implements SpanProcessor { @@ -73,7 +72,7 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) { scheduleDelayNanos, maxExportBatchSize, exporterTimeoutNanos, - new ArrayBlockingQueue<>(maxQueueSize)); + JcTools.newMpscArrayQueue(maxQueueSize)); Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker); workerThread.start(); } @@ -131,7 +130,8 @@ private static final class Worker implements Runnable { private final long exporterTimeoutNanos; private long nextExportTime; - private final BlockingQueue queue; + + private final Queue queue; // When waiting on the spans queue, exporter thread sets this atomic to the number of more // spans it needs before doing an export. Writer threads would then wait for the queue to reach // spansNeeded size before notifying the exporter thread about new entries. @@ -149,7 +149,7 @@ private Worker( long scheduleDelayNanos, int maxExportBatchSize, long exporterTimeoutNanos, - BlockingQueue queue) { + Queue queue) { this.spanExporter = spanExporter; this.scheduleDelayNanos = scheduleDelayNanos; this.maxExportBatchSize = maxExportBatchSize; diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java index 428623def21..90e012fe916 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java @@ -86,7 +86,8 @@ long getExporterTimeoutNanos() { } /** - * Sets the maximum number of Spans that are kept in the queue before start dropping. + * Sets the maximum number of Spans that are kept in the queue before start dropping. More memory + * than this value may be allocated to optimize queue access. * *

See the BatchSampledSpansProcessor class description for a high-level design description of * this class. diff --git a/settings.gradle.kts b/settings.gradle.kts index 5cfd97dafb1..62b6214929f 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -63,6 +63,7 @@ include(":sdk:common") include(":sdk:metrics") include(":sdk:testing") include(":sdk:trace") +include(":sdk:trace-shaded-deps") include(":sdk-extensions:async-processor") include(":sdk-extensions:autoconfigure") include(":sdk-extensions:aws")