diff --git a/modules/transport-netty4/build.gradle b/modules/transport-netty4/build.gradle index d2c97ad08f88c..d80b63bec53d8 100644 --- a/modules/transport-netty4/build.gradle +++ b/modules/transport-netty4/build.gradle @@ -241,3 +241,7 @@ tasks.named("thirdPartyAudit").configure { 'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5' ) } + +tasks.named('forbiddenApisMain').configure { + signaturesFiles += files('forbidden/netty-signatures.txt') +} \ No newline at end of file diff --git a/modules/transport-netty4/forbidden/netty-signatures.txt b/modules/transport-netty4/forbidden/netty-signatures.txt new file mode 100644 index 0000000000000..4c47b2aee1253 --- /dev/null +++ b/modules/transport-netty4/forbidden/netty-signatures.txt @@ -0,0 +1,9 @@ + # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + # or more contributor license agreements. Licensed under the "Elastic License + # 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + # Public License v 1"; you may not use this file except in compliance with, at + # your election, the "Elastic License 2.0", the "GNU Affero General Public + # License v3.0 only", or the "Server Side Public License, v 1". + +@defaultMessage Use org.elasticsearch.transport.netty4.Netty4Utils.addListener(io.netty.channel.ChannelFuture, io.netty.channel.ChannelFutureListener) instead +io.netty.channel.ChannelFuture#addListener(io.netty.util.concurrent.GenericFutureListener) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index be94212af9815..d8eadf4fca95d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -302,8 +302,8 @@ public void onFailure(Exception e) { Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel), e ); - channel.close().addListener(ignored -> { - finishingWrite.combiner().add(channel.newFailedFuture(e)); + Netty4Utils.addListener(channel.close(), f -> { + finishingWrite.combiner().add(f.channel().newFailedFuture(e)); finishingWrite.combiner().finish(finishingWrite.onDone()); }); checkShutdown(); @@ -417,7 +417,7 @@ private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite) final boolean isPartComplete = bodyPart.isPartComplete(); final boolean isBodyComplete = isPartComplete && bodyPart.isLastPart(); final ChannelFuture f = ctx.write(isBodyComplete ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content)); - f.addListener(ignored -> bytes.close()); + Netty4Utils.addListener(f, ignored -> bytes.close()); combiner.add(f); return isPartComplete; } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index b6e66b4cd4874..b99c76e7b0615 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -381,9 +381,9 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster } private static void addClosedExceptionLogger(Channel channel) { - channel.closeFuture().addListener(f -> { - if (f.isSuccess() == false) { - logger.debug(() -> format("exception while closing channel: %s", channel), f.cause()); + Netty4Utils.addListener(channel.closeFuture(), channelFuture -> { + if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) { + logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause()); } }); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index f0f93533c8867..b3596c75999ec 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -13,6 +13,9 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; import io.netty.util.NettyRuntime; import io.netty.util.concurrent.Future; @@ -28,6 +31,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Booleans; +import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.transport.TransportException; import java.io.IOException; @@ -141,7 +145,7 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList // can only be completed by some network event from this point on. However... final var promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); addListener(promise, listener); - assert assertCorrectPromiseListenerThreading(channel, promise); + assert assertCorrectPromiseListenerThreading(promise); channel.writeAndFlush(message, promise); if (channel.eventLoop().isShuttingDown()) { // ... if we get here then the event loop may already have terminated, and https://github.com/netty/netty/issues/8007 means that @@ -156,10 +160,10 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList } } - private static boolean assertCorrectPromiseListenerThreading(Channel channel, Future promise) { - final var eventLoop = channel.eventLoop(); - promise.addListener(future -> { - assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || channel.eventLoop().isTerminated() + private static boolean assertCorrectPromiseListenerThreading(ChannelPromise promise) { + addListener(promise, future -> { + var eventLoop = future.channel().eventLoop(); + assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || eventLoop.isTerminated() : future.cause(); }); return true; @@ -183,4 +187,9 @@ public static void addListener(Future future, ActionListener listene } }); } + + @SuppressForbidden(reason = "single point for adding listeners that enforces use of ChannelFutureListener") + public static void addListener(ChannelFuture channelFuture, ChannelFutureListener listener) { + channelFuture.addListener(listener); + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java index 7580f1eaadf14..15011957040af 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4WriteThrottlingHandler.java @@ -14,10 +14,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.PromiseCombiner; import org.apache.lucene.util.BytesRef; @@ -94,13 +94,13 @@ private void writeInSlices(ChannelHandlerContext ctx, ChannelPromise promise, By final int bufferSize = Math.min(readableBytes, MAX_BYTES_PER_WRITE); if (readableBytes == bufferSize) { // last write for this chunk we're done - ctx.write(buf).addListener(forwardResultListener(ctx, promise)); + Netty4Utils.addListener(ctx.write(buf), forwardResultListener(promise)); return; } final int readerIndex = buf.readerIndex(); final ByteBuf writeBuffer = buf.retainedSlice(readerIndex, bufferSize); buf.readerIndex(readerIndex + bufferSize); - ctx.write(writeBuffer).addListener(forwardFailureListener(ctx, promise)); + Netty4Utils.addListener(ctx.write(writeBuffer), forwardFailureListener(promise)); if (ctx.channel().isWritable() == false) { // channel isn't writable any longer -> move to queuing queueWrite(buf, promise); @@ -164,9 +164,9 @@ private boolean doFlush(ChannelHandlerContext ctx) { final ChannelFuture writeFuture = ctx.write(writeBuffer); if (sliced == false) { currentWrite = null; - writeFuture.addListener(forwardResultListener(ctx, write.promise)); + Netty4Utils.addListener(writeFuture, forwardResultListener(write.promise)); } else { - writeFuture.addListener(forwardFailureListener(ctx, write.promise)); + Netty4Utils.addListener(writeFuture, forwardFailureListener(write.promise)); } } ctx.flush(); @@ -176,18 +176,18 @@ private boolean doFlush(ChannelHandlerContext ctx) { return true; } - private static GenericFutureListener> forwardFailureListener(ChannelHandlerContext ctx, ChannelPromise promise) { + private static ChannelFutureListener forwardFailureListener(ChannelPromise promise) { return future -> { - assert ctx.executor().inEventLoop(); + assert future.channel().eventLoop().inEventLoop(); if (future.isSuccess() == false) { promise.tryFailure(future.cause()); } }; } - private static GenericFutureListener> forwardResultListener(ChannelHandlerContext ctx, ChannelPromise promise) { + private static ChannelFutureListener forwardResultListener(ChannelPromise promise) { return future -> { - assert ctx.executor().inEventLoop(); + assert future.channel().eventLoop().inEventLoop(); if (future.isSuccess()) { promise.trySuccess(); } else { diff --git a/x-pack/plugin/security/build.gradle b/x-pack/plugin/security/build.gradle index d3697eade8b24..ebd435bf653a2 100644 --- a/x-pack/plugin/security/build.gradle +++ b/x-pack/plugin/security/build.gradle @@ -193,7 +193,12 @@ tasks.named("forbiddenPatterns").configure { } tasks.named('forbiddenApisMain').configure { - signaturesFiles += files('forbidden/ldap-signatures.txt', 'forbidden/xml-signatures.txt', 'forbidden/oidc-signatures.txt') + signaturesFiles += files( + 'forbidden/ldap-signatures.txt', + 'forbidden/xml-signatures.txt', + 'forbidden/oidc-signatures.txt', + project(':modules:transport-netty4').file('forbidden/netty-signatures.txt') + ) } tasks.named('forbiddenApisTest').configure { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java index c4c583a2c76c9..ff8b6f5eaac39 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/core/security/transport/netty4/SecurityNetty4Transport.java @@ -41,6 +41,7 @@ import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TransportSettings; import org.elasticsearch.transport.netty4.Netty4Transport; +import org.elasticsearch.transport.netty4.Netty4Utils; import org.elasticsearch.transport.netty4.SharedGroupFactory; import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.security.transport.ProfileConfigurations; @@ -341,7 +342,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock final SslHandler sslHandler = new SslHandler(sslEngine); ctx.pipeline().replace(this, "ssl", sslHandler); final Future handshakePromise = sslHandler.handshakeFuture(); - connectPromise.addListener(result -> { + Netty4Utils.addListener(connectPromise, result -> { if (result.isSuccess() == false) { promise.tryFailure(result.cause()); } else {