Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kotlinx-coroutines-reactor context propagation #5196

Merged
merged 10 commits into from
Jan 28, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions instrumentation/kotlinx-coroutines/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,28 @@ muzzle {
group.set("org.jetbrains.kotlinx")
module.set("kotlinx-coroutines-core")
versions.set("[1.0.0,1.3.8)")
extraDependency("io.projectreactor:reactor-core:3.1.10.RELEASE")
extraDependency("org.reactivestreams:reactive-streams:1.0.2")
}
// 1.3.9 (and beyond?) have changed how artifact names are resolved due to multiplatform variants
pass {
group.set("org.jetbrains.kotlinx")
module.set("kotlinx-coroutines-core-jvm")
versions.set("[1.3.9,)")
extraDependency("io.projectreactor:reactor-core:3.1.10.RELEASE")
extraDependency("org.reactivestreams:reactive-streams:1.0.2")
}
}
dependencies {
compileOnly("io.opentelemetry:opentelemetry-extension-kotlin")
compileOnly("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
// Use first version with flow support since we have tests for it.
library("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
library("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.3.0")
implementation(project(":instrumentation:reactor:reactor-3.1:library"))

testImplementation("io.opentelemetry:opentelemetry-extension-kotlin")
testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
// Use first version with flow support since we have tests for it.
testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0")
}

tasks {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.reactor;

import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class KotlinCoroutinesReactorInstrumentationModule extends InstrumentationModule {

public KotlinCoroutinesReactorInstrumentationModule() {
super("kotlinx-coroutines", "kotlinx-coroutines-reactor");
}

@Override
public boolean isHelperClass(String className) {
return className.startsWith("io.opentelemetry.extension.kotlin.");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new KotlinMonoCoroutineInstrumentation(), new KotlinPublisherCoroutineInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.reactor;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.KotlinCoroutinesInstrumentationHelper;
import kotlin.coroutines.CoroutineContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.core.publisher.MonoSink;

public class KotlinMonoCoroutineInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf("kotlinx.coroutines.reactor.MonoCoroutine");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor()
.and(
takesArgument(0, named("kotlin.coroutines.CoroutineContext"))
.and(takesArgument(1, named("reactor.core.publisher.MonoSink")))),
this.getClass().getName() + "$MonoCoroutineAdvice");
}

@SuppressWarnings("unused")
public static class MonoCoroutineAdvice {

@Advice.OnMethodEnter
public static void enter(
@Advice.Argument(value = 0, readOnly = false) CoroutineContext coroutineContext,
@Advice.Argument(1) MonoSink<?> monoSink) {
Context context =
ContextPropagationOperator.getOpenTelemetryContext(
monoSink.currentContext(), Java8BytecodeBridge.currentContext());
coroutineContext =
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.reactor;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.KotlinCoroutinesInstrumentationHelper;
import kotlin.coroutines.CoroutineContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;

public class KotlinPublisherCoroutineInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return namedOneOf("kotlinx.coroutines.reactive.PublisherCoroutine");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor()
.and(
takesArgument(0, named("kotlin.coroutines.CoroutineContext"))
.and(takesArgument(1, named("org.reactivestreams.Subscriber")))),
this.getClass().getName() + "$PublisherCoroutineAdvice");
}

@SuppressWarnings("unused")
public static class PublisherCoroutineAdvice {

@Advice.OnMethodEnter
public static void enter(
@Advice.Argument(value = 0, readOnly = false) CoroutineContext coroutineContext,
@Advice.Argument(1) Subscriber<?> subscriber) {
if (subscriber instanceof CoreSubscriber) {
CoreSubscriber<?> coreSubscriber = (CoreSubscriber) subscriber;
Context context =
ContextPropagationOperator.getOpenTelemetryContext(
coreSubscriber.currentContext(), Java8BytecodeBridge.currentContext());
coroutineContext =
KotlinCoroutinesInstrumentationHelper.addOpenTelemetryContext(coroutineContext);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,60 @@ class KotlinCoroutineInstrumentationTest extends AgentInstrumentationSpecificati
assert seenItersA.equals(expectedIters)
assert seenItersB.equals(expectedIters)
}

def "kotlin traced mono"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

when:
kotlinTest.tracedMono()

then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
attributes {
}
}
span("child") {
childOf span(0)
attributes {
}
}
}
}

where:
dispatcher << dispatchersToTest
}

def "kotlin traced flux"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)

when:
kotlinTest.tracedFlux()

then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
attributes {
}
}
(0..2).each {
span("child_$it") {
childOf span(0)
attributes {
}
}
}
}
}

where:
dispatcher << dispatchersToTest
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.reactor.flux
import kotlinx.coroutines.reactor.mono
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.withContext
Expand Down Expand Up @@ -125,6 +129,22 @@ class KotlinCoroutineTests(private val dispatcher: CoroutineDispatcher) {
}
}

fun tracedMono(): Unit = runTest {
mono(dispatcher) {
tracedChild("child")
}.awaitSingle()
}

fun tracedFlux() = runTest {
flux(dispatcher) {
repeat(3) {
tracedChild("child_$it")
send(it)
}
}.collect {
}
}

fun launchConcurrentSuspendFunctions(numIters: Int) {
runBlocking {
for (i in 0 until numIters) {
Expand Down