Skip to content

Commit

Permalink
Akka http: propagate context to callbacks (open-telemetry#3263)
Browse files Browse the repository at this point in the history
Co-authored-by: Trask Stalnaker <[email protected]>
  • Loading branch information
2 people authored and robododge committed Jun 17, 2021
1 parent 633b7dd commit dbb9a97
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.akkahttp.client;

import static java.util.Collections.singletonList;
import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
Expand All @@ -20,6 +20,6 @@ public AkkaHttpClientInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new HttpExtClientInstrumentation());
return asList(new HttpExtClientInstrumentation(), new PoolMasterActorInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.akkahttp.client;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.impl.Promise;
import scala.runtime.AbstractFunction1;
import scala.util.Try;

public class FutureWrapper {
public static <T> Future<T> wrap(
Future<T> future, ExecutionContext executionContext, Context context) {
Promise.DefaultPromise<T> promise = new Promise.DefaultPromise<>();
future.onComplete(
new AbstractFunction1<Try<T>, Object>() {

@Override
public Object apply(Try<T> result) {
try (Scope ignored = context.makeCurrent()) {
return promise.complete(result);
}
}
},
executionContext);

return promise;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static void methodEnter(
public static void methodExit(
@Advice.Argument(0) HttpRequest request,
@Advice.This HttpExt thiz,
@Advice.Return Future<HttpResponse> responseFuture,
@Advice.Return(readOnly = false) Future<HttpResponse> responseFuture,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
Expand All @@ -84,6 +84,10 @@ public static void methodExit(
} else {
tracer().endExceptionally(context, throwable);
}
if (responseFuture != null) {
responseFuture =
FutureWrapper.wrap(responseFuture, thiz.system().dispatcher(), currentContext());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,30 +48,18 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest<HttpRequest> impl
@Override
void sendRequestWithCallback(HttpRequest request, String method, URI uri, Map<String, String> headers, RequestResult requestResult) {
Http.get(system).singleRequest(request, materializer).whenComplete {response, throwable ->
response.discardEntityBytes(materializer)
if (throwable == null) {
response.discardEntityBytes(materializer)
}
requestResult.complete({ response.status().intValue() }, throwable)
}
}

// TODO(anuraaga): Context leak seems to prevent us from running asynchronous tests in a row.
// Disable for now.
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2639
@Override
boolean testCallback() {
false
}

@Override
boolean testRedirects() {
false
}

@Override
boolean testRemoteConnection() {
// Not sure how to properly set timeouts...
return false
}

@Override
SingleConnection createSingleConnection(String host, int port) {
// singleConnection test would require instrumentation to support requests made through pools
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,10 @@ akka.http {
host-connection-pool {
// Limit maximum http backoff for tests
max-connection-backoff = 100ms
max-open-requests = 1024
max-retries = 0
client {
connecting-timeout = 5s
}
}
}

0 comments on commit dbb9a97

Please sign in to comment.