Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions modules/transport-netty4/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,7 @@ tasks.named("thirdPartyAudit").configure {
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5'
)
}

tasks.named('forbiddenApisMain').configure {
signaturesFiles += files('forbidden/netty-signatures.txt')
}
9 changes: 9 additions & 0 deletions modules/transport-netty4/forbidden/netty-signatures.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the "Elastic License
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
# Public License v 1"; you may not use this file except in compliance with, at
# your election, the "Elastic License 2.0", the "GNU Affero General Public
# License v3.0 only", or the "Server Side Public License, v 1".

@defaultMessage Use org.elasticsearch.transport.netty4.Netty4Utils.addListener(io.netty.channel.ChannelFuture, io.netty.channel.ChannelFutureListener) instead
io.netty.channel.ChannelFuture#addListener(io.netty.util.concurrent.GenericFutureListener)
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ public void onFailure(Exception e) {
Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
e
);
channel.close().addListener(ignored -> {
finishingWrite.combiner().add(channel.newFailedFuture(e));
Netty4Utils.addListener(channel.close(), f -> {
finishingWrite.combiner().add(f.channel().newFailedFuture(e));
finishingWrite.combiner().finish(finishingWrite.onDone());
});
checkShutdown();
Expand Down Expand Up @@ -417,7 +417,7 @@ private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite)
final boolean isPartComplete = bodyPart.isPartComplete();
final boolean isBodyComplete = isPartComplete && bodyPart.isLastPart();
final ChannelFuture f = ctx.write(isBodyComplete ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
f.addListener(ignored -> bytes.close());
Netty4Utils.addListener(f, ignored -> bytes.close());
combiner.add(f);
return isPartComplete;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,9 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster
}

private static void addClosedExceptionLogger(Channel channel) {
channel.closeFuture().addListener(f -> {
if (f.isSuccess() == false) {
logger.debug(() -> format("exception while closing channel: %s", channel), f.cause());
Netty4Utils.addListener(channel.closeFuture(), channelFuture -> {
if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) {
logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.Future;
Expand All @@ -28,6 +31,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.transport.TransportException;

import java.io.IOException;
Expand Down Expand Up @@ -141,7 +145,7 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList
// can only be completed by some network event from this point on. However...
final var promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
addListener(promise, listener);
assert assertCorrectPromiseListenerThreading(channel, promise);
assert assertCorrectPromiseListenerThreading(promise);
channel.writeAndFlush(message, promise);
if (channel.eventLoop().isShuttingDown()) {
// ... if we get here then the event loop may already have terminated, and https://github.com/netty/netty/issues/8007 means that
Expand All @@ -156,10 +160,10 @@ public static void safeWriteAndFlush(Channel channel, Object message, ActionList
}
}

private static boolean assertCorrectPromiseListenerThreading(Channel channel, Future<?> promise) {
final var eventLoop = channel.eventLoop();
promise.addListener(future -> {
assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || channel.eventLoop().isTerminated()
private static boolean assertCorrectPromiseListenerThreading(ChannelPromise promise) {
addListener(promise, future -> {
var eventLoop = future.channel().eventLoop();
assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || eventLoop.isTerminated()
: future.cause();
});
return true;
Expand All @@ -183,4 +187,9 @@ public static void addListener(Future<Void> future, ActionListener<Void> listene
}
});
}

@SuppressForbidden(reason = "single point for adding listeners that enforces use of ChannelFutureListener")
public static void addListener(ChannelFuture channelFuture, ChannelFutureListener listener) {
channelFuture.addListener(listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.PromiseCombiner;

import org.apache.lucene.util.BytesRef;
Expand Down Expand Up @@ -94,13 +94,13 @@ private void writeInSlices(ChannelHandlerContext ctx, ChannelPromise promise, By
final int bufferSize = Math.min(readableBytes, MAX_BYTES_PER_WRITE);
if (readableBytes == bufferSize) {
// last write for this chunk we're done
ctx.write(buf).addListener(forwardResultListener(ctx, promise));
Netty4Utils.addListener(ctx.write(buf), forwardResultListener(promise));
return;
}
final int readerIndex = buf.readerIndex();
final ByteBuf writeBuffer = buf.retainedSlice(readerIndex, bufferSize);
buf.readerIndex(readerIndex + bufferSize);
ctx.write(writeBuffer).addListener(forwardFailureListener(ctx, promise));
Netty4Utils.addListener(ctx.write(writeBuffer), forwardFailureListener(promise));
if (ctx.channel().isWritable() == false) {
// channel isn't writable any longer -> move to queuing
queueWrite(buf, promise);
Expand Down Expand Up @@ -164,9 +164,9 @@ private boolean doFlush(ChannelHandlerContext ctx) {
final ChannelFuture writeFuture = ctx.write(writeBuffer);
if (sliced == false) {
currentWrite = null;
writeFuture.addListener(forwardResultListener(ctx, write.promise));
Netty4Utils.addListener(writeFuture, forwardResultListener(write.promise));
} else {
writeFuture.addListener(forwardFailureListener(ctx, write.promise));
Netty4Utils.addListener(writeFuture, forwardFailureListener(write.promise));
}
}
ctx.flush();
Expand All @@ -176,18 +176,18 @@ private boolean doFlush(ChannelHandlerContext ctx) {
return true;
}

private static GenericFutureListener<Future<Void>> forwardFailureListener(ChannelHandlerContext ctx, ChannelPromise promise) {
private static ChannelFutureListener forwardFailureListener(ChannelPromise promise) {
return future -> {
assert ctx.executor().inEventLoop();
assert future.channel().eventLoop().inEventLoop();
if (future.isSuccess() == false) {
promise.tryFailure(future.cause());
}
};
}

private static GenericFutureListener<Future<Void>> forwardResultListener(ChannelHandlerContext ctx, ChannelPromise promise) {
private static ChannelFutureListener forwardResultListener(ChannelPromise promise) {
return future -> {
assert ctx.executor().inEventLoop();
assert future.channel().eventLoop().inEventLoop();
if (future.isSuccess()) {
promise.trySuccess();
} else {
Expand Down
7 changes: 6 additions & 1 deletion x-pack/plugin/security/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,12 @@ tasks.named("forbiddenPatterns").configure {
}

tasks.named('forbiddenApisMain').configure {
signaturesFiles += files('forbidden/ldap-signatures.txt', 'forbidden/xml-signatures.txt', 'forbidden/oidc-signatures.txt')
signaturesFiles += files(
'forbidden/ldap-signatures.txt',
'forbidden/xml-signatures.txt',
'forbidden/oidc-signatures.txt',
project(':modules:transport-netty4').file('forbidden/netty-signatures.txt')
)
}

tasks.named('forbiddenApisTest').configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty4.Netty4Transport;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.transport.netty4.SharedGroupFactory;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.security.transport.ProfileConfigurations;
Expand Down Expand Up @@ -341,7 +342,7 @@ public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, Sock
final SslHandler sslHandler = new SslHandler(sslEngine);
ctx.pipeline().replace(this, "ssl", sslHandler);
final Future<?> handshakePromise = sslHandler.handshakeFuture();
connectPromise.addListener(result -> {
Netty4Utils.addListener(connectPromise, result -> {
if (result.isSuccess() == false) {
promise.tryFailure(result.cause());
} else {
Expand Down