Skip to content

Commit

Permalink
Netty4.1: remove our handler when original handler is removed (#3026)
Browse files Browse the repository at this point in the history
* Netty4.1: remove our handler when orignal handler is removed

* Update instrumentation/netty/netty-4.1/javaagent/src/test/groovy/ChannelPipelineTest.groovy

Co-authored-by: Mateusz Rzeszutek <[email protected]>

* disable epoll to see whether it makes any difference

* fix netty with epoll/kqueue native library

Co-authored-by: Mateusz Rzeszutek <[email protected]>
  • Loading branch information
laurit and Mateusz Rzeszutek authored May 19, 2021
1 parent 51b2f31 commit 23a40d9
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.extension.matcher.ClassLoaderMatcher.hasClassesNamed;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.netty.channel.Channel;
Expand Down Expand Up @@ -39,7 +38,7 @@ public ElementMatcher<TypeDescription> typeMatcher() {
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(nameStartsWith("write")),
isMethod().and(named("write").or(named("writeAndFlush"))),
ChannelInstrumentation.class.getName() + "$AttachContextAdvice");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@ dependencies {

//Contains logging handler
testLibrary "io.netty:netty-handler:4.1.0.Final"
testLibrary "io.netty:netty-transport-native-epoll:4.1.0.Final:linux-x86_64"

// first version with kqueue, add it only as a compile time dependency
testCompileOnly "io.netty:netty-transport-native-kqueue:4.1.11.Final:osx-x86_64"

latestDepTestLibrary "io.netty:netty-codec-http:(,5.0)"
latestDepTestLibrary "io.netty:netty-handler:(,5.0)"
latestDepTestLibrary "io.netty:netty-transport-native-epoll:(,5.0):linux-x86_64"
latestDepTestLibrary "io.netty:netty-transport-native-kqueue:(,5.0):osx-x86_64"
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.extension.matcher.ClassLoaderMatcher.hasClassesNamed;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;

import io.netty.channel.Channel;
Expand Down Expand Up @@ -40,7 +39,7 @@ public ElementMatcher<TypeDescription> typeMatcher() {
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(nameStartsWith("write")),
isMethod().and(named("write").or(named("writeAndFlush"))),
ChannelInstrumentation.class.getName() + "$AttachContextAdvice");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.CallDepthThreadLocalMap;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.HttpClientRequestTracingHandler;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.HttpClientResponseTracingHandler;
Expand Down Expand Up @@ -59,15 +61,27 @@ public void transform(TypeTransformer transformer) {
.and(takesArgument(1, String.class))
.and(takesArgument(2, named("io.netty.channel.ChannelHandler"))),
NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineAddAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(named("remove"))
.and(takesArgument(0, named("io.netty.channel.ChannelHandler"))),
NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineRemoveAdvice");
transformer.applyAdviceToMethod(
isMethod().and(named("remove")).and(takesArgument(0, String.class)),
NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineRemoveByNameAdvice");
transformer.applyAdviceToMethod(
isMethod().and(named("remove")).and(takesArgument(0, Class.class)),
NettyChannelPipelineInstrumentation.class.getName()
+ "$ChannelPipelineRemoveByClassAdvice");
transformer.applyAdviceToMethod(
isMethod().and(named("connect")).and(returns(named("io.netty.channel.ChannelFuture"))),
NettyChannelPipelineInstrumentation.class.getName() + "$ChannelPipelineConnectAdvice");
}

/**
* When certain handlers are added to the pipeline, we want to add our corresponding tracing
* handlers. If those handlers are later removed, we may want to remove our handlers. That is not
* currently implemented.
* handlers. If those handlers are later removed, we also remove our handlers. Support for
* replacing handlers and removeFirst/removeLast is currently not implemented.
*/
public static class ChannelPipelineAddAdvice {
@Advice.OnMethodEnter
Expand Down Expand Up @@ -106,39 +120,84 @@ public static void addHandler(
name = context.name();
}

try {
// Server pipeline handlers
if (handler instanceof HttpServerCodec) {
pipeline.addAfter(
name, HttpServerTracingHandler.class.getName(), new HttpServerTracingHandler());
} else if (handler instanceof HttpRequestDecoder) {
pipeline.addAfter(
name,
HttpServerRequestTracingHandler.class.getName(),
new HttpServerRequestTracingHandler());
} else if (handler instanceof HttpResponseEncoder) {
pipeline.addAfter(
name,
HttpServerResponseTracingHandler.class.getName(),
new HttpServerResponseTracingHandler());
} else
ChannelHandler ourHandler = null;
// Server pipeline handlers
if (handler instanceof HttpServerCodec) {
ourHandler = new HttpServerTracingHandler();
} else if (handler instanceof HttpRequestDecoder) {
ourHandler = new HttpServerRequestTracingHandler();
} else if (handler instanceof HttpResponseEncoder) {
ourHandler = new HttpServerResponseTracingHandler();
// Client pipeline handlers
if (handler instanceof HttpClientCodec) {
pipeline.addAfter(
name, HttpClientTracingHandler.class.getName(), new HttpClientTracingHandler());
} else if (handler instanceof HttpRequestEncoder) {
pipeline.addAfter(
name,
HttpClientRequestTracingHandler.class.getName(),
new HttpClientRequestTracingHandler());
} else if (handler instanceof HttpResponseDecoder) {
pipeline.addAfter(
name,
HttpClientResponseTracingHandler.class.getName(),
new HttpClientResponseTracingHandler());
} else if (handler instanceof HttpClientCodec) {
ourHandler = new HttpClientTracingHandler();
} else if (handler instanceof HttpRequestEncoder) {
ourHandler = new HttpClientRequestTracingHandler();
} else if (handler instanceof HttpResponseDecoder) {
ourHandler = new HttpClientResponseTracingHandler();
}

if (ourHandler != null) {
try {
pipeline.addAfter(name, ourHandler.getClass().getName(), ourHandler);
InstrumentationContext.get(ChannelHandler.class, ChannelHandler.class)
.putIfAbsent(handler, ourHandler);
} catch (IllegalArgumentException e) {
// Prevented adding duplicate handlers.
}
} catch (IllegalArgumentException e) {
// Prevented adding duplicate handlers.
}
}
}

public static class ChannelPipelineRemoveAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void removeHandler(
@Advice.This ChannelPipeline pipeline, @Advice.Argument(0) ChannelHandler handler) {
ContextStore<ChannelHandler, ChannelHandler> contextStore =
InstrumentationContext.get(ChannelHandler.class, ChannelHandler.class);
ChannelHandler ourHandler = contextStore.get(handler);
if (ourHandler != null) {
pipeline.remove(ourHandler);
contextStore.put(handler, null);
}
}
}

public static class ChannelPipelineRemoveByNameAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void removeHandler(
@Advice.This ChannelPipeline pipeline, @Advice.Argument(0) String name) {
ChannelHandler handler = pipeline.get(name);
if (handler == null) {
return;
}

ContextStore<ChannelHandler, ChannelHandler> contextStore =
InstrumentationContext.get(ChannelHandler.class, ChannelHandler.class);
ChannelHandler ourHandler = contextStore.get(handler);
if (ourHandler != null) {
pipeline.remove(ourHandler);
contextStore.put(handler, null);
}
}
}

public static class ChannelPipelineRemoveByClassAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void removeHandler(
@Advice.This ChannelPipeline pipeline,
@Advice.Argument(0) Class<ChannelHandler> handlerClass) {
ChannelHandler handler = pipeline.get(handlerClass);
if (handler == null) {
return;
}

ContextStore<ChannelHandler, ChannelHandler> contextStore =
InstrumentationContext.get(ChannelHandler.class, ChannelHandler.class);
ChannelHandler ourHandler = contextStore.get(handler);
if (ourHandler != null) {
pipeline.remove(ourHandler);
contextStore.put(handler, null);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.netty.channel.DefaultChannelPipeline
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.handler.codec.http.HttpClientCodec
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.client.HttpClientTracingHandler
import spock.lang.Unroll

class ChannelPipelineTest extends AgentInstrumentationSpecification {
// regression test for https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1373
@Unroll
def "test remove our handler #testName"() {
setup:
def channel = new EmbeddedChannel()
def channelPipeline = new DefaultChannelPipeline(channel)
def handler = new HttpClientCodec()

when:
// no handlers
channelPipeline.first() == null

then:
// add handler
channelPipeline.addLast("http", handler)
channelPipeline.first() == handler
// our handler was also added
channelPipeline.last().getClass() == HttpClientTracingHandler

and:
removeMethod.call(channelPipeline, handler)
// removing handler also removes our handler
channelPipeline.first() == null

where:
testName | removeMethod
"by instance" | { pipeline, h -> pipeline.remove(h) }
"by class" | { pipeline, h -> pipeline.remove(h.getClass()) }
"by name" | { pipeline, h -> pipeline.remove("http") }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
private Bootstrap bootstrap

def setupSpec() {
EventLoopGroup group = new NioEventLoopGroup()
EventLoopGroup group = getEventLoopGroup()
bootstrap = new Bootstrap()
bootstrap.group(group)
.channel(NioSocketChannel)
.channel(getChannelClass())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
Expand All @@ -55,6 +55,14 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
})
}

EventLoopGroup getEventLoopGroup() {
return new NioEventLoopGroup()
}

Class<Channel> getChannelClass() {
return NioSocketChannel
}

@Override
DefaultFullHttpRequest buildRequest(String method, URI uri, Map<String, String> headers) {
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.valueOf(method), uri.toString(), Unpooled.EMPTY_BUFFER)
Expand Down Expand Up @@ -251,8 +259,9 @@ class Netty41ClientTest extends HttpClientTest<DefaultFullHttpRequest> implement
channel.pipeline().addLast(new TracedHandlerFromInitializerHandler())

then:
null != channel.pipeline().get(HttpClientTracingHandler.getName())
null != channel.pipeline().remove("added_in_initializer")
null != channel.pipeline().remove(HttpClientTracingHandler.getName())
null == channel.pipeline().get(HttpClientTracingHandler.getName())
}

def "request with trace annotated method #method"() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.netty.channel.Channel
import io.netty.channel.EventLoopGroup
import io.netty.channel.epoll.Epoll
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollSocketChannel
import io.netty.channel.kqueue.KQueue
import io.netty.channel.kqueue.KQueueEventLoopGroup
import io.netty.channel.kqueue.KQueueSocketChannel
import org.junit.Assume

// netty client test with epoll/kqueue native library
class Netty41NativeClientTest extends Netty41ClientTest {

EventLoopGroup getEventLoopGroup() {
// linux
if (Epoll.isAvailable()) {
return new EpollEventLoopGroup()
}
// mac
if (KQueueHelper.isAvailable()) {
return new KQueueEventLoopGroup()
}

// skip test when native library was not found
Assume.assumeTrue("Native library was not found", false)
return super.getEventLoopGroup()
}

@Override
Class<Channel> getChannelClass() {
if (Epoll.isAvailable()) {
return EpollSocketChannel
}
if (KQueueHelper.isAvailable()) {
return KQueueSocketChannel
}
return null
}

static class KQueueHelper {
static boolean isAvailable() {
try {
return KQueue.isAvailable()
} catch (NoClassDefFoundError error) {
// kqueue is available only in latest dep tests
// in regular tests we only have a compile time dependency because kqueue support was added
// after 4.1.0
return false
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ class RatpackHttpClientTest extends HttpClientTest<Void> implements AgentTestTra
if (HttpClientSpec.metaClass.getMetaMethod("execController") != null) {
it.execController(exec.getController())
}
configureClient(it)
}

void configureClient(HttpClientSpec spec) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package client

import ratpack.http.client.HttpClientSpec

class RatpackPooledHttpClientTest extends RatpackHttpClientTest {

@Override
void configureClient(HttpClientSpec spec) {
spec.poolSize(5)
}
}

0 comments on commit 23a40d9

Please sign in to comment.