Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace ArrayBlockingQueue with jctools queue. #3034

Merged
merged 9 commits into from
Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ val DEPENDENCIES = listOf(
"org.awaitility:awaitility:4.0.3",
"org.codehaus.mojo:animal-sniffer-annotations:1.20",
"org.curioswitch.curiostack:protobuf-jackson:1.2.0",
"org.jctools:jctools-core:3.3.0",
"org.junit-pioneer:junit-pioneer:1.3.8",
"org.skyscreamer:jsonassert:1.5.0",
"org.slf4j:slf4j-simple:1.7.30"
Expand Down
2 changes: 2 additions & 0 deletions sdk-extensions/autoconfigure/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ dependencies {
compileOnly("io.prometheus:simpleclient_httpserver")
compileOnly(project(":exporters:zipkin"))

testImplementation(project(path=":sdk:trace-shaded-deps"))

testImplementation(project(":proto"))
testImplementation(project(":sdk:testing"))
testImplementation("com.linecorp.armeria:armeria-junit5")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.internal.JcTools;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -97,8 +98,7 @@ void configureSpanProcessor_empty() {
assertThat(worker)
.extracting("queue")
.isInstanceOfSatisfying(
ArrayBlockingQueue.class,
queue -> assertThat(queue.remainingCapacity()).isEqualTo(2048));
Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2048));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing logic is verifying the remainingCapacity(). Do you want to do the same?

     (JcTools.capacity(queue) - JcTools.size(queue)).isEqualTo(2048)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, it only used tha tmethod since the JDK only provides that one, but capacity is what we're checking

assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter);
});
} finally {
Expand Down Expand Up @@ -133,8 +133,7 @@ void configureSpanProcessor_configured() {
assertThat(worker)
.extracting("queue")
.isInstanceOfSatisfying(
ArrayBlockingQueue.class,
queue -> assertThat(queue.remainingCapacity()).isEqualTo(2));
Queue.class, queue -> assertThat(JcTools.capacity(queue)).isEqualTo(2));
assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter);
});
} finally {
Expand Down
22 changes: 22 additions & 0 deletions sdk/trace-shaded-deps/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
plugins {
`java-library`

id("com.github.johnrengelman.shadow")
}

// This project is not published, it is bundled into :sdk:trace

description = "Internal use only - shaded dependencies of OpenTelemetry SDK for Tracing"
extra["moduleName"] = "io.opentelemetry.sdk.trace.internal"

dependencies {
implementation("org.jctools:jctools-core")
}

tasks {
shadowJar {
minimize()

relocate("org.jctools", "io.opentelemetry.internal.shaded.jctools")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.trace.internal;

import java.util.Queue;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscArrayQueue;

/** Internal accessor of JCTools package for fast queues. */
public final class JcTools {

/**
* Returns a new {@link Queue} appropriate for use with multiple producers and a single consumer.
*/
public static <T> Queue<T> newMpscArrayQueue(int capacity) {
return new MpscArrayQueue<>(capacity);
}

/**
* Returns the capacity of the {@link Queue}, which must be a JcTools queue. We cast to the
* implementation so callers do not need to use the shaded classes.
*/
public static long capacity(Queue<?> queue) {
return ((MessagePassingQueue<?>) queue).capacity();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use MpscArrayQueue right? It does have capacity()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're shading it and using this in a test in a different artifact where we don't want to have to reference the shaded class. I could cast to MpscArrayQueue here too but may as well stick with the iterface.

}

private JcTools() {}
}
12 changes: 12 additions & 0 deletions sdk/trace/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@ plugins {
description = "OpenTelemetry SDK For Tracing"
extra["moduleName"] = "io.opentelemetry.sdk.trace"

evaluationDependsOn(":sdk:trace-shaded-deps")

dependencies {
api(project(":api:all"))
api(project(":sdk:common"))

compileOnly(project(":sdk:trace-shaded-deps"))

implementation(project(":api:metrics"))
implementation(project(":semconv"))

Expand All @@ -24,6 +28,7 @@ dependencies {
testImplementation("com.google.guava:guava")

jmh(project(":sdk:metrics"))
jmh(project(":sdk:trace-shaded-deps"))
jmh(project(":sdk:testing")) {
// JMH doesn"t handle dependencies that are duplicated between the main and jmh
// configurations properly, but luckily here it"s simple enough to just exclude transitive
Expand Down Expand Up @@ -62,4 +67,11 @@ tasks {
File(propertiesDir, "version.properties").writeText("sdk.version=${project.version}")
}
}

jar {
inputs.files(project(":sdk:trace-shaded-deps").file("src"))
val shadowJar = project(":sdk:trace-shaded-deps").tasks.named<Jar>("shadowJar")
from(zipTree(shadowJar.get().archiveFile))
dependsOn(shadowJar)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,6 @@ public final void recordMetrics() {
new BatchSpanProcessorMetrics(sdkMeterProvider.collectAllMetrics(), numThreads);
exportedSpans = metrics.exportedSpans();
droppedSpans = metrics.droppedSpans();
}

@TearDown(Level.Trial)
public final void tearDown() {
processor.shutdown().join(10, TimeUnit.SECONDS);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public CompletableResultCode flush() {

@Override
public CompletableResultCode shutdown() {
executor.shutdown();
return CompletableResultCode.ofSuccess();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import io.opentelemetry.sdk.trace.ReadableSpan;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.internal.JcTools;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,9 +38,6 @@
* {@code maxQueueSize} maximum size, if queue is full spans are dropped). Spans are exported either
* when there are {@code maxExportBatchSize} pending spans or {@code scheduleDelayNanos} has passed
* since the last export finished.
*
* <p>This batch {@link SpanProcessor} can cause high contention in a very high traffic service.
* TODO: Add a link to the SpanProcessor that uses Disruptor as alternative with low contention.
*/
public final class BatchSpanProcessor implements SpanProcessor {

Expand Down Expand Up @@ -73,7 +72,7 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) {
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
new ArrayBlockingQueue<>(maxQueueSize));
JcTools.newMpscArrayQueue(maxQueueSize));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that MpscArrayQueue rounds the queue size to power to 2 for various perf reasons. In my opinion it is better to enforce this so users know what the actual memory that is getting allocated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean falling back to ArrayBlockingQueue if size isn't power of 2? I don't think we can require this for the BSP setting instead since it's too tricky to use.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant enforcing the maxQueueSize to be a power of 2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That we can't do we don't want to lose usability (adding restrictions that can only be conveyed through documentation or error messages) here. Would like to hear more thoughts on whether we should fallback if it's not power-of-2

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Falling back is not great really since it is not an efficient solution. How about calling it out in the documentation that queue size is rounded to the next power of 2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a note to the builder that some more memory may be allocated, without going too much into implementation detail.

Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();
}
Expand Down Expand Up @@ -131,7 +130,8 @@ private static final class Worker implements Runnable {
private final long exporterTimeoutNanos;

private long nextExportTime;
private final BlockingQueue<ReadableSpan> queue;

private final Queue<ReadableSpan> queue;
// When waiting on the spans queue, exporter thread sets this atomic to the number of more
// spans it needs before doing an export. Writer threads would then wait for the queue to reach
// spansNeeded size before notifying the exporter thread about new entries.
Expand All @@ -149,7 +149,7 @@ private Worker(
long scheduleDelayNanos,
int maxExportBatchSize,
long exporterTimeoutNanos,
BlockingQueue<ReadableSpan> queue) {
Queue<ReadableSpan> queue) {
this.spanExporter = spanExporter;
this.scheduleDelayNanos = scheduleDelayNanos;
this.maxExportBatchSize = maxExportBatchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ long getExporterTimeoutNanos() {
}

/**
* Sets the maximum number of Spans that are kept in the queue before start dropping.
* Sets the maximum number of Spans that are kept in the queue before start dropping. More memory
* than this value may be allocated to optimize queue access.
*
* <p>See the BatchSampledSpansProcessor class description for a high-level design description of
* this class.
Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ include(":sdk:common")
include(":sdk:metrics")
include(":sdk:testing")
include(":sdk:trace")
include(":sdk:trace-shaded-deps")
include(":sdk-extensions:async-processor")
include(":sdk-extensions:autoconfigure")
include(":sdk-extensions:aws")
Expand Down