Skip to content

Commit

Permalink
Reactor: early propagate span in context when subscribing (#8166)
Browse files Browse the repository at this point in the history
* Reactor: early propagate span in context when subscribing

* review
  • Loading branch information
amarziali authored Jan 8, 2025
1 parent 9b219d4 commit 73f194e
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
import reactor.core.publisher.Hooks
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.util.context.Context
import spock.lang.Shared

import java.time.Duration
import java.util.concurrent.CompletableFuture

class ReactorCoreTest extends AgentTestRunner {

Expand Down Expand Up @@ -443,12 +443,41 @@ class ReactorCoreTest extends AgentTestRunner {

def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() {
when:
def mono = Flux.range(1, 100).windowUntil {it % 10 == 0}.count()
def mono = Flux.range(1, 100).windowUntil { it % 10 == 0 }.count()
then:
// we are not interested into asserting a trace structure but only that the instrumentation error count is 0
assert mono.block() == 10
}
def "span in the context has to be activated when the publisher subscribes"() {
when:
// the mono is subscribed (block) when first is active.
// However we expect that the span third will have second as parent and not first
// because we set the parent explicitly in the reactor context (dd.span key)
def result = runUnderTrace("first", {
runUnderTrace("second", {
def mono = Mono.defer {
Mono.fromCompletionStage(CompletableFuture.supplyAsync {
runUnderTrace("third", {
"hello world"
})
})
}.contextWrite(Context.of("dd.span", TEST_TRACER.activeSpan()))
mono
})
.block()
})
then:
assert result == "hello world"
assertTraces(1, {
trace(3, true) {
basicSpan(it, "first")
basicSpan(it, "second", span(0))
basicSpan(it, "third", span(1))
}
})
}
@Trace(operationName = "trace-parent", resourceName = "trace-parent")
def assemblePublisherUnderTrace(def publisherSupplier) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package datadog.trace.instrumentation.reactor.core;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan;
import javax.annotation.Nullable;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;

public class ContextSpanHelper {
private static final String DD_SPAN_KEY = "dd.span";

private ContextSpanHelper() {}

@Nullable
public static AgentSpan extractSpanFromSubscriberContext(final CoreSubscriber<?> subscriber) {
if (subscriber == null) {
return null;
}
Context context = null;
try {
context = subscriber.currentContext();
} catch (Throwable ignored) {
}
if (context == null) {
return null;
}
if (context.hasKey(DD_SPAN_KEY)) {
Object maybeSpan = context.get(DD_SPAN_KEY);
if (maybeSpan instanceof WithAgentSpan) {
return ((WithAgentSpan) maybeSpan).asAgentSpan();
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package datadog.trace.instrumentation.reactor.core;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.hasSuperType;
import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractSpanFromSubscriberContext;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;

@AutoService(InstrumenterModule.class)
public class CorePublisherInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice {
public CorePublisherInstrumentation() {
super("reactor-core");
}

@Override
public String hierarchyMarkerType() {
return "reactor.core.CoreSubscriber";
}

@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return implementsInterface(named("reactor.core.CorePublisher")) // from 3.1.7
.or(
hasSuperType(
namedOneOf(
"reactor.core.publisher.Mono", "reactor.core.publisher.Flux"))); // < 3.1.7
}

@Override
public Map<String, String> contextStore() {
final Map<String, String> ret = new HashMap<>();
ret.put("org.reactivestreams.Subscriber", AgentSpan.class.getName());
ret.put("org.reactivestreams.Publisher", AgentSpan.class.getName());
return ret;
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ContextSpanHelper",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
named("subscribe")
.and(not(isStatic()))
.and(takesArguments(1))
.and(takesArgument(0, named("reactor.core.CoreSubscriber"))),
getClass().getName() + "$PropagateContextSpanOnSubscribe");
}

public static class PropagateContextSpanOnSubscribe {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope before(
@Advice.This Publisher<?> self, @Advice.Argument(0) final CoreSubscriber<?> subscriber) {
final AgentSpan span = extractSpanFromSubscriberContext(subscriber);

if (span != null) {
// we force storing the span state linked to publisher and subscriber to the one explicitly
// present in the context so that, if PublisherInstrumentation is kicking in after this
// advice, it won't override that active span
InstrumentationContext.get(Publisher.class, AgentSpan.class).put(self, span);
InstrumentationContext.get(Subscriber.class, AgentSpan.class).put(subscriber, span);
return activateSpan(span);
}
return null;
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void after(@Advice.Enter final AgentScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractSpanFromSubscriberContext;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.core.CoreSubscriber;
import reactor.util.context.Context;

@AutoService(InstrumenterModule.class)
public class CoreSubscriberInstrumentation extends InstrumenterModule.Tracing
Expand All @@ -34,6 +33,13 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
return implementsInterface(named(hierarchyMarkerType()));
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ContextSpanHelper",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
Expand All @@ -44,22 +50,9 @@ public void methodAdvice(MethodTransformer transformer) {
public static class PropagateSpanInScopeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static AgentScope before(@Advice.This final CoreSubscriber<?> self) {
Context context = null;
try {
context = self.currentContext();
} catch (Throwable ignored) {
}
if (context == null) {
return null;
}
if (context.hasKey("dd.span")) {
Object maybeSpan = context.get("dd.span");
if (maybeSpan instanceof WithAgentSpan) {
AgentSpan span = ((WithAgentSpan) maybeSpan).asAgentSpan();
if (span != null) {
return activateSpan(span);
}
}
final AgentSpan span = extractSpanFromSubscriberContext(self);
if (span != null) {
return activateSpan(span);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import reactor.util.context.Context
import spock.lang.Shared

import java.time.Duration
import java.util.concurrent.CompletableFuture

class ReactorCoreTest extends AgentTestRunner {

Expand Down Expand Up @@ -440,6 +441,44 @@ class ReactorCoreTest extends AgentTestRunner {
})
}

def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() {
when:
def mono = Flux.range(1, 100).windowUntil { it % 10 == 0 }.count()
then:
// we are not interested into asserting a trace structure but only that the instrumentation error count is 0
assert mono.block() == 11
}
def "span in the context has to be activated when the publisher subscribes"() {
when:
// the mono is subscribed (block) when first is active.
// However we expect that the span third will have second as parent and not first
// because we set the parent explicitly in the reactor context (dd.span key)
def result = runUnderTrace("first", {
runUnderTrace("second", {
def mono = Mono.defer {
Mono.fromCompletionStage(CompletableFuture.supplyAsync {
runUnderTrace("third", {
"hello world"
})
})
}.subscriberContext(Context.of("dd.span", TEST_TRACER.activeSpan()))
mono
})
.block()
})
then:
assert result == "hello world"
assertTraces(1, {
trace(3, true) {
basicSpan(it, "first")
basicSpan(it, "second", span(0))
basicSpan(it, "third", span(1))
}
})
}
@Trace(operationName = "trace-parent", resourceName = "trace-parent")
def assemblePublisherUnderTrace(def publisherSupplier) {
def span = startSpan("publisher-parent")
Expand Down Expand Up @@ -490,14 +529,6 @@ class ReactorCoreTest extends AgentTestRunner {
span.finish()
}
def "test currentContext() calls on inner operator is not throwing a NPE on the advice"() {
when:
def mono = Flux.range(1, 100).windowUntil {it % 10 == 0}.count()
then:
// we are not interested into asserting a trace structure but only that the instrumentation error count is 0
assert mono.block() == 11
}

@Trace(operationName = "addOne", resourceName = "addOne")
def static addOneFunc(int i) {
return i + 1
Expand Down

0 comments on commit 73f194e

Please sign in to comment.