From f61db62f6103762c375e45754020765b8a4c72e0 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 16 Sep 2024 22:54:21 +0200 Subject: [PATCH 1/2] Use ChannelFutureListener in Netty code to reduce capturing lambdas Mainly motivated by simplifying the reference chains for Netty buffers and have easier to analyze heap dumps in some spots but also a small performance win in and of itself. --- .../transport/netty4/Netty4Transport.java | 7 ++++--- .../transport/netty4/Netty4Utils.java | 12 +++++++----- .../netty4/Netty4WriteThrottlingHandler.java | 18 +++++++++--------- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index b6e66b4cd4874..71c1e16146b91 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -13,6 +13,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -381,9 +382,9 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster } private static void addClosedExceptionLogger(Channel channel) { - channel.closeFuture().addListener(f -> { - if (f.isSuccess() == false) { - logger.debug(() -> format("exception while closing channel: %s", channel), f.cause()); + channel.closeFuture().addListener((ChannelFutureListener) channelFuture -> { + if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) { + logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause()); } }); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index f0f93533c8867..2fcb4158a07c9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -13,6 +13,8 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import io.netty.util.NettyRuntime; import io.netty.util.concurrent.Future; @@ -141,7 +143,7 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList // can only be completed by some network event from this point on. However... final var promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); addListener(promise, listener); - assert assertCorrectPromiseListenerThreading(channel, promise); + assert assertCorrectPromiseListenerThreading(promise); channel.writeAndFlush(message, promise); if (channel.eventLoop().isShuttingDown()) { // ... if we get here then the event loop may already have terminated, and https://github.com/netty/netty/issues/8007 means that @@ -156,10 +158,10 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList } } - private static boolean assertCorrectPromiseListenerThreading(Channel channel, Future promise) { - final var eventLoop = channel.eventLoop(); - promise.addListener(future -> { - assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || channel.eventLoop().isTerminated() + private static boolean assertCorrectPromiseListenerThreading(ChannelPromise promise) { + promise.addListener((ChannelFutureListener) future -> { + var eventLoop = future.channel().eventLoop(); + assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || eventLoop.isTerminated() : future.cause(); }); return true; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java index 7580f1eaadf14..fa1dae0ffd8e1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java @@ -14,10 +14,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.PromiseCombiner; import org.apache.lucene.util.BytesRef; @@ -94,13 +94,13 @@ private void writeInSlices(ChannelHandlerContext ctx, ChannelPromise promise, By final int bufferSize = Math.min(readableBytes, MAX_BYTES_PER_WRITE); if (readableBytes == bufferSize) { // last write for this chunk we're done - ctx.write(buf).addListener(forwardResultListener(ctx, promise)); + ctx.write(buf).addListener(forwardResultListener(promise)); return; } final int readerIndex = buf.readerIndex(); final ByteBuf writeBuffer = buf.retainedSlice(readerIndex, bufferSize); buf.readerIndex(readerIndex + bufferSize); - ctx.write(writeBuffer).addListener(forwardFailureListener(ctx, promise)); + ctx.write(writeBuffer).addListener(forwardFailureListener(promise)); if (ctx.channel().isWritable() == false) { // channel isn't writable any longer -> move to queuing queueWrite(buf, promise); @@ -164,9 +164,9 @@ private boolean doFlush(ChannelHandlerContext ctx) { final ChannelFuture writeFuture = ctx.write(writeBuffer); if (sliced == false) { currentWrite = null; - writeFuture.addListener(forwardResultListener(ctx, write.promise)); + writeFuture.addListener(forwardResultListener(write.promise)); } else { - writeFuture.addListener(forwardFailureListener(ctx, write.promise)); + writeFuture.addListener(forwardFailureListener(write.promise)); } } ctx.flush(); @@ -176,18 +176,18 @@ private boolean doFlush(ChannelHandlerContext ctx) { return true; } - private static GenericFutureListener> forwardFailureListener(ChannelHandlerContext ctx, ChannelPromise promise) { + private static ChannelFutureListener forwardFailureListener(ChannelPromise promise) { return future -> { - assert ctx.executor().inEventLoop(); + assert future.channel().eventLoop().inEventLoop(); if (future.isSuccess() == false) { promise.tryFailure(future.cause()); } }; } - private static GenericFutureListener> forwardResultListener(ChannelHandlerContext ctx, ChannelPromise promise) { + private static ChannelFutureListener forwardResultListener(ChannelPromise promise) { return future -> { - assert ctx.executor().inEventLoop(); + assert future.channel().eventLoop().inEventLoop(); if (future.isSuccess()) { promise.trySuccess(); } else { From 07ccef45b7ca69663a3a3db7baddaf72b7edb45c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 18 Sep 2024 17:15:03 +0200 Subject: [PATCH 2/2] forbiddenapis --- modules/transport-netty4/build.gradle | 4 ++++ modules/transport-netty4/forbidden/netty-signatures.txt | 9 +++++++++ .../http/netty4/Netty4HttpPipeliningHandler.java | 6 +++--- .../elasticsearch/transport/netty4/Netty4Transport.java | 3 +-- .../org/elasticsearch/transport/netty4/Netty4Utils.java | 9 ++++++++- .../transport/netty4/Netty4WriteThrottlingHandler.java | 8 ++++---- x-pack/plugin/security/build.gradle | 7 ++++++- .../transport/netty4/SecurityNetty4Transport.java | 3 ++- 8 files changed, 37 insertions(+), 12 deletions(-) create mode 100644 modules/transport-netty4/forbidden/netty-signatures.txt diff --git a/modules/transport-netty4/build.gradle b/modules/transport-netty4/build.gradle index d2c97ad08f88c..d80b63bec53d8 100644 --- a/modules/transport-netty4/build.gradle +++ b/modules/transport-netty4/build.gradle @@ -241,3 +241,7 @@ tasks.named("thirdPartyAudit").configure { 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5' ) } + +tasks.named('forbiddenApisMain').configure { + signaturesFiles += files('forbidden/netty-signatures.txt') +} \ No newline at end of file diff --git a/modules/transport-netty4/forbidden/netty-signatures.txt b/modules/transport-netty4/forbidden/netty-signatures.txt new file mode 100644 index 0000000000000..4c47b2aee1253 --- /dev/null +++ b/modules/transport-netty4/forbidden/netty-signatures.txt @@ -0,0 +1,9 @@ + # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + # or more contributor license agreements. Licensed under the "Elastic License + # 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + # Public License v 1"; you may not use this file except in compliance with, at + # your election, the "Elastic License 2.0", the "GNU Affero General Public + # License v3.0 only", or the "Server Side Public License, v 1". + +@defaultMessage Use org.elasticsearch.transport.netty4.Netty4Utils.addListener(io.netty.channel.ChannelFuture, io.netty.channel.ChannelFutureListener) instead +io.netty.channel.ChannelFuture#addListener(io.netty.util.concurrent.GenericFutureListener) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index be94212af9815..d8eadf4fca95d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -302,8 +302,8 @@ public void onFailure(Exception e) { Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel), e ); - channel.close().addListener(ignored -> { - finishingWrite.combiner().add(channel.newFailedFuture(e)); + Netty4Utils.addListener(channel.close(), f -> { + finishingWrite.combiner().add(f.channel().newFailedFuture(e)); finishingWrite.combiner().finish(finishingWrite.onDone()); }); checkShutdown(); @@ -417,7 +417,7 @@ private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite) final boolean isPartComplete = bodyPart.isPartComplete(); final boolean isBodyComplete = isPartComplete && bodyPart.isLastPart(); final ChannelFuture f = ctx.write(isBodyComplete ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content)); - f.addListener(ignored -> bytes.close()); + Netty4Utils.addListener(f, ignored -> bytes.close()); combiner.add(f); return isPartComplete; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 71c1e16146b91..b99c76e7b0615 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -13,7 +13,6 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -382,7 +381,7 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster } private static void addClosedExceptionLogger(Channel channel) { - channel.closeFuture().addListener((ChannelFutureListener) channelFuture -> { + Netty4Utils.addListener(channel.closeFuture(), channelFuture -> { if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) { logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause()); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index 2fcb4158a07c9..b3596c75999ec 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -13,6 +13,7 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; @@ -30,6 +31,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Booleans; +import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.transport.TransportException; import java.io.IOException; @@ -159,7 +161,7 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList } private static boolean assertCorrectPromiseListenerThreading(ChannelPromise promise) { - promise.addListener((ChannelFutureListener) future -> { + addListener(promise, future -> { var eventLoop = future.channel().eventLoop(); assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || eventLoop.isTerminated() : future.cause(); @@ -185,4 +187,9 @@ public static void addListener(Future future, ActionListener listene } }); } + + @SuppressForbidden(reason = "single point for adding listeners that enforces use of ChannelFutureListener") + public static void addListener(ChannelFuture channelFuture, ChannelFutureListener listener) { + channelFuture.addListener(listener); + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java index fa1dae0ffd8e1..15011957040af 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java @@ -94,13 +94,13 @@ private void writeInSlices(ChannelHandlerContext ctx, ChannelPromise promise, By final int bufferSize = Math.min(readableBytes, MAX_BYTES_PER_WRITE); if (readableBytes == bufferSize) { // last write for this chunk we're done - ctx.write(buf).addListener(forwardResultListener(promise)); + Netty4Utils.addListener(ctx.write(buf), forwardResultListener(promise)); return; } final int readerIndex = buf.readerIndex(); final ByteBuf writeBuffer = buf.retainedSlice(readerIndex, bufferSize); buf.readerIndex(readerIndex + bufferSize); - ctx.write(writeBuffer).addListener(forwardFailureListener(promise)); + Netty4Utils.addListener(ctx.write(writeBuffer), forwardFailureListener(promise)); if (ctx.channel().isWritable() == false) { // channel isn't writable any longer -> move to queuing queueWrite(buf, promise); @@ -164,9 +164,9 @@ private boolean doFlush(ChannelHandlerContext ctx) { final ChannelFuture writeFuture = ctx.write(writeBuffer); if (sliced == false) { currentWrite = null; - writeFuture.addListener(forwardResultListener(write.promise)); + Netty4Utils.addListener(writeFuture, forwardResultListener(write.promise)); } else { - writeFuture.addListener(forwardFailureListener(write.promise)); + Netty4Utils.addListener(writeFuture, forwardFailureListener(write.promise)); } } ctx.flush(); diff --git a/x-pack/plugin/security/build.gradle b/x-pack/plugin/security/build.gradle index d3697eade8b24..ebd435bf653a2 100644 --- a/x-pack/plugin/security/build.gradle +++ b/x-pack/plugin/security/build.gradle @@ -193,7 +193,12 @@ tasks.named("forbiddenPatterns").configure { } tasks.named('forbiddenApisMain').configure { - signaturesFiles += files('forbidden/ldap-signatures.txt', 'forbidden/xml-signatures.txt', 'forbidden/oidc-signatures.txt') + signaturesFiles += files( + 'forbidden/ldap-signatures.txt', + 'forbidden/xml-signatures.txt', + 'forbidden/oidc-signatures.txt', + project(':modules:transport-netty4').file('forbidden/netty-signatures.txt') + ) } tasks.named('forbiddenApisTest').configure { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index c4c583a2c76c9..ff8b6f5eaac39 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -41,6 +41,7 @@ import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.netty4.Netty4Transport; +import org.elasticsearch.transport.netty4.Netty4Utils; import org.elasticsearch.transport.netty4.SharedGroupFactory; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.transport.ProfileConfigurations; @@ -341,7 +342,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock final SslHandler sslHandler = new SslHandler(sslEngine); ctx.pipeline().replace(this, "ssl", sslHandler); final Future handshakePromise = sslHandler.handshakeFuture(); - connectPromise.addListener(result -> { + Netty4Utils.addListener(connectPromise, result -> { if (result.isSuccess() == false) { promise.tryFailure(result.cause()); } else {