Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka propagation fix and concurrency tests #3099

Merged
merged 3 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public final class ContextPropagationDebug {

// locations where the context was propagated to another thread (tracking multiple steps is
// helpful in akka where there is so much recursive async spawning of new work)
private static final ContextKey<List<StackTraceElement[]>> THREAD_PROPAGATION_LOCATIONS =
private static final ContextKey<List<Propagation>> THREAD_PROPAGATION_LOCATIONS =
ContextKey.named("thread-propagation-locations");

private static final boolean THREAD_PROPAGATION_DEBUGGER =
Expand All @@ -36,13 +36,14 @@ public static boolean isThreadPropagationDebuggerEnabled() {
return THREAD_PROPAGATION_DEBUGGER;
}

public static Context appendLocations(Context context, StackTraceElement[] locations) {
List<StackTraceElement[]> currentLocations = ContextPropagationDebug.getLocations(context);
public static Context appendLocations(
Context context, StackTraceElement[] locations, Object carrier) {
List<Propagation> currentLocations = ContextPropagationDebug.getPropagations(context);
if (currentLocations == null) {
currentLocations = new CopyOnWriteArrayList<>();
context = context.with(THREAD_PROPAGATION_LOCATIONS, currentLocations);
}
currentLocations.add(0, locations);
currentLocations.add(0, new Propagation(carrier.getClass().getName(), locations));
return context;
}

Expand All @@ -67,18 +68,20 @@ public static void debugContextLeakIfEnabled() {
}
}

private static List<StackTraceElement[]> getLocations(Context context) {
private static List<Propagation> getPropagations(Context context) {
return context.get(THREAD_PROPAGATION_LOCATIONS);
}

private static void debugContextPropagation(Context context) {
List<StackTraceElement[]> locations = getLocations(context);
if (locations != null) {
List<Propagation> propagations = getPropagations(context);
if (propagations != null) {
StringBuilder sb = new StringBuilder();
Iterator<StackTraceElement[]> i = locations.iterator();
Iterator<Propagation> i = propagations.iterator();
while (i.hasNext()) {
for (StackTraceElement ste : i.next()) {
sb.append("\n");
Propagation entry = i.next();
sb.append("\ncarrier of type: ").append(entry.carrierClassName);
for (StackTraceElement ste : entry.location) {
Comment on lines +81 to +83
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! 👍

sb.append("\n ");
sb.append(ste);
}
if (i.hasNext()) {
Expand All @@ -89,5 +92,15 @@ private static void debugContextPropagation(Context context) {
}
}

private static class Propagation {
public final String carrierClassName;
public final StackTraceElement[] location;

public Propagation(String carrierClassName, StackTraceElement[] location) {
this.carrierClassName = carrierClassName;
this.location = location;
}
}

private ContextPropagationDebug() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import akka.http.javadsl.model.headers.RawHeader
import akka.stream.ActorMaterializer
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.test.base.SingleConnection
import spock.lang.Shared

class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> implements AgentTestTrait {
Expand Down Expand Up @@ -67,8 +68,10 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> impl
}

@Override
boolean testCausality() {
false
SingleConnection createSingleConnection(String host, int port) {
// singleConnection test would require instrumentation to support requests made through pools
// (newHostConnectionPool, superPool, etc), which is currently not supported.
return null
}

def "singleRequest exception trace"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ abstract class AkkaHttpServerInstrumentationTest extends HttpServerTest<Object>
String expectedServerSpanName(ServerEndpoint endpoint) {
return "akka.request"
}

@Override
boolean testConcurrency() {
return true
}
}

class AkkaHttpServerInstrumentationTestSync extends AkkaHttpServerInstrumentationTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ object AkkaHttpTestAsyncWebServer {
def doCall(): HttpResponse = {
val resp = HttpResponse(status = endpoint.getStatus) //.withHeaders(headers.Type)resp.contentType = "text/plain"
endpoint match {
case SUCCESS => resp.withEntity(endpoint.getBody)
case SUCCESS => resp.withEntity(endpoint.getBody)
case INDEXED_CHILD =>
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
override def getParameter(name: String): String =
uri.query().get(name).orNull
})
resp.withEntity("")
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
case REDIRECT =>
resp.withHeaders(headers.Location(endpoint.getBody))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ object AkkaHttpTestSyncWebServer {
def doCall(): HttpResponse = {
val resp = HttpResponse(status = endpoint.getStatus)
endpoint match {
case SUCCESS => resp.withEntity(endpoint.getBody)
case SUCCESS => resp.withEntity(endpoint.getBody)
case INDEXED_CHILD =>
INDEXED_CHILD.collectSpanAttributes(new UrlParameterProvider {
override def getParameter(name: String): String =
uri.query().get(name).orNull
})
resp.withEntity("")
case QUERY_PARAM => resp.withEntity(uri.queryString().orNull)
case REDIRECT =>
resp.withHeaders(headers.Location(endpoint.getBody))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ protected Boolean computeValue(Class<?> taskClass) {
return false;
}

// This is a Mailbox created by akka.dispatch.Dispatcher#createMailbox. We must not add
// a context to it as context should only be carried by individual envelopes in the queue
// of this mailbox.
if (taskClass.getName().equals("akka.dispatch.Dispatcher$$anon$1")) {
return false;
}

Class<?> enclosingClass = taskClass.getEnclosingClass();
if (enclosingClass != null) {
// Avoid context leak on jetty. Runnable submitted from SelectChannelEndPoint is used to
Expand Down Expand Up @@ -144,7 +151,8 @@ public static boolean shouldAttachStateToTask(Object task) {
public static <T> State setupState(ContextStore<T, State> contextStore, T task, Context context) {
State state = contextStore.putIfAbsent(task, State.FACTORY);
if (ContextPropagationDebug.isThreadPropagationDebuggerEnabled()) {
context = ContextPropagationDebug.appendLocations(context, new Exception().getStackTrace());
context =
ContextPropagationDebug.appendLocations(context, new Exception().getStackTrace(), task);
}
state.setParentContext(context);
return state;
Expand Down