Skip to content

Commit

Permalink
Replace ArrayBlockingQueue with jctools queue. (#3034)
Browse files Browse the repository at this point in the history
* Replace ArrayBlockingQueue with jctools queue.

* Finish

* ArrayQueue

* Fix dependency

* Drift

* Memory note

* Iteration
  • Loading branch information
Anuraag Agrawal authored Mar 31, 2021
1 parent c667636 commit 2f2af19
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 16 deletions.
1 change: 1 addition & 0 deletions dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ val DEPENDENCIES = listOf(
"org.awaitility:awaitility:4.0.3",
"org.codehaus.mojo:animal-sniffer-annotations:1.20",
"org.curioswitch.curiostack:protobuf-jackson:1.2.0",
"org.jctools:jctools-core:3.3.0",
"org.junit-pioneer:junit-pioneer:1.3.8",
"org.skyscreamer:jsonassert:1.5.0",
"org.slf4j:slf4j-simple:1.7.30"
Expand Down
2 changes: 2 additions & 0 deletions sdk-extensions/autoconfigure/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ dependencies {
compileOnly("io.prometheus:simpleclient_httpserver")
compileOnly(project(":exporters:zipkin"))

testImplementation(project(path=":sdk:trace-shaded-deps"))

testImplementation(project(":proto"))
testImplementation(project(":sdk:testing"))
testImplementation("com.linecorp.armeria:armeria-junit5")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.internal.JcTools;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -97,8 +98,7 @@ void configureSpanProcessor_empty() {
assertThat(worker)
.extracting("queue")
.isInstanceOfSatisfying(
ArrayBlockingQueue.class,
queue -> assertThat(queue.remainingCapacity()).isEqualTo(2048));
Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2048));
assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter);
});
} finally {
Expand Down Expand Up @@ -133,8 +133,7 @@ void configureSpanProcessor_configured() {
assertThat(worker)
.extracting("queue")
.isInstanceOfSatisfying(
ArrayBlockingQueue.class,
queue -> assertThat(queue.remainingCapacity()).isEqualTo(2));
Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2));
assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter);
});
} finally {
Expand Down
22 changes: 22 additions & 0 deletions sdk/trace-shaded-deps/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
plugins {
`java-library`

id("com.github.johnrengelman.shadow")
}

// This project is not published, it is bundled into :sdk:trace

description = "Internal use only - shaded dependencies of OpenTelemetry SDK for Tracing"
extra["moduleName"] = "io.opentelemetry.sdk.trace.internal"

dependencies {
implementation("org.jctools:jctools-core")
}

tasks {
shadowJar {
minimize()

relocate("org.jctools", "io.opentelemetry.internal.shaded.jctools")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.trace.internal;

import java.util.Queue;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscArrayQueue;

/** Internal accessor of JCTools package for fast queues. */
public final class JcTools {

/**
* Returns a new {@link Queue} appropriate for use with multiple producers and a single consumer.
*/
public static <T> Queue<T> newMpscArrayQueue(int capacity) {
return new MpscArrayQueue<>(capacity);
}

/**
* Returns the capacity of the {@link Queue}, which must be a JcTools queue. We cast to the
* implementation so callers do not need to use the shaded classes.
*/
public static long capacity(Queue<?> queue) {
return ((MessagePassingQueue<?>) queue).capacity();
}

private JcTools() {}
}
12 changes: 12 additions & 0 deletions sdk/trace/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ plugins {
description = "OpenTelemetry SDK For Tracing"
extra["moduleName"] = "io.opentelemetry.sdk.trace"

evaluationDependsOn(":sdk:trace-shaded-deps")

dependencies {
api(project(":api:all"))
api(project(":sdk:common"))

compileOnly(project(":sdk:trace-shaded-deps"))

implementation(project(":api:metrics"))
implementation(project(":semconv"))

Expand All @@ -24,6 +28,7 @@ dependencies {
testImplementation("com.google.guava:guava")

jmh(project(":sdk:metrics"))
jmh(project(":sdk:trace-shaded-deps"))
jmh(project(":sdk:testing")) {
// JMH doesn"t handle dependencies that are duplicated between the main and jmh
// configurations properly, but luckily here it"s simple enough to just exclude transitive
Expand Down Expand Up @@ -62,4 +67,11 @@ tasks {
File(propertiesDir, "version.properties").writeText("sdk.version=${project.version}")
}
}

jar {
inputs.files(project(":sdk:trace-shaded-deps").file("src"))
val shadowJar = project(":sdk:trace-shaded-deps").tasks.named<Jar>("shadowJar")
from(zipTree(shadowJar.get().archiveFile))
dependsOn(shadowJar)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ public final void recordMetrics() {
new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads);
exportedSpans = metrics.exportedSpans();
droppedSpans = metrics.droppedSpans();
}

@TearDown(Level.Trial)
public final void tearDown() {
processor.shutdown().join(10, TimeUnit.SECONDS);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public CompletableResultCode flush() {

@Override
public CompletableResultCode shutdown() {
executor.shutdown();
return CompletableResultCode.ofSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.internal.JcTools;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,9 +38,6 @@
* {@code maxQueueSize} maximum size, if queue is full spans are dropped). Spans are exported either
* when there are {@code maxExportBatchSize} pending spans or {@code scheduleDelayNanos} has passed
* since the last export finished.
*
* <p>This batch {@link SpanProcessor} can cause high contention in a very high traffic service.
* TODO: Add a link to the SpanProcessor that uses Disruptor as alternative with low contention.
*/
public final class BatchSpanProcessor implements SpanProcessor {

Expand Down Expand Up @@ -73,7 +72,7 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) {
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
new ArrayBlockingQueue<>(maxQueueSize));
JcTools.newMpscArrayQueue(maxQueueSize));
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();
}
Expand Down Expand Up @@ -131,7 +130,8 @@ private static final class Worker implements Runnable {
private final long exporterTimeoutNanos;

private long nextExportTime;
private final BlockingQueue<ReadableSpan> queue;

private final Queue<ReadableSpan> queue;
// When waiting on the spans queue, exporter thread sets this atomic to the number of more
// spans it needs before doing an export. Writer threads would then wait for the queue to reach
// spansNeeded size before notifying the exporter thread about new entries.
Expand All @@ -149,7 +149,7 @@ private Worker(
long scheduleDelayNanos,
int maxExportBatchSize,
long exporterTimeoutNanos,
BlockingQueue<ReadableSpan> queue) {
Queue<ReadableSpan> queue) {
this.spanExporter = spanExporter;
this.scheduleDelayNanos = scheduleDelayNanos;
this.maxExportBatchSize = maxExportBatchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ long getExporterTimeoutNanos() {
}

/**
* Sets the maximum number of Spans that are kept in the queue before start dropping.
* Sets the maximum number of Spans that are kept in the queue before start dropping. More memory
* than this value may be allocated to optimize queue access.
*
* <p>See the BatchSampledSpansProcessor class description for a high-level design description of
* this class.
Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ include(":sdk:common")
include(":sdk:metrics")
include(":sdk:testing")
include(":sdk:trace")
include(":sdk:trace-shaded-deps")
include(":sdk-extensions:async-processor")
include(":sdk-extensions:autoconfigure")
include(":sdk-extensions:aws")
Expand Down

0 comments on commit 2f2af19

Please sign in to comment.