Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public abstract class AbstractAsyncHttpClientInstrumentation extends ElasticApmI
public static final List<Class<? extends ElasticApmInstrumentation>> ASYNC_HANDLER_INSTRUMENTATIONS = Arrays.<Class<? extends ElasticApmInstrumentation>>asList(
AsyncHandlerOnCompletedInstrumentation.class,
AsyncHandlerOnThrowableInstrumentation.class,
AsyncHandlerOnStatusReceivedInstrumentation.class);
AsyncHandlerOnStatusReceivedInstrumentation.class,
StreamedAsyncHandlerOnStreamInstrumentation.class);

public AbstractAsyncHttpClientInstrumentation() {
if (headerSetterManager == null) {
Expand Down Expand Up @@ -186,7 +187,7 @@ private static void onMethodEnter(@Advice.This AsyncHandler<?> asyncHandler, @Ad
}

@Advice.OnMethodExit(suppress = Throwable.class)
private static void onMethodExit(@Advice.This AsyncHandler<?> asyncHandler, @Nullable @Advice.Local("span") Span span) {
private static void onMethodExit(@Nullable @Advice.Local("span") Span span) {
if (span != null) {
span.end();
span.deactivate();
Expand All @@ -209,7 +210,7 @@ private static void onMethodEnter(@Advice.This AsyncHandler<?> asyncHandler, @Ad
}

@Advice.OnMethodExit(suppress = Throwable.class)
private static void onMethodExit(@Advice.This AsyncHandler<?> asyncHandler, @Nullable @Advice.Local("span") Span span, @Advice.Argument(0) Throwable t) {
private static void onMethodExit(@Nullable @Advice.Local("span") Span span, @Advice.Argument(0) Throwable t) {
if (span != null) {
span.captureException(t).end();
span.deactivate();
Expand All @@ -232,12 +233,35 @@ private static void onMethodEnter(@Advice.This AsyncHandler<?> asyncHandler, @Ad
}

@Advice.OnMethodExit(suppress = Throwable.class)
private static void onMethodExit(@Advice.This AsyncHandler<?> asyncHandler, @Nullable @Advice.Local("span") Span span, @Advice.Argument(0) HttpResponseStatus status) {
private static void onMethodExit(@Nullable @Advice.Local("span") Span span, @Advice.Argument(0) HttpResponseStatus status) {
if (span != null) {
span.getContext().getHttp().withStatusCode(status.getStatusCode());
span.deactivate();
}
}
}

public static class StreamedAsyncHandlerOnStreamInstrumentation extends AbstractAsyncHandlerInstrumentation {

public StreamedAsyncHandlerOnStreamInstrumentation() {
super(named("onStream").and(takesArgument(0, named("org.reactivestreams.Publisher"))));
}

@Advice.OnMethodEnter(suppress = Throwable.class)
private static void onMethodEnter(@Advice.This AsyncHandler<?> asyncHandler, @Advice.Local("span") Span span) {
span = handlerSpanMap.get(asyncHandler);
if (span != null) {
span.activate();
}
}

@Advice.OnMethodExit(suppress = Throwable.class)
private static void onMethodExit(@Nullable @Advice.Local("span") Span span) {
if (span != null) {
span.deactivate();
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@

import co.elastic.apm.agent.httpclient.AbstractHttpClientInstrumentationTest;
import org.asynchttpclient.*;
import org.asynchttpclient.handler.StreamedAsyncHandler;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.reactivestreams.Publisher;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -70,12 +72,32 @@ public Response onCompleted(Response response) {

};

public static class CustomStreamedAsyncHandler extends AsyncCompletionHandler<Response> implements StreamedAsyncHandler<Response> {

@Override
public Response onCompleted(Response response) {
assertThat(tracer.getActive()).isNotNull();
assertThat(tracer.getActive().isExit()).isTrue();
return response;
}

@Override
public State onStream(Publisher<HttpResponseBodyPart> publisher) {
assertThat(tracer.getActive()).isNotNull();
assertThat(tracer.getActive().isExit()).isTrue();
return State.ABORT;
}
}

public static AsyncHandler<Response> customStreamAsyncHandler = new CustomStreamedAsyncHandler();

@Parameterized.Parameters()
public static Iterable<RequestExecutor> data() {
return Arrays.asList(
(client, path) -> client.executeRequest(new RequestBuilder().setUrl(path).build()).get(),
(client, path) -> client.executeRequest(new RequestBuilder().setUrl(path).build(), new AsyncCompletionHandlerBase()).get(),
(client, path) -> client.executeRequest(new RequestBuilder().setUrl(path).build(), customAsyncHandler).get(),
(client, path) -> client.executeRequest(new RequestBuilder().setUrl(path).build(), customStreamAsyncHandler).get(),
(client, path) -> client.prepareGet(path).execute(new AsyncCompletionHandlerBase()).get(),
(client, path) -> client.prepareGet(path).execute().get()
);
Expand Down