diff --git a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java index 4d7fde335727..c8b903f25670 100644 --- a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java +++ b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java @@ -12,7 +12,6 @@ import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.caching.Cache; -import io.opentelemetry.javaagent.instrumentation.netty.common.client.AbstractNettyHttpClientTracer; public final class FutureListenerWrappers { // Instead of ContextStore use Cache with weak keys and weak values to store link between original @@ -64,7 +63,6 @@ private WrappedFutureListener(Context context, GenericFutureListener> @Override public void operationComplete(Future future) throws Exception { - AbstractNettyHttpClientTracer.operationComplete(future); try (Scope ignored = context.makeCurrent()) { delegate.operationComplete(future); } @@ -93,7 +91,6 @@ public void operationProgressed(ProgressiveFuture progressiveFuture, long l, @Override public void operationComplete(ProgressiveFuture progressiveFuture) throws Exception { - AbstractNettyHttpClientTracer.operationComplete(progressiveFuture); try (Scope ignored = context.makeCurrent()) { delegate.operationComplete(progressiveFuture); } diff --git a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/client/AbstractNettyHttpClientTracer.java b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/client/AbstractNettyHttpClientTracer.java index 7afe6b75a40a..d29b580d6ecf 100644 --- a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/client/AbstractNettyHttpClientTracer.java +++ b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/client/AbstractNettyHttpClientTracer.java @@ -6,25 +6,26 @@ package io.opentelemetry.javaagent.instrumentation.netty.common.client; import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; import static io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyResponseInjectAdapter.SETTER; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_UDP; import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramChannel; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponse; -import io.netty.util.concurrent.Future; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; -import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.tracer.HttpClientTracer; import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.URI; import java.net.URISyntaxException; import org.checkerframework.checker.nullness.qual.Nullable; @@ -32,6 +33,10 @@ public abstract class AbstractNettyHttpClientTracer extends HttpClientTracer { + private static final boolean alwaysCreateConnectSpan = + Config.get() + .getBooleanProperty("otel.instrumentation.netty.always-create-connect-span", false); + protected AbstractNettyHttpClientTracer() { super(NetPeerAttributes.INSTANCE); } @@ -84,39 +89,60 @@ protected TextMapSetter getSetter() { return SETTER; } - public static void operationComplete(Future future) { - AbstractNettyHttpClientTracer tracer = NettyHttpClientTracerAccess.getTracer(); - if (tracer == null) { - return; + public Context startConnectionSpan(Context parentContext, SocketAddress remoteAddress) { + if (!alwaysCreateConnectSpan) { + return null; } - if (!(future instanceof ChannelFuture)) { - return; - } - // If first call to GenericFutureListener#operationComplete has an exception then we - // treat it as the cause of connection failure and create a special span for it - ChannelFuture channelFuture = (ChannelFuture) future; - Context parentContext = tracer.getAndRemoveConnectContext(channelFuture); - if (parentContext == null) { - return; - } - Throwable cause = future.cause(); - if (cause == null) { - return; - } + SpanBuilder spanBuilder = spanBuilder(parentContext, "CONNECT", INTERNAL); + NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) remoteAddress); + + return parentContext.with(spanBuilder.startSpan()); + } - if (tracer.shouldStartSpan(parentContext, SpanKind.CLIENT)) { - tracer.connectionFailure(parentContext, channelFuture.channel(), cause); + public void endConnectionSpan( + Context context, + Context parentContext, + SocketAddress remoteAddress, + Channel channel, + Throwable throwable) { + if (alwaysCreateConnectSpan) { + if (context != null) { + // if context is present we started span in startConnectionSpan + endConnectionSpan(context, channel, throwable); + } + } else if (throwable != null && shouldStartSpan(parentContext, CLIENT)) { + // if we didn't start span in startConnectionSpan create a span only when the request fails + // and when not inside a client span + connectionFailure(parentContext, remoteAddress, channel, throwable); } } - protected abstract Context getAndRemoveConnectContext(ChannelFuture channelFuture); + private void endConnectionSpan(Context context, Channel channel, Throwable throwable) { + if (channel != null) { + Span span = Span.fromContext(context); + span.setAttribute( + SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP); + NetPeerAttributes.INSTANCE.setNetPeer(span, (InetSocketAddress) channel.remoteAddress()); + } + if (throwable != null) { + endExceptionally(context, throwable); + } else { + end(context); + } + } - private void connectionFailure(Context parentContext, Channel channel, Throwable throwable) { + private void connectionFailure( + Context parentContext, SocketAddress remoteAddress, Channel channel, Throwable throwable) { SpanBuilder spanBuilder = spanBuilder(parentContext, "CONNECT", CLIENT); - spanBuilder.setAttribute( - SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP); - NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) channel.remoteAddress()); + if (channel != null) { + spanBuilder.setAttribute( + SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP); + NetPeerAttributes.INSTANCE.setNetPeer( + spanBuilder, (InetSocketAddress) channel.remoteAddress()); + } else if (remoteAddress != null) { + NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) remoteAddress); + } Context context = withClientSpan(parentContext, spanBuilder.startSpan()); endExceptionally(context, throwable); diff --git a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/client/ConnectionCompleteListener.java b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/client/ConnectionCompleteListener.java new file mode 100644 index 000000000000..270422b731ee --- /dev/null +++ b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/client/ConnectionCompleteListener.java @@ -0,0 +1,36 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.netty.common.client; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.opentelemetry.context.Context; + +public class ConnectionCompleteListener implements GenericFutureListener> { + private final Context context; + private final Context parentContext; + + public ConnectionCompleteListener(Context context, Context parentContext) { + this.context = context; + this.parentContext = parentContext; + } + + @Override + public void operationComplete(Future future) { + AbstractNettyHttpClientTracer tracer = NettyHttpClientTracerAccess.getTracer(); + if (tracer == null) { + return; + } + + Channel channel = null; + if (future instanceof ChannelFuture) { + channel = ((ChannelFuture) future).channel(); + } + tracer.endConnectionSpan(context, parentContext, null, channel, future.cause()); + } +} diff --git a/instrumentation/netty/netty-4.0/javaagent/build.gradle.kts b/instrumentation/netty/netty-4.0/javaagent/build.gradle.kts index d6f96ffc7a7c..64e47a49faba 100644 --- a/instrumentation/netty/netty-4.0/javaagent/build.gradle.kts +++ b/instrumentation/netty/netty-4.0/javaagent/build.gradle.kts @@ -29,6 +29,25 @@ dependencies { latestDepTestLibrary("io.netty:netty-codec-http:4.0.56.Final") } +tasks { + val testConnectionSpan by registering(Test::class) { + filter { + includeTestsMatching("Netty40ConnectionSpanTest") + isFailOnNoMatchingTests = false + } + include("**/Netty40ConnectionSpanTest.*") + jvmArgs("-Dotel.instrumentation.netty.always-create-connect-span=true") + } + + named("test") { + dependsOn(testConnectionSpan) + filter { + excludeTestsMatching("Netty40ConnectionSpanTest") + isFailOnNoMatchingTests = false + } + } +} + // We need to force the dependency to the earliest supported version because other libraries declare newer versions. if (!(findProperty("testLatestDeps") as Boolean)) { configurations.configureEach { diff --git a/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/BootstrapInstrumentation.java b/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/BootstrapInstrumentation.java index 18b84352e4e7..279eb0add460 100644 --- a/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/BootstrapInstrumentation.java +++ b/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/BootstrapInstrumentation.java @@ -8,9 +8,16 @@ import static io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.NettyHttpClientTracer.tracer; import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import io.netty.channel.ChannelFuture; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; +import io.opentelemetry.javaagent.instrumentation.netty.common.client.ConnectionCompleteListener; +import java.net.SocketAddress; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -25,6 +32,9 @@ public ElementMatcher typeMatcher() { public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod( isConstructor(), BootstrapInstrumentation.class.getName() + "$InitAdvice"); + transformer.applyAdviceToMethod( + named("doConnect").and(takesArgument(0, SocketAddress.class)), + BootstrapInstrumentation.class.getName() + "$ConnectAdvice"); } @SuppressWarnings("unused") @@ -37,4 +47,38 @@ public static void enter() { tracer(); } } + + public static class ConnectAdvice { + @Advice.OnMethodEnter + public static void startConnect( + @Advice.Argument(0) SocketAddress remoteAddress, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelParentContext") Context parentContext, + @Advice.Local("otelScope") Scope scope) { + parentContext = Java8BytecodeBridge.currentContext(); + context = tracer().startConnectionSpan(parentContext, remoteAddress); + if (context != null) { + scope = context.makeCurrent(); + } + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void endConnect( + @Advice.Thrown Throwable throwable, + @Advice.Argument(0) SocketAddress remoteAddress, + @Advice.Return ChannelFuture channelFuture, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelParentContext") Context parentContext, + @Advice.Local("otelScope") Scope scope) { + if (scope != null) { + scope.close(); + } + + if (throwable != null) { + tracer().endConnectionSpan(context, parentContext, remoteAddress, null, throwable); + } else { + channelFuture.addListener(new ConnectionCompleteListener(context, parentContext)); + } + } + } } diff --git a/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/NettyChannelPipelineInstrumentation.java b/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/NettyChannelPipelineInstrumentation.java index 91b9dcf257f2..88a7149fd4d2 100644 --- a/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/NettyChannelPipelineInstrumentation.java +++ b/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/NettyChannelPipelineInstrumentation.java @@ -8,7 +8,6 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import io.netty.channel.ChannelHandler; @@ -19,12 +18,9 @@ import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.util.Attribute; -import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.api.CallDepth; import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; -import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; import io.opentelemetry.javaagent.instrumentation.netty.common.AbstractNettyChannelPipelineInstrumentation; import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientRequestTracingHandler; import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientResponseTracingHandler; @@ -46,9 +42,6 @@ public void transform(TypeTransformer transformer) { .and(nameStartsWith("add")) .and(takesArgument(2, named("io.netty.channel.ChannelHandler"))), NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineAddAdvice"); - transformer.applyAdviceToMethod( - isMethod().and(named("connect")).and(returns(named("io.netty.channel.ChannelFuture"))), - NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineConnectAdvice"); } /** @@ -103,15 +96,4 @@ public static void addHandler( } } } - - @SuppressWarnings("unused") - public static class ChannelPipelineConnectAdvice { - - @Advice.OnMethodEnter - public static void addParentSpan(@Advice.This ChannelPipeline pipeline) { - Context context = Java8BytecodeBridge.currentContext(); - Attribute attribute = pipeline.channel().attr(AttributeKeys.CONNECT_CONTEXT); - attribute.compareAndSet(null, context); - } - } } diff --git a/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/NettyHttpClientTracer.java b/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/NettyHttpClientTracer.java index 89195f9f6362..21cf516e1419 100644 --- a/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/NettyHttpClientTracer.java +++ b/instrumentation/netty/netty-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_0/client/NettyHttpClientTracer.java @@ -5,12 +5,9 @@ package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client; -import io.netty.channel.ChannelFuture; import io.netty.handler.codec.http.HttpResponse; -import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.instrumentation.netty.common.client.AbstractNettyHttpClientTracer; import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyHttpClientTracerAccess; -import io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys; public class NettyHttpClientTracer extends AbstractNettyHttpClientTracer { private static final NettyHttpClientTracer TRACER = new NettyHttpClientTracer(); @@ -25,11 +22,6 @@ public static NettyHttpClientTracer tracer() { return TRACER; } - @Override - protected Context getAndRemoveConnectContext(ChannelFuture channelFuture) { - return channelFuture.channel().attr(AttributeKeys.CONNECT_CONTEXT).getAndRemove(); - } - @Override protected Integer status(HttpResponse httpResponse) { return httpResponse.getStatus().code(); diff --git a/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ConnectionSpanTest.groovy b/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ConnectionSpanTest.groovy new file mode 100644 index 000000000000..0d567a0342f0 --- /dev/null +++ b/instrumentation/netty/netty-4.0/javaagent/src/test/groovy/Netty40ConnectionSpanTest.groovy @@ -0,0 +1,164 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL +import static io.opentelemetry.api.trace.SpanKind.SERVER +import static io.opentelemetry.api.trace.StatusCode.ERROR +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP + +import io.netty.bootstrap.Bootstrap +import io.netty.buffer.Unpooled +import io.netty.channel.ChannelInitializer +import io.netty.channel.ChannelPipeline +import io.netty.channel.EventLoopGroup +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.handler.codec.http.DefaultFullHttpRequest +import io.netty.handler.codec.http.HttpClientCodec +import io.netty.handler.codec.http.HttpHeaders +import io.netty.handler.codec.http.HttpMethod +import io.netty.handler.codec.http.HttpVersion +import io.opentelemetry.instrumentation.test.AgentTestTrait +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.opentelemetry.instrumentation.test.utils.PortUtils +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit +import spock.lang.Shared + +class Netty40ConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait { + + @Shared + private HttpClientTestServer server + + @Shared + private EventLoopGroup eventLoopGroup = new NioEventLoopGroup() + + @Shared + private Bootstrap bootstrap = buildBootstrap() + + def setupSpec() { + server = new HttpClientTestServer(openTelemetry) + server.start() + } + + def cleanupSpec() { + eventLoopGroup.shutdownGracefully() + server.stop() + } + + Bootstrap buildBootstrap() { + Bootstrap bootstrap = new Bootstrap() + bootstrap.group(eventLoopGroup) + .channel(NioSocketChannel) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline() + pipeline.addLast(new HttpClientCodec()) + } + }) + + return bootstrap + } + + DefaultFullHttpRequest buildRequest(String method, URI uri, Map headers) { + def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toString(), Unpooled.EMPTY_BUFFER) + HttpHeaders.setHost(request, uri.host) + headers.each { k, v -> request.headers().set(k, v) } + return request + } + + int sendRequest(DefaultFullHttpRequest request, URI uri) { + def channel = bootstrap.connect(uri.host, uri.port).sync().channel() + def result = new CompletableFuture() + channel.pipeline().addLast(new ClientHandler(result)) + channel.writeAndFlush(request).get() + return result.get(20, TimeUnit.SECONDS) + } + + def "test successful request"() { + when: + def uri = URI.create("http://localhost:${server.httpPort()}/success") + def request = buildRequest("GET", uri, [:]) + def responseCode = runWithSpan("parent") { + sendRequest(request, uri) + } + + then: + responseCode == 200 + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { + name "CONNECT" + kind INTERNAL + childOf(span(0)) + attributes { + "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP + "${SemanticAttributes.NET_PEER_NAME.key}" uri.host + "${SemanticAttributes.NET_PEER_PORT.key}" uri.port + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + } + } + span(2) { + name "HTTP GET" + kind CLIENT + childOf(span(0)) + } + span(3) { + name "test-http-server" + kind SERVER + childOf(span(2)) + } + } + } + } + + def "test failing request"() { + when: + URI uri = URI.create("http://localhost:${PortUtils.UNUSABLE_PORT}") + def request = buildRequest("GET", uri, [:]) + runWithSpan("parent") { + sendRequest(request, uri) + } + + then: + def thrownException = thrown(Exception) + + and: + assertTraces(1) { + trace(0, 2) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(thrownException.class, thrownException.message) + } + span(1) { + name "CONNECT" + kind INTERNAL + childOf(span(0)) + status ERROR + errorEvent(thrownException.class, thrownException.message) + attributes { + "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP + "${SemanticAttributes.NET_PEER_NAME.key}" uri.host + "${SemanticAttributes.NET_PEER_PORT.key}" uri.port + "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + } + } + } + } + } +} diff --git a/instrumentation/netty/netty-4.1/javaagent/build.gradle.kts b/instrumentation/netty/netty-4.1/javaagent/build.gradle.kts index e982b327f070..9f8a161efa8d 100644 --- a/instrumentation/netty/netty-4.1/javaagent/build.gradle.kts +++ b/instrumentation/netty/netty-4.1/javaagent/build.gradle.kts @@ -43,8 +43,23 @@ dependencies { } tasks { + val testConnectionSpan by registering(Test::class) { + filter { + includeTestsMatching("Netty41ConnectionSpanTest") + isFailOnNoMatchingTests = false + } + include("**/Netty41ConnectionSpanTest.*") + jvmArgs("-Dotel.instrumentation.netty.always-create-connect-span=true") + } + named("test") { systemProperty("testLatestDeps", findProperty("testLatestDeps")) + + dependsOn(testConnectionSpan) + filter { + excludeTestsMatching("Netty41ConnectionSpanTest") + isFailOnNoMatchingTests = false + } } } diff --git a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/BootstrapInstrumentation.java b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/BootstrapInstrumentation.java index 69a10e3ed6cb..762e2627f5ae 100644 --- a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/BootstrapInstrumentation.java +++ b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/BootstrapInstrumentation.java @@ -8,9 +8,16 @@ import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.NettyHttpClientTracer.tracer; import static net.bytebuddy.matcher.ElementMatchers.isConstructor; import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import io.netty.channel.ChannelFuture; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; +import io.opentelemetry.javaagent.instrumentation.netty.common.client.ConnectionCompleteListener; +import java.net.SocketAddress; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -25,6 +32,9 @@ public ElementMatcher typeMatcher() { public void transform(TypeTransformer transformer) { transformer.applyAdviceToMethod( isConstructor(), BootstrapInstrumentation.class.getName() + "$InitAdvice"); + transformer.applyAdviceToMethod( + named("doResolveAndConnect").and(takesArgument(0, SocketAddress.class)), + BootstrapInstrumentation.class.getName() + "$ConnectAdvice"); } @SuppressWarnings("unused") @@ -37,4 +47,38 @@ public static void enter() { tracer(); } } + + public static class ConnectAdvice { + @Advice.OnMethodEnter + public static void startConnect( + @Advice.Argument(0) SocketAddress remoteAddress, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelParentContext") Context parentContext, + @Advice.Local("otelScope") Scope scope) { + parentContext = Java8BytecodeBridge.currentContext(); + context = tracer().startConnectionSpan(parentContext, remoteAddress); + if (context != null) { + scope = context.makeCurrent(); + } + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void endConnect( + @Advice.Thrown Throwable throwable, + @Advice.Argument(0) SocketAddress remoteAddress, + @Advice.Return ChannelFuture channelFuture, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelParentContext") Context parentContext, + @Advice.Local("otelScope") Scope scope) { + if (scope != null) { + scope.close(); + } + + if (throwable != null) { + tracer().endConnectionSpan(context, parentContext, remoteAddress, null, throwable); + } else { + channelFuture.addListener(new ConnectionCompleteListener(context, parentContext)); + } + } + } } diff --git a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/NettyChannelPipelineInstrumentation.java b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/NettyChannelPipelineInstrumentation.java index ee081a529399..83f1d8ee872a 100644 --- a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/NettyChannelPipelineInstrumentation.java +++ b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/NettyChannelPipelineInstrumentation.java @@ -8,7 +8,6 @@ import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith; import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.returns; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import io.netty.channel.ChannelHandler; @@ -20,13 +19,9 @@ import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.util.Attribute; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.netty.v4_1.AttributeKeys; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; import io.opentelemetry.javaagent.instrumentation.api.CallDepth; import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; -import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; import io.opentelemetry.javaagent.instrumentation.netty.common.AbstractNettyChannelPipelineInstrumentation; import io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.HttpClientRequestTracingHandler; import io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.HttpClientResponseTracingHandler; @@ -49,9 +44,6 @@ public void transform(TypeTransformer transformer) { .and(takesArgument(1, String.class)) .and(takesArgument(2, named("io.netty.channel.ChannelHandler"))), NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineAddAdvice"); - transformer.applyAdviceToMethod( - isMethod().and(named("connect")).and(returns(named("io.netty.channel.ChannelFuture"))), - NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineConnectAdvice"); } /** @@ -129,14 +121,4 @@ public static void addHandler( } } } - - @SuppressWarnings("unused") - public static class ChannelPipelineConnectAdvice { - - @Advice.OnMethodEnter - public static void addParentSpan(@Advice.This ChannelPipeline pipeline) { - Attribute attribute = pipeline.channel().attr(AttributeKeys.CONNECT_CONTEXT); - attribute.compareAndSet(null, Java8BytecodeBridge.currentContext()); - } - } } diff --git a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/client/NettyHttpClientTracer.java b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/client/NettyHttpClientTracer.java index 2d2c7708c553..9fbb526c609e 100644 --- a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/client/NettyHttpClientTracer.java +++ b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/client/NettyHttpClientTracer.java @@ -5,11 +5,9 @@ package io.opentelemetry.javaagent.instrumentation.netty.v4_1.client; -import io.netty.channel.ChannelFuture; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.netty.v4_1.AttributeKeys; import io.opentelemetry.javaagent.instrumentation.netty.common.client.AbstractNettyHttpClientTracer; import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyHttpClientTracerAccess; @@ -26,11 +24,6 @@ public static NettyHttpClientTracer tracer() { return TRACER; } - @Override - protected Context getAndRemoveConnectContext(ChannelFuture channelFuture) { - return channelFuture.channel().attr(AttributeKeys.CONNECT_CONTEXT).getAndRemove(); - } - @Override protected Integer status(HttpResponse httpResponse) { return httpResponse.status().code(); diff --git a/instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ConnectionSpanTest.groovy b/instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ConnectionSpanTest.groovy new file mode 100644 index 000000000000..fb52f24c4b1c --- /dev/null +++ b/instrumentation/netty/netty-4.1/javaagent/src/test/groovy/Netty41ConnectionSpanTest.groovy @@ -0,0 +1,164 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL +import static io.opentelemetry.api.trace.SpanKind.SERVER +import static io.opentelemetry.api.trace.StatusCode.ERROR +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP + +import io.netty.bootstrap.Bootstrap +import io.netty.buffer.Unpooled +import io.netty.channel.ChannelInitializer +import io.netty.channel.ChannelPipeline +import io.netty.channel.EventLoopGroup +import io.netty.channel.nio.NioEventLoopGroup +import io.netty.channel.socket.SocketChannel +import io.netty.channel.socket.nio.NioSocketChannel +import io.netty.handler.codec.http.DefaultFullHttpRequest +import io.netty.handler.codec.http.HttpClientCodec +import io.netty.handler.codec.http.HttpHeaderNames +import io.netty.handler.codec.http.HttpMethod +import io.netty.handler.codec.http.HttpVersion +import io.opentelemetry.instrumentation.test.AgentTestTrait +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.opentelemetry.instrumentation.test.utils.PortUtils +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit +import spock.lang.Shared + +class Netty41ConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait { + + @Shared + private HttpClientTestServer server + + @Shared + private EventLoopGroup eventLoopGroup = new NioEventLoopGroup() + + @Shared + private Bootstrap bootstrap = buildBootstrap() + + def setupSpec() { + server = new HttpClientTestServer(openTelemetry) + server.start() + } + + def cleanupSpec() { + eventLoopGroup.shutdownGracefully() + server.stop() + } + + Bootstrap buildBootstrap() { + Bootstrap bootstrap = new Bootstrap() + bootstrap.group(eventLoopGroup) + .channel(NioSocketChannel) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + ChannelPipeline pipeline = socketChannel.pipeline() + pipeline.addLast(new HttpClientCodec()) + } + }) + + return bootstrap + } + + DefaultFullHttpRequest buildRequest(String method, URI uri, Map headers) { + def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toString(), Unpooled.EMPTY_BUFFER) + request.headers().set(HttpHeaderNames.HOST, uri.host) + headers.each { k, v -> request.headers().set(k, v) } + return request + } + + int sendRequest(DefaultFullHttpRequest request, URI uri) { + def channel = bootstrap.connect(uri.host, uri.port).sync().channel() + def result = new CompletableFuture() + channel.pipeline().addLast(new ClientHandler(result)) + channel.writeAndFlush(request).get() + return result.get(20, TimeUnit.SECONDS) + } + + def "test successful request"() { + when: + def uri = URI.create("http://localhost:${server.httpPort()}/success") + def request = buildRequest("GET", uri, [:]) + def responseCode = runWithSpan("parent") { + sendRequest(request, uri) + } + + then: + responseCode == 200 + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { + name "CONNECT" + kind INTERNAL + childOf(span(0)) + attributes { + "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP + "${SemanticAttributes.NET_PEER_NAME.key}" uri.host + "${SemanticAttributes.NET_PEER_PORT.key}" uri.port + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + } + } + span(2) { + name "HTTP GET" + kind CLIENT + childOf(span(0)) + } + span(3) { + name "test-http-server" + kind SERVER + childOf(span(2)) + } + } + } + } + + def "test failing request"() { + when: + URI uri = URI.create("http://localhost:${PortUtils.UNUSABLE_PORT}") + def request = buildRequest("GET", uri, [:]) + runWithSpan("parent") { + sendRequest(request, uri) + } + + then: + def thrownException = thrown(Exception) + + and: + assertTraces(1) { + trace(0, 2) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(thrownException.class, thrownException.message) + } + span(1) { + name "CONNECT" + kind INTERNAL + childOf(span(0)) + status ERROR + errorEvent(thrownException.class, thrownException.message) + attributes { + "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP + "${SemanticAttributes.NET_PEER_NAME.key}" uri.host + "${SemanticAttributes.NET_PEER_PORT.key}" uri.port + "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + } + } + } + } + } +} diff --git a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/AttributeKeys.java b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/AttributeKeys.java index 41e98378bf74..83b3bda6ac29 100644 --- a/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/AttributeKeys.java +++ b/instrumentation/netty/netty-4.1/library/src/main/java/io/opentelemetry/instrumentation/netty/v4_1/AttributeKeys.java @@ -10,8 +10,6 @@ public final class AttributeKeys { - public static final AttributeKey CONNECT_CONTEXT = - AttributeKey.valueOf(AttributeKeys.class, "connect-context"); public static final AttributeKey WRITE_CONTEXT = AttributeKey.valueOf(AttributeKeys.class, "passed-context"); diff --git a/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/build.gradle.kts b/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/build.gradle.kts index c451050b0f70..4d2cf5c8f417 100644 --- a/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/build.gradle.kts +++ b/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/build.gradle.kts @@ -25,3 +25,22 @@ dependencies { latestDepTestLibrary("io.projectreactor.netty:reactor-netty:(,1.0.0)") } + +tasks { + val testConnectionSpan by registering(Test::class) { + filter { + includeTestsMatching("ReactorNettyConnectionSpanTest") + isFailOnNoMatchingTests = false + } + include("**/ReactorNettyConnectionSpanTest.*") + jvmArgs("-Dotel.instrumentation.netty.always-create-connect-span=true") + } + + named("test") { + dependsOn(testConnectionSpan) + filter { + excludeTestsMatching("ReactorNettyConnectionSpanTest") + isFailOnNoMatchingTests = false + } + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyConnectionSpanTest.groovy b/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyConnectionSpanTest.groovy new file mode 100644 index 000000000000..ce90b09f3600 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-0.9/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v0_9/ReactorNettyConnectionSpanTest.groovy @@ -0,0 +1,127 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v0_9 + +import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL +import static io.opentelemetry.api.trace.SpanKind.SERVER +import static io.opentelemetry.api.trace.StatusCode.ERROR +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP + +import io.opentelemetry.instrumentation.test.AgentTestTrait +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.opentelemetry.instrumentation.test.utils.PortUtils +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import reactor.netty.http.client.HttpClient +import spock.lang.Shared + +class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait { + + @Shared + private HttpClientTestServer server + + def setupSpec() { + server = new HttpClientTestServer(openTelemetry) + server.start() + } + + def cleanupSpec() { + server.stop() + } + + def "test successful request"() { + when: + def httpClient = HttpClient.create() + def responseCode = + runWithSpan("parent") { + httpClient.get().uri("http://localhost:${server.httpPort()}/success") + .responseSingle { resp, content -> + // Make sure to consume content since that's when we close the span. + content.map { resp } + } + .block() + .status().code() + } + + then: + responseCode == 200 + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { + name "CONNECT" + kind INTERNAL + childOf(span(0)) + attributes { + "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP + "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" + "${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort() + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + } + } + span(2) { + name "HTTP GET" + kind CLIENT + childOf(span(0)) + } + span(3) { + name "test-http-server" + kind SERVER + childOf(span(2)) + } + } + } + } + + def "test failing request"() { + when: + def httpClient = HttpClient.create() + runWithSpan("parent") { + httpClient.get().uri("http://localhost:${PortUtils.UNUSABLE_PORT}") + .responseSingle { resp, content -> + // Make sure to consume content since that's when we close the span. + content.map { resp } + } + .block() + .status().code() + } + + then: + def thrownException = thrown(Exception) + def connectException = thrownException.getCause() + + and: + assertTraces(1) { + trace(0, 2) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(thrownException.class, thrownException.message) + } + span(1) { + name "CONNECT" + kind INTERNAL + childOf(span(0)) + status ERROR + errorEvent(connectException.class, connectException.message) + attributes { + "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP + "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" + "${SemanticAttributes.NET_PEER_PORT.key}" PortUtils.UNUSABLE_PORT + "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + } + } + } + } + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts index 0e4b1560ca63..ba81a8ed7126 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts @@ -24,3 +24,22 @@ dependencies { testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent")) testInstrumentation(project(":instrumentation:reactor-3.1:javaagent")) } + +tasks { + val testConnectionSpan by registering(Test::class) { + filter { + includeTestsMatching("ReactorNettyConnectionSpanTest") + isFailOnNoMatchingTests = false + } + include("**/ReactorNettyConnectionSpanTest.*") + jvmArgs("-Dotel.instrumentation.reactor-netty.always-create-connect-span=true") + } + + named("test") { + dependsOn(testConnectionSpan) + filter { + excludeTestsMatching("ReactorNettyConnectionSpanTest") + isFailOnNoMatchingTests = false + } + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ConnectionWrapper.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ConnectionWrapper.java new file mode 100644 index 000000000000..cc72a25405cf --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ConnectionWrapper.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettyTracer.tracer; + +import io.netty.channel.Channel; +import io.opentelemetry.context.Context; +import java.net.SocketAddress; +import reactor.core.publisher.Mono; + +public class ConnectionWrapper { + + public static Mono wrap( + Context context, Context parentContext, SocketAddress remoteAddress, Mono mono) { + return mono.doOnError( + throwable -> { + tracer().endConnectionSpan(context, parentContext, remoteAddress, null, throwable); + }) + .doOnSuccess( + channel -> { + if (context != null) { + tracer().endConnectionSpan(context, parentContext, remoteAddress, channel, null); + } + }); + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java index ed11d509ccfc..8ede89713c1c 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java @@ -6,7 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; -import static java.util.Collections.singletonList; +import static java.util.Arrays.asList; import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; @@ -38,6 +38,6 @@ public ElementMatcher.Junction classLoaderMatcher() { @Override public List typeInstrumentations() { - return singletonList(new HttpClientInstrumentation()); + return asList(new HttpClientInstrumentation(), new TransportConnectorInstrumentation()); } } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyTracer.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyTracer.java new file mode 100644 index 000000000000..99914336e103 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyTracer.java @@ -0,0 +1,105 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.api.trace.SpanKind.CLIENT; +import static io.opentelemetry.api.trace.SpanKind.INTERNAL; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_UDP; + +import io.netty.channel.Channel; +import io.netty.channel.socket.DatagramChannel; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +public class ReactorNettyTracer extends BaseTracer { + private static final ReactorNettyTracer TRACER = new ReactorNettyTracer(); + + private static final boolean alwaysCreateConnectSpan = + Config.get() + .getBooleanProperty( + "otel.instrumentation.reactor-netty.always-create-connect-span", false); + + protected ReactorNettyTracer() { + super(GlobalOpenTelemetry.get()); + } + + public static ReactorNettyTracer tracer() { + return TRACER; + } + + public Context startConnectionSpan(Context parentContext, SocketAddress remoteAddress) { + if (!alwaysCreateConnectSpan) { + return null; + } + + SpanBuilder spanBuilder = spanBuilder(parentContext, "CONNECT", INTERNAL); + NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) remoteAddress); + + return parentContext.with(spanBuilder.startSpan()); + } + + public void endConnectionSpan( + Context context, + Context parentContext, + SocketAddress remoteAddress, + Channel channel, + Throwable throwable) { + if (alwaysCreateConnectSpan) { + if (context != null) { + // if context is present we started span in startConnectionSpan + endConnectionSpan(context, channel, throwable); + } + } else if (throwable != null && shouldStartSpan(parentContext, CLIENT)) { + // if we didn't start span in startConnectionSpan create a span only when the request fails + // and when not inside a client span + connectionFailure(parentContext, remoteAddress, channel, throwable); + } + } + + private void endConnectionSpan(Context context, Channel channel, Throwable throwable) { + if (channel != null) { + Span span = Span.fromContext(context); + span.setAttribute( + SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP); + NetPeerAttributes.INSTANCE.setNetPeer(span, (InetSocketAddress) channel.remoteAddress()); + } + if (throwable != null) { + endExceptionally(context, throwable); + } else { + end(context); + } + } + + private void connectionFailure( + Context parentContext, SocketAddress remoteAddress, Channel channel, Throwable throwable) { + SpanBuilder spanBuilder = spanBuilder(parentContext, "CONNECT", CLIENT); + if (channel != null) { + spanBuilder.setAttribute( + SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP); + NetPeerAttributes.INSTANCE.setNetPeer( + spanBuilder, (InetSocketAddress) channel.remoteAddress()); + } else if (remoteAddress != null) { + NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) remoteAddress); + } + + Context context = withClientSpan(parentContext, spanBuilder.startSpan()); + endExceptionally(context, throwable); + } + + @Override + protected String getInstrumentationName() { + return "io.opentelemetry.reactor-netty-1.0"; + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java new file mode 100644 index 000000000000..139812c80c8c --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/TransportConnectorInstrumentation.java @@ -0,0 +1,71 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettyTracer.tracer; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.netty.channel.Channel; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; +import java.net.SocketAddress; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import reactor.core.publisher.Mono; + +public class TransportConnectorInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("reactor.netty.transport.TransportConnector"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("connect").and(takesArgument(1, named("java.net.SocketAddress"))), + TransportConnectorInstrumentation.class.getName() + "$ConnectAdvice"); + } + + public static class ConnectAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void startConnect( + @Advice.Argument(1) SocketAddress remoteAddress, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelParentContext") Context parentContext, + @Advice.Local("otelScope") Scope scope) { + parentContext = Java8BytecodeBridge.currentContext(); + context = tracer().startConnectionSpan(parentContext, remoteAddress); + if (context != null) { + scope = context.makeCurrent(); + } + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void endConnect( + @Advice.Thrown Throwable throwable, + @Advice.Argument(1) SocketAddress remoteAddress, + @Advice.Return(readOnly = false) Mono mono, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelParentContext") Context parentContext, + @Advice.Local("otelScope") Scope scope) { + if (scope != null) { + scope.close(); + } + + if (throwable != null) { + tracer().endConnectionSpan(context, parentContext, remoteAddress, null, throwable); + } else { + mono = ConnectionWrapper.wrap(context, parentContext, remoteAddress, mono); + } + } + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy new file mode 100644 index 000000000000..d23b1397ebb7 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy @@ -0,0 +1,126 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0 + +import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL +import static io.opentelemetry.api.trace.SpanKind.SERVER +import static io.opentelemetry.api.trace.StatusCode.ERROR +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP + +import io.opentelemetry.instrumentation.test.AgentTestTrait +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import io.opentelemetry.instrumentation.test.utils.PortUtils +import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestServer +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import reactor.netty.http.client.HttpClient +import spock.lang.Shared + +class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait { + + @Shared + private HttpClientTestServer server + + def setupSpec() { + server = new HttpClientTestServer(openTelemetry) + server.start() + } + + def cleanupSpec() { + server.stop() + } + + def "test successful request"() { + when: + def httpClient = HttpClient.create() + def responseCode = + runWithSpan("parent") { + httpClient.get().uri("http://localhost:${server.httpPort()}/success") + .responseSingle { resp, content -> + // Make sure to consume content since that's when we close the span. + content.map { resp } + } + .block() + .status().code() + } + + then: + responseCode == 200 + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { + name "CONNECT" + kind INTERNAL + childOf(span(0)) + attributes { + "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP + "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" + "${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort() + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + } + } + span(2) { + name "HTTP GET" + kind CLIENT + childOf(span(0)) + } + span(3) { + name "test-http-server" + kind SERVER + childOf(span(2)) + } + } + } + } + + def "test failing request"() { + when: + def httpClient = HttpClient.create() + runWithSpan("parent") { + httpClient.get().uri("http://localhost:${PortUtils.UNUSABLE_PORT}") + .responseSingle { resp, content -> + // Make sure to consume content since that's when we close the span. + content.map { resp } + } + .block() + .status().code() + } + + then: + def thrownException = thrown(Exception) + def connectException = thrownException.getCause() + + and: + assertTraces(1) { + trace(0, 2) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + status ERROR + errorEvent(thrownException.class, thrownException.message) + } + span(1) { + name "CONNECT" + kind INTERNAL + childOf(span(0)) + status ERROR + errorEvent(connectException.class, connectException.message) + attributes { + "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" + "${SemanticAttributes.NET_PEER_PORT.key}" PortUtils.UNUSABLE_PORT + "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + } + } + } + } + } +}