Skip to content

Commit

Permalink
Enable strict context check and fix some context issues. (#2637)
Browse files Browse the repository at this point in the history
* Enable strict context check and fix some context issues.

* Drift

* Drift and cache

* Exclude grizzly include akka

* Grizzly, scala

* ForkJoin worker

* webflux comment, grizzly typo

* Give up on akka for now

* threadpool

* Fallback on grizzly, wait for completion in executor cancellation tests

* Hystrix

* ratpack

* Cleanups
  • Loading branch information
Anuraag Agrawal authored Mar 29, 2021
1 parent c918d05 commit dcd316d
Show file tree
Hide file tree
Showing 17 changed files with 157 additions and 72 deletions.
6 changes: 6 additions & 0 deletions gradle/java.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ tasks.withType(Test).configureEach {
testLogging {
exceptionFormat = 'full'
}

// There's no real harm in setting this for all tests even if any happen to not be using context
// propagation.
jvmArgs "-Dio.opentelemetry.context.enableStrictContext=true"
// TODO(anuraaga): Have agent map unshaded to shaded.
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=true"
}

tasks.withType(AbstractArchiveTask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,8 @@ compileVersion101TestGroovy {
classpath = classpath.plus(files(compileVersion101TestScala.destinationDir))
dependsOn compileVersion101TestScala
}

tasks.withType(Test) {
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2639
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ private void populateGenericAttributes(Span span, ExecutionAttributes attributes
@Override
public void afterExecution(
Context.AfterExecution context, ExecutionAttributes executionAttributes) {
Scope scope = executionAttributes.getAttribute(SCOPE_ATTRIBUTE);
if (scope != null) {
scope.close();
}
io.opentelemetry.context.Context otelContext = getContext(executionAttributes);
clearAttributes(executionAttributes);
Span span = Span.fromContext(otelContext);
Expand Down Expand Up @@ -168,6 +164,10 @@ public void onExecutionFailure(
}

/**
 * Releases the per-request tracing state stored in the given execution attributes.
 *
 * <p>Closes the {@link Scope} stored under {@code SCOPE_ATTRIBUTE}, if one is present, and then
 * nulls out the scope, context and AWS SDK request attributes.
 *
 * @param executionAttributes the attributes of the SDK execution being finished
 */
private void clearAttributes(ExecutionAttributes executionAttributes) {
  Scope scope = executionAttributes.getAttribute(SCOPE_ATTRIBUTE);
  if (scope != null) {
    scope.close();
    // Drop the closed scope as well, so a repeated cleanup of the same attributes cannot close
    // it a second time and the stale handle is not retained.
    executionAttributes.putAttribute(SCOPE_ATTRIBUTE, null);
  }
  executionAttributes.putAttribute(CONTEXT_ATTRIBUTE, null);
  executionAttributes.putAttribute(AWS_SDK_REQUEST_ATTRIBUTE, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.ForkJoinTask
import java.util.concurrent.Future
Expand Down Expand Up @@ -134,7 +135,7 @@ class ExecutorInstrumentationTest extends AgentInstrumentationSpecification {

def "#poolImpl '#name' wrap lambdas"() {
setup:
def pool = poolImpl
ExecutorService pool = poolImpl
def m = method
def w = wrap

Expand All @@ -160,7 +161,7 @@ class ExecutorInstrumentationTest extends AgentInstrumentationSpecification {
}

cleanup:
pool?.shutdown()
pool.shutdown()

where:
name | method | wrap | poolImpl
Expand All @@ -173,7 +174,7 @@ class ExecutorInstrumentationTest extends AgentInstrumentationSpecification {

def "#poolImpl '#name' reports after canceled jobs"() {
setup:
def pool = poolImpl
ExecutorService pool = poolImpl
def m = method
List<JavaAsyncChild> children = new ArrayList<>()
List<Future> jobFutures = new ArrayList<>()
Expand Down Expand Up @@ -216,6 +217,11 @@ class ExecutorInstrumentationTest extends AgentInstrumentationSpecification {
expect:
waitForTraces(1).size() == 1

// Wait for shutdown since we didn't wait on task completion and want to confirm any pending
// ones clean up context.
pool.shutdown()
pool.awaitTermination(10, TimeUnit.SECONDS)

where:
name | method | poolImpl
"submit Runnable" | submitRunnable | new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ dependencies {

tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.grizzly.enabled=true"
}

tasks.withType(Test) {
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2640
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.hystrix.experimental-span-attributes=true"
// Disable so failure testing below doesn't inadvertently change the behavior.
jvmArgs "-Dhystrix.command.default.circuitBreaker.enabled=false"
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"

// Uncomment for debugging:
// jvmArgs "-Dhystrix.command.default.execution.timeout.enabled=false"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,3 @@ dependencies {

latestDepTestLibrary group: 'com.squareup.okhttp', name: 'okhttp', version: '[2.6,3)'
}


Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* always stores and retrieves them from the agent context, even when accessed from the application.
* All other accesses are to the concrete application context.
*/
public class AgentContextStorage implements ContextStorage {
public class AgentContextStorage implements ContextStorage, AutoCloseable {

private static final Logger logger = LoggerFactory.getLogger(AgentContextStorage.class);

Expand Down Expand Up @@ -145,6 +145,15 @@ public Context current() {
return new AgentContextWrapper(io.opentelemetry.context.Context.current(), applicationContext);
}

/**
 * Closes the agent-side context storage when it supports closing.
 *
 * <p>Looks up the storage returned by {@code io.opentelemetry.context.ContextStorage.get()} and,
 * only if that instance implements {@link AutoCloseable}, delegates to its {@code close()}.
 * Otherwise this method does nothing.
 *
 * @throws Exception whatever the delegate's {@code close()} throws
 */
@Override
public void close() throws Exception {
  io.opentelemetry.context.ContextStorage delegate =
      io.opentelemetry.context.ContextStorage.get();
  if (!(delegate instanceof AutoCloseable)) {
    return;
  }
  ((AutoCloseable) delegate).close();
}

public static class AgentContextWrapper implements Context {
private final io.opentelemetry.context.Context agentContext;
private final Context applicationContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,8 @@ dependencies {
testImplementation group: 'com.sun.activation', name: 'jakarta.activation', version: '1.2.2'
}
}

tasks.withType(Test) {
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2648
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,18 @@ public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
Context traceContext = tracer.startSpan(Context.current(), context.getMsgList());
ContextAndScope contextAndScope = new ContextAndScope(traceContext, traceContext.makeCurrent());
context.setMqTraceContext(contextAndScope);
Context otelContext = tracer.startSpan(Context.current(), context.getMsgList());
context.setMqTraceContext(otelContext);
}

@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
if (context.getMqTraceContext() instanceof ContextAndScope) {
ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext();
contextAndScope.closeScope();
tracer.end(contextAndScope.getContext());
if (context.getMqTraceContext() instanceof Context) {
Context otelContext = (Context) context.getMqTraceContext();
tracer.end(otelContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
final class TracingSendMessageHookImpl implements SendMessageHook {

private final RocketMqProducerTracer tracer;
private boolean propagationEnabled;
private final boolean propagationEnabled;

TracingSendMessageHookImpl(RocketMqProducerTracer tracer, boolean propagationEnabled) {
this.tracer = tracer;
Expand All @@ -32,27 +32,25 @@ public void sendMessageBefore(SendMessageContext context) {
if (context == null) {
return;
}
Context traceContext =
Context otelContext =
tracer.startProducerSpan(Context.current(), context.getBrokerAddr(), context.getMessage());
if (propagationEnabled) {
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(traceContext, context.getMessage().getProperties(), SETTER);
.inject(otelContext, context.getMessage().getProperties(), SETTER);
}
ContextAndScope contextAndScope = new ContextAndScope(traceContext, traceContext.makeCurrent());
context.setMqTraceContext(contextAndScope);
context.setMqTraceContext(otelContext);
}

@Override
public void sendMessageAfter(SendMessageContext context) {
if (context == null || context.getMqTraceContext() == null || context.getSendResult() == null) {
return;
}
if (context.getMqTraceContext() instanceof ContextAndScope) {
ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext();
tracer.afterProduce(contextAndScope.getContext(), context.getSendResult());
contextAndScope.closeScope();
tracer.end(contextAndScope.getContext());
if (context.getMqTraceContext() instanceof Context) {
Context otelContext = (Context) context.getMqTraceContext();
tracer.afterProduce(otelContext, context.getSendResult());
tracer.end(otelContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Callable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.ThreadPoolExecutor
Expand Down Expand Up @@ -87,7 +88,7 @@ class ScalaExecutorInstrumentationTest extends AgentInstrumentationSpecification

def "#poolImpl '#name' reports after canceled jobs"() {
setup:
def pool = poolImpl
ExecutorService pool = poolImpl
def m = method
List<ScalaAsyncChild> children = new ArrayList<>()
List<Future> jobFutures = new ArrayList<>()
Expand Down Expand Up @@ -129,6 +130,11 @@ class ScalaExecutorInstrumentationTest extends AgentInstrumentationSpecification
expect:
waitForTraces(1).size() == 1

// Wait for shutdown to make sure any remaining tasks finish and cleanup context since we don't
// wait on the tasks.
pool.shutdown()
pool.awaitTermination(10, TimeUnit.SECONDS)

where:
name | method | poolImpl
"submit Runnable" | submitRunnable | new ForkJoinPool()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ dependencies {
tasks.withType(Test) {
// TODO run tests both with and without experimental span attributes
jvmArgs '-Dotel.instrumentation.spring-webflux.experimental-span-attributes=true'
// TODO(anuraaga): There is no actual context leak - it just seems that the server-side does not
// fully complete processing before the test cases finish, which is when we check for context
// leaks. Adding Thread.sleep(1000) just before checking for leaks allows it to pass but is not
// a good approach. Come up with a better one and enable this.
jvmArgs "-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false"

systemProperty "testLatestDeps", testLatestDeps
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,43 @@
/** Utils for concurrent instrumentations. */
public class ExecutorInstrumentationUtils {

/**
 * Per-class lookup answering whether runnables declared inside a given enclosing class must be
 * left uninstrumented. A {@code true} value marks the enclosing class as excluded.
 */
private static final ClassValue<Boolean> NOT_INSTRUMENTED_RUNNABLE_ENCLOSING_CLASS =
    new ClassValue<Boolean>() {
      @Override
      protected Boolean computeValue(Class<?> enclosingClass) {
        switch (enclosingClass.getName()) {
          // Avoid context leak on jetty. A Runnable submitted from SelectChannelEndPoint is used
          // to process a new request, which should not have context from the current request.
          case "org.eclipse.jetty.io.nio.SelectChannelEndPoint":
          // Don't instrument the executor's own runnables. These runnables may never return
          // until netty shuts down.
          case "io.netty.util.concurrent.SingleThreadEventExecutor":
          // OkHttp task runner is a lazily-initialized shared pool of continuously running
          // threads similar to an event loop. The submitted tasks themselves should already be
          // instrumented to allow async propagation.
          case "okhttp3.internal.concurrent.TaskRunner":
          // OkHttp connection pool lazily initializes a long running task to detect expired
          // connections and should not itself be instrumented.
          case "com.squareup.okhttp.ConnectionPool":
            return true;
          default:
            return false;
        }
      }
    };

/**
* Checks if given task should get state attached.
*
Expand All @@ -28,22 +65,37 @@ public static boolean shouldAttachStateToTask(Object task) {
Class<?> taskClass = task.getClass();
Class<?> enclosingClass = taskClass.getEnclosingClass();

// not much point in propagating root context
// plus it causes failures under otel.javaagent.testing.fail-on-context-leak=true
return Context.current() != Context.root()
// TODO Workaround for
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/787
&& !taskClass.getName().equals("org.apache.tomcat.util.net.NioEndpoint$SocketProcessor")
// Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to
// process a new request which should not have context from the current request.
&& (enclosingClass == null
|| !enclosingClass.getName().equals("org.eclipse.jetty.io.nio.SelectChannelEndPoint"))
// Don't instrument the executor's own runnables. These runnables may never return until
// netty shuts down.
&& (enclosingClass == null
|| !enclosingClass
.getName()
.equals("io.netty.util.concurrent.SingleThreadEventExecutor"));
if (Context.current() == Context.root()) {
// not much point in propagating root context
// plus it causes failures under otel.javaagent.testing.fail-on-context-leak=true
return false;
}

// ForkJoinPool threads are initialized lazily and continue to handle tasks similar to an event
// loop. They should not have context propagated to the base of the thread, tasks themselves
// will have it through other means.
if (taskClass.getName().equals("java.util.concurrent.ForkJoinWorkerThread")) {
return false;
}

// ThreadPoolExecutor worker threads may be initialized lazily and manage interruption of other
// threads. The actual tasks being run on those threads will propagate context but we should not
// propagate onto this management thread.
if (taskClass.getName().equals("java.util.concurrent.ThreadPoolExecutor$Worker")) {
return false;
}

// TODO Workaround for
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/787
if (taskClass.getName().equals("org.apache.tomcat.util.net.NioEndpoint$SocketProcessor")) {
return false;
}

if (enclosingClass != null && NOT_INSTRUMENTED_RUNNABLE_ENCLOSING_CLASS.get(enclosingClass)) {
return false;
}

return true;
}

/**
Expand Down
Loading

0 comments on commit dcd316d

Please sign in to comment.