Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Netty connection failure handling when listener is lambda #3569

Merged
merged 3 commits into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.caching.Cache;
import io.opentelemetry.javaagent.instrumentation.netty.common.client.AbstractNettyHttpClientTracer;

public final class FutureListenerWrappers {
// Instead of ContextStore use Cache with weak keys and weak values to store link between original
Expand Down Expand Up @@ -63,6 +64,7 @@ private WrappedFutureListener(Context context, GenericFutureListener<Future<?>>

@Override
public void operationComplete(Future<?> future) throws Exception {
AbstractNettyHttpClientTracer.operationComplete(future);
try (Scope ignored = context.makeCurrent()) {
delegate.operationComplete(future);
}
Expand Down Expand Up @@ -91,6 +93,7 @@ public void operationProgressed(ProgressiveFuture<?> progressiveFuture, long l,

@Override
public void operationComplete(ProgressiveFuture<?> progressiveFuture) throws Exception {
AbstractNettyHttpClientTracer.operationComplete(progressiveFuture);
try (Scope ignored = context.makeCurrent()) {
delegate.operationComplete(progressiveFuture);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.netty.common.client;

import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_UDP;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.DatagramChannel;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.HttpClientTracer;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.net.InetSocketAddress;

public abstract class AbstractNettyHttpClientTracer<REQUEST>
extends HttpClientTracer<REQUEST, HttpHeaders, HttpResponse> {

protected AbstractNettyHttpClientTracer() {
super(NetPeerAttributes.INSTANCE);
}

public static void operationComplete(Future<?> future) {
AbstractNettyHttpClientTracer tracer = NettyHttpClientTracerAccess.getTracer();
if (tracer == null) {
return;
}

if (!(future instanceof ChannelFuture)) {
return;
}
// If first call to GenericFutureListener#operationComplete has an exception then we
// treat it as the cause of connection failure and create a special span for it
ChannelFuture channelFuture = (ChannelFuture) future;
AttributeKey<Context> ak = null;
laurit marked this conversation as resolved.
Show resolved Hide resolved
Context parentContext = tracer.getAndRemoveConnectContext(channelFuture);
if (parentContext == null) {
return;
}
Throwable cause = future.cause();
if (cause == null) {
return;
}

if (tracer.shouldStartSpan(parentContext, SpanKind.CLIENT)) {
tracer.connectionFailure(parentContext, channelFuture.channel(), cause);
}
}

protected abstract Context getAndRemoveConnectContext(ChannelFuture channelFuture);

private void connectionFailure(Context parentContext, Channel channel, Throwable throwable) {
SpanBuilder spanBuilder = spanBuilder(parentContext, "CONNECT", CLIENT);
spanBuilder.setAttribute(
SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP);
NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) channel.remoteAddress());

Context context = withClientSpan(parentContext, spanBuilder.startSpan());
endExceptionally(context, throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.netty.common.client;

import java.util.concurrent.atomic.AtomicReference;

public class NettyHttpClientTracerAccess {
private static final AtomicReference<AbstractNettyHttpClientTracer<?>>
nettyHttpClientTracerReference = new AtomicReference<>();

public static AbstractNettyHttpClientTracer<?> getTracer() {
return nettyHttpClientTracerReference.get();
}

public static void setTracer(AbstractNettyHttpClientTracer<?> tracer) {
nettyHttpClientTracerReference.compareAndSet(null, tracer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.netty.v4_0;

import static io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.NettyHttpClientTracer.tracer;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class BootstrapInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.netty.bootstrap.Bootstrap");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor(), BootstrapInstrumentation.class.getName() + "$InitAdvice");
}

@SuppressWarnings("unused")
public static class InitAdvice {
@Advice.OnMethodEnter
public static void enter() {
// Ensure that tracer is initialized. Connection failure handling is initialized in the static
// initializer of tracer which needs to be run before an attempt is made to establish
// connection.
tracer();
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new BootstrapInstrumentation(),
new ChannelInstrumentation(),
new NettyFutureInstrumentation(),
new ChannelFutureListenerInstrumentation(),
new NettyChannelPipelineInstrumentation(),
new AbstractChannelHandlerContextInstrumentation());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,32 @@
import static io.netty.handler.codec.http.HttpHeaders.Names.HOST;
import static io.opentelemetry.api.trace.SpanKind.CLIENT;
import static io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyResponseInjectAdapter.SETTER;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_UDP;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramChannel;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.tracer.HttpClientTracer;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.opentelemetry.javaagent.instrumentation.netty.common.client.AbstractNettyHttpClientTracer;
import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyHttpClientTracerAccess;
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import org.checkerframework.checker.nullness.qual.Nullable;

public class NettyHttpClientTracer
extends HttpClientTracer<NettyRequestWrapper, HttpHeaders, HttpResponse> {
public class NettyHttpClientTracer extends AbstractNettyHttpClientTracer<NettyRequestWrapper> {
private static final NettyHttpClientTracer TRACER = new NettyHttpClientTracer();

private NettyHttpClientTracer() {
super(NetPeerAttributes.INSTANCE);
static {
NettyHttpClientTracerAccess.setTracer(TRACER);
}

private NettyHttpClientTracer() {}

public static NettyHttpClientTracer tracer() {
return TRACER;
}
Expand All @@ -51,14 +50,9 @@ public Context startSpan(
return context;
}

public void connectionFailure(Context parentContext, Channel channel, Throwable throwable) {
SpanBuilder spanBuilder = spanBuilder(parentContext, "CONNECT", CLIENT);
spanBuilder.setAttribute(
SemanticAttributes.NET_TRANSPORT, channel instanceof DatagramChannel ? IP_UDP : IP_TCP);
NetPeerAttributes.INSTANCE.setNetPeer(spanBuilder, (InetSocketAddress) channel.remoteAddress());

Context context = withClientSpan(parentContext, spanBuilder.startSpan());
tracer().endExceptionally(context, throwable);
@Override
protected Context getAndRemoveConnectContext(ChannelFuture channelFuture) {
return channelFuture.channel().attr(AttributeKeys.CONNECT_CONTEXT).getAndRemove();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.netty.v4_1;

import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.NettyHttpClientTracer.tracer;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class BootstrapInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.netty.bootstrap.Bootstrap");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor(), BootstrapInstrumentation.class.getName() + "$InitAdvice");
}

@SuppressWarnings("unused")
public static class InitAdvice {
@Advice.OnMethodEnter
public static void enter() {
// Ensure that tracer is initialized. Connection failure handling is initialized in the static
// initializer of tracer which needs to be run before an attempt is made to establish
// connection.
tracer();
}
}
}
Loading