Skip to content

Commit

Permalink
Use java's inbuilt concurrent queue
Browse files Browse the repository at this point in the history
  • Loading branch information
sbandadd committed Mar 11, 2021
1 parent ba8ec8d commit 1816f36
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 24 deletions.
1 change: 0 additions & 1 deletion dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ val DEPENDENCIES = listOf(
"org.awaitility:awaitility:4.0.3",
"org.codehaus.mojo:animal-sniffer-annotations:1.20",
"org.curioswitch.curiostack:protobuf-jackson:1.2.0",
"org.jctools:jctools-core:3.2.0",
"org.junit-pioneer:junit-pioneer:1.3.8",
"org.skyscreamer:jsonassert:1.5.0",
"org.slf4j:slf4j-simple:1.7.30"
Expand Down
1 change: 0 additions & 1 deletion sdk-extensions/autoconfigure/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ dependencies {
testImplementation(project(":sdk:testing"))
testImplementation("com.linecorp.armeria:armeria-junit5")
testImplementation("com.linecorp.armeria:armeria-grpc")
testImplementation("org.jctools:jctools-core")
testRuntimeOnly("io.grpc:grpc-netty-shaded")
testRuntimeOnly("org.slf4j:slf4j-simple")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.when;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SpanLimits;
Expand All @@ -20,21 +22,29 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.jctools.queues.MpscArrayQueue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

// NB: We use AssertJ extracting to reflectively access implementation details to test configuration
// because the use of BatchSpanProcessor makes it difficult to verify values through public means.
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class TracerProviderConfigurationTest {

private static final ConfigProperties EMPTY =
ConfigProperties.createForTest(Collections.emptyMap());

@Mock private SpanExporter exporter;
@Mock private SpanExporter mockSpanExporter;

@BeforeEach
void setUp() {
when(mockSpanExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
}

@Test
void configureTracerProvider() {
Expand Down Expand Up @@ -69,7 +79,7 @@ void configureTracerProvider() {
@Test
void configureSpanProcessor_empty() {
BatchSpanProcessor processor =
TracerProviderConfiguration.configureSpanProcessor(EMPTY, exporter);
TracerProviderConfiguration.configureSpanProcessor(EMPTY, mockSpanExporter);

try {
assertThat(processor)
Expand All @@ -83,12 +93,8 @@ void configureSpanProcessor_empty() {
.extracting("exporterTimeoutNanos")
.isEqualTo(TimeUnit.MILLISECONDS.toNanos(30000));
assertThat(worker).extracting("maxExportBatchSize").isEqualTo(512);
assertThat(worker)
.extracting("queue")
.isInstanceOfSatisfying(
MpscArrayQueue.class,
queue -> assertThat(queue.capacity()).isEqualTo(2048));
assertThat(worker).extracting("spanExporter").isEqualTo(exporter);
assertThat(worker).extracting("maxQueueSize").isEqualTo(2048);
assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter);
});
} finally {
processor.shutdown();
Expand All @@ -105,7 +111,7 @@ void configureSpanProcessor_configured() {

BatchSpanProcessor processor =
TracerProviderConfiguration.configureSpanProcessor(
ConfigProperties.createForTest(properties), exporter);
ConfigProperties.createForTest(properties), mockSpanExporter);

try {
assertThat(processor)
Expand All @@ -119,11 +125,8 @@ void configureSpanProcessor_configured() {
.extracting("exporterTimeoutNanos")
.isEqualTo(TimeUnit.MILLISECONDS.toNanos(4));
assertThat(worker).extracting("maxExportBatchSize").isEqualTo(3);
assertThat(worker)
.extracting("queue")
.isInstanceOfSatisfying(
MpscArrayQueue.class, queue -> assertThat(queue.capacity()).isEqualTo(2));
assertThat(worker).extracting("spanExporter").isEqualTo(exporter);
assertThat(worker).extracting("maxQueueSize").isEqualTo(2);
assertThat(worker).extracting("spanExporter").isEqualTo(mockSpanExporter);
});
} finally {
processor.shutdown();
Expand Down
1 change: 0 additions & 1 deletion sdk/trace/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ dependencies {
jmh("io.grpc:grpc-api")
jmh("io.grpc:grpc-netty-shaded")
jmh("org.testcontainers:testcontainers") // testContainer for OTLP collector
implementation("org.jctools:jctools-core")
}

sourceSets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import org.jctools.queues.MpscArrayQueue;

/**
* Implementation of the {@link SpanProcessor} that batches spans exported by the SDK then pushes
Expand Down Expand Up @@ -75,7 +76,8 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) {
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
new MpscArrayQueue<ReadableSpan>(maxQueueSize));
new ConcurrentLinkedQueue<ReadableSpan>(),
maxQueueSize);
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();
}
Expand Down Expand Up @@ -148,7 +150,9 @@ private static final class Worker implements Runnable {
private final int maxExportBatchSize;
private final long exporterTimeoutNanos;
private long nextExportTime;
private final MpscArrayQueue<ReadableSpan> queue;
private final ConcurrentLinkedQueue<ReadableSpan> queue;
private final int maxQueueSize;
private final AtomicInteger queueSize = new AtomicInteger(0);
private final AtomicLong addedSpansCounter = new AtomicLong(0);
private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
private volatile boolean continueWork = true;
Expand All @@ -165,12 +169,14 @@ private Worker(
long scheduleDelayNanos,
int maxExportBatchSize,
long exporterTimeoutNanos,
MpscArrayQueue<ReadableSpan> queue) {
ConcurrentLinkedQueue<ReadableSpan> queue,
int maxQueueSize) {
this.spanExporter = spanExporter;
this.scheduleDelayNanos = scheduleDelayNanos;
this.maxExportBatchSize = maxExportBatchSize;
this.exporterTimeoutNanos = exporterTimeoutNanos;
this.queue = queue;
this.maxQueueSize = maxQueueSize;
this.lock = new ReentrantLock();
this.needExport = lock.newCondition();
Meter meter = GlobalMetricsProvider.getMeter("io.opentelemetry.sdk.trace");
Expand All @@ -181,7 +187,7 @@ private Worker(
.setUpdater(
result ->
result.observe(
queue.size(),
queueSize.get(),
Labels.of(SPAN_PROCESSOR_TYPE_LABEL, SPAN_PROCESSOR_TYPE_VALUE)))
.build();
LongCounter processedSpansCounter =
Expand All @@ -203,10 +209,12 @@ private Worker(
}

private void addSpan(ReadableSpan span) {
if (!queue.offer(span)) {
if (queueSize.get() >= maxQueueSize) {
droppedSpansCounter.incrementAndGet();
droppedSpans.add(1);
} else {
queue.offer(span);
queueSize.incrementAndGet();
if (addedSpansCounter.incrementAndGet() % maxExportBatchSize == 0) {
lock.lock();
try {
Expand All @@ -226,6 +234,7 @@ public void run() {
flush();
}
while (!queue.isEmpty() && batch.size() < maxExportBatchSize) {
queueSize.decrementAndGet();
batch.add(queue.poll().toSpanData());
}
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
Expand All @@ -252,6 +261,7 @@ public void run() {
private void flush() {
while (!queue.isEmpty()) {
batch.add(queue.poll().toSpanData());
queueSize.decrementAndGet();
if (batch.size() >= maxExportBatchSize) {
exportCurrentBatch();
}
Expand Down

0 comments on commit 1816f36

Please sign in to comment.