Skip to content

Commit

Permalink
Move async propagation API from scope to tracer (#8231)
Browse files Browse the repository at this point in the history
* feat(core): Move async propagation API from scope to tracer
  • Loading branch information
PerfectSlayer authored Jan 21, 2025
1 parent 2b72cb5 commit d5592c6
Show file tree
Hide file tree
Showing 47 changed files with 289 additions and 244 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.bootstrap.instrumentation.java.concurrent;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;

import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
Expand All @@ -26,7 +27,7 @@ public static AgentScope startTaskScope(State state) {
final AgentScope.Continuation continuation = state.getAndResetContinuation();
if (continuation != null) {
final AgentScope scope = continuation.activate();
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
// important - stop timing after the scope has been activated so the time in the queue can
// be attributed to the correct context without duplicating the propagated information
state.stopTiming();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled

/**
* Test executor instrumentation for Akka specific classes.
* This is to large extent a copy of ExecutorInstrumentationTest.
Expand Down Expand Up @@ -45,7 +46,7 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
m(pool, new AkkaAsyncChild())
// this child won't
Expand Down Expand Up @@ -101,7 +102,7 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
dispatcher.execute(new AkkaAsyncChild())
// this child won't
Expand Down Expand Up @@ -132,7 +133,7 @@ class AkkaExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
try {
for (int i = 0; i < 20; ++i) {
// Our current instrumentation instrumentation does not behave very well
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.apachehttpasyncclient;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
import static datadog.trace.instrumentation.apachehttpasyncclient.ApacheHttpAsyncClientDecorator.DECORATE;

import datadog.trace.bootstrap.instrumentation.api.AgentScope;
Expand Down Expand Up @@ -39,7 +40,7 @@ public void completed(final T result) {
completeDelegate(result);
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
completeDelegate(result);
}
}
Expand All @@ -56,7 +57,7 @@ public void failed(final Exception ex) {
failDelegate(ex);
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
failDelegate(ex);
}
}
Expand All @@ -72,7 +73,7 @@ public void cancelled() {
cancelDelegate();
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
cancelDelegate();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.apachehttpclient5;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
import static datadog.trace.instrumentation.apachehttpclient5.ApacheHttpClientDecorator.DECORATE;

import datadog.trace.bootstrap.instrumentation.api.AgentScope;
Expand Down Expand Up @@ -42,7 +43,7 @@ public void completed(final T result) {
completeDelegate(result);
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
completeDelegate(result);
}
}
Expand All @@ -59,7 +60,7 @@ public void failed(final Exception ex) {
failDelegate(ex);
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
failDelegate(ex);
}
}
Expand All @@ -75,7 +76,7 @@ public void cancelled() {
cancelDelegate();
} else {
try (final AgentScope scope = parentContinuation.activate()) {
scope.setAsyncPropagation(true);
setAsyncPropagationEnabled(true);
cancelDelegate();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package datadog.trace.instrumentation.hystrix;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import net.bytebuddy.asm.Advice;

@AutoService(InstrumenterModule.class)
Expand All @@ -35,23 +35,17 @@ public static class EnableAsyncAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static boolean enableAsyncTracking() {
final AgentScope scope = activeScope();
if (scope != null) {
if (!scope.isAsyncPropagating()) {
scope.setAsyncPropagation(true);
return true;
}
if (!isAsyncPropagationEnabled()) {
setAsyncPropagationEnabled(true);
return true;
}
return false;
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void disableAsyncTracking(@Advice.Enter final boolean wasEnabled) {
if (wasEnabled) {
final AgentScope scope = activeScope();
if (scope != null) {
scope.setAsyncPropagation(false);
}
setAsyncPropagationEnabled(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.function.Supplier

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled

/**
* Note: ideally this should live with the rest of ExecutorInstrumentationTest,
Expand Down Expand Up @@ -45,7 +45,7 @@ class CompletableFutureTest extends AgentTestRunner {
@Trace(operationName = "parent")
CompletableFuture<String> get() {
try {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
return CompletableFuture.supplyAsync(supplier, pool)
.thenCompose({ s -> CompletableFuture.supplyAsync(new AppendingSupplier(s), differentPool) })
.thenApply(function)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import java.util.concurrent.ExecutorCompletionService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled

class VirtualThreadTest extends AgentTestRunner {
@Shared
Expand Down Expand Up @@ -37,7 +37,7 @@ class VirtualThreadTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
m(pool, new JavaAsyncChild())
// this child won't
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.nameStartsWith;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -177,19 +177,18 @@ public void methodAdvice(MethodTransformer transformer) {
public static class DisableAsyncAdvice {

@Advice.OnMethodEnter
public static AgentScope before() {
AgentScope scope = activeScope();
if (null != scope && scope.isAsyncPropagating()) {
scope.setAsyncPropagation(false);
return scope;
public static boolean before() {
if (isAsyncPropagationEnabled()) {
setAsyncPropagationEnabled(false);
return true;
}
return null;
return false;
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(@Advice.Enter AgentScope scope) {
if (null != scope) {
scope.setAsyncPropagation(true);
public static void after(@Advice.Enter boolean wasDisabled) {
if (wasDisabled) {
setAsyncPropagationEnabled(true);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled
import static org.junit.Assume.assumeTrue

abstract class ExecutorInstrumentationTest extends AgentTestRunner {
Expand Down Expand Up @@ -82,7 +82,7 @@ abstract class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
m(pool, new JavaAsyncChild())
// this child won't
Expand Down Expand Up @@ -256,7 +256,7 @@ abstract class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
def future = m(pool, task)
sleep(500)
future.cancel(true)
Expand Down Expand Up @@ -317,7 +317,7 @@ abstract class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
pool.execute(new JavaAsyncChild())
// this child won't
Expand Down Expand Up @@ -368,7 +368,7 @@ abstract class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
m(pool, w(child))
}
}.run()
Expand Down Expand Up @@ -406,7 +406,7 @@ abstract class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
try {
for (int i = 0; i < 20; ++i) {
final JavaAsyncChild child = new JavaAsyncChild(false, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled
import static org.junit.Assume.assumeTrue

class NettyExecutorInstrumentationTest extends AgentTestRunner {
Expand Down Expand Up @@ -65,7 +65,7 @@ class NettyExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
m(pool, new JavaAsyncChild())
// this child won't
Expand Down Expand Up @@ -213,7 +213,7 @@ class NettyExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
try {
for (int i = 0; i < 20; ++i) {
final JavaAsyncChild child = new JavaAsyncChild(false, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled

class RejectedExecutionTest extends AgentTestRunner {

Expand Down Expand Up @@ -193,7 +193,7 @@ class RejectedExecutionTest extends AgentTestRunner {

when:
runUnderTrace("parent") {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// must be rejected because the queue will be full until some
// time after the first task is released
executor.submit((Runnable) new JavaAsyncChild(true, false))
Expand Down Expand Up @@ -237,7 +237,7 @@ class RejectedExecutionTest extends AgentTestRunner {

return {
runUnderTrace("parent") {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
pool.submit({})
}
}
Expand All @@ -261,7 +261,7 @@ class RejectedExecutionTest extends AgentTestRunner {

return {
runUnderTrace("parent") {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// must be rejected because the queue will be full until some
// time after the first task is released
def testTask = new JavaAsyncChild(true, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled
import static org.junit.Assume.assumeTrue

class JettyExecutorInstrumentationTest extends AgentTestRunner {
Expand Down Expand Up @@ -46,7 +46,7 @@ class JettyExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
activeScope().setAsyncPropagation(true)
setAsyncPropagationEnabled(true)
// this child will have a span
m(pool, new JavaAsyncChild())
// this child won't
Expand Down
Loading

0 comments on commit d5592c6

Please sign in to comment.