From d4a4ecc52f5231b4d1d38abe3044645f42ffc3fd Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 15 Feb 2024 19:20:19 +0000 Subject: [PATCH] Fix use-after-free at event-loop shutdown (#105486) We could still be manipulating a network message when the event loop shuts down, causing us to close the message while it's still in use. This is at best going to be a little surprising to the caller, and at worst could be an outright use-after-free bug. This commit moves the double-check for a leaked promise to happen strictly after the event loop has fully terminated, so that we can be sure we've finished using it by this point. Relates #105306, #97301 --- docs/changelog/105486.yaml | 5 ++ .../http/netty4/Netty4HttpChannel.java | 15 ++--- .../http/netty4/Netty4HttpServerChannel.java | 5 +- .../transport/netty4/Netty4TcpChannel.java | 60 ++---------------- .../netty4/Netty4TcpServerChannel.java | 4 +- .../transport/netty4/Netty4Utils.java | 61 +++++++++++++++++++ .../org/elasticsearch/http/HttpChannel.java | 4 +- .../elasticsearch/rest/RestController.java | 1 + .../transport/OutboundHandler.java | 8 +++ .../transport/TransportMessage.java | 7 +++ 10 files changed, 100 insertions(+), 70 deletions(-) create mode 100644 docs/changelog/105486.yaml diff --git a/docs/changelog/105486.yaml b/docs/changelog/105486.yaml new file mode 100644 index 0000000000000..befdaec2301c6 --- /dev/null +++ b/docs/changelog/105486.yaml @@ -0,0 +1,5 @@ +pr: 105486 +summary: Fix use-after-free at event-loop shutdown +area: Network +type: bug +issues: [] diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java index 705cb02dfb246..83728b8ef73c2 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java @@ -14,12 
+14,13 @@ import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpResponse; -import org.elasticsearch.transport.TransportException; -import org.elasticsearch.transport.netty4.Netty4TcpChannel; import java.net.InetSocketAddress; import java.net.SocketAddress; +import static org.elasticsearch.transport.netty4.Netty4Utils.addListener; +import static org.elasticsearch.transport.netty4.Netty4Utils.safeWriteAndFlush; + public class Netty4HttpChannel implements HttpChannel { private final Channel channel; @@ -27,18 +28,12 @@ public class Netty4HttpChannel implements HttpChannel { Netty4HttpChannel(Channel channel) { this.channel = channel; - Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext); + addListener(this.channel.closeFuture(), closeContext); } @Override public void sendResponse(HttpResponse response, ActionListener listener) { - // We need to both guard against double resolving the listener and not resolving it in case of event loop shutdown so we need to - // use #notifyOnce here until https://github.com/netty/netty/issues/8007 is resolved. 
- var wrapped = ActionListener.notifyOnce(listener); - channel.writeAndFlush(response, Netty4TcpChannel.addPromise(wrapped, channel)); - if (channel.eventLoop().isShutdown()) { - wrapped.onFailure(new TransportException("Cannot send HTTP response, event loop is shutting down.")); - } + safeWriteAndFlush(channel, response, listener); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java index 7356b7fb65cbc..58dc675b76bb1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java @@ -13,10 +13,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.http.HttpServerChannel; -import org.elasticsearch.transport.netty4.Netty4TcpChannel; import java.net.InetSocketAddress; +import static org.elasticsearch.transport.netty4.Netty4Utils.addListener; + public class Netty4HttpServerChannel implements HttpServerChannel { private final Channel channel; @@ -24,7 +25,7 @@ public class Netty4HttpServerChannel implements HttpServerChannel { Netty4HttpServerChannel(Channel channel) { this.channel = channel; - Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext); + addListener(this.channel.closeFuture(), closeContext); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 33fdb00e7abb2..726f1293c5cf3 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ 
b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -11,19 +11,19 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPromise; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasables; import org.elasticsearch.transport.TcpChannel; -import org.elasticsearch.transport.TransportException; import java.net.InetSocketAddress; +import static org.elasticsearch.transport.netty4.Netty4Utils.addListener; +import static org.elasticsearch.transport.netty4.Netty4Utils.safeWriteAndFlush; + public class Netty4TcpChannel implements TcpChannel { private final Channel channel; @@ -44,52 +44,6 @@ public class Netty4TcpChannel implements TcpChannel { addListener(connectFuture, connectContext); } - /** - * Adds a listener that completes the given {@link ListenableFuture} to the given {@link ChannelFuture}. - * @param channelFuture Channel future - * @param listener Listener to complete - */ - public static void addListener(ChannelFuture channelFuture, ListenableFuture listener) { - channelFuture.addListener(f -> { - if (f.isSuccess()) { - listener.onResponse(null); - } else { - Throwable cause = f.cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - listener.onFailure(new Exception(cause)); - } else { - listener.onFailure((Exception) cause); - } - } - }); - } - - /** - * Creates a {@link ChannelPromise} for the given {@link Channel} and adds a listener that invokes the given {@link ActionListener} - * on its completion. 
- * @param listener lister to invoke - * @param channel channel - * @return write promise - */ - public static ChannelPromise addPromise(ActionListener listener, Channel channel) { - ChannelPromise writePromise = channel.newPromise(); - writePromise.addListener(f -> { - if (f.isSuccess()) { - listener.onResponse(null); - } else { - final Throwable cause = f.cause(); - ExceptionsHelper.maybeDieOnAnotherThread(cause); - if (cause instanceof Error) { - listener.onFailure(new Exception(cause)); - } else { - listener.onFailure((Exception) cause); - } - } - }); - return writePromise; - } - @Override public void close() { if (rstOnClose) { @@ -162,13 +116,7 @@ public InetSocketAddress getRemoteAddress() { @Override public void sendMessage(BytesReference reference, ActionListener listener) { - // We need to both guard against double resolving the listener and not resolving it in case of event loop shutdown so we need to - // use #notifyOnce here until https://github.com/netty/netty/issues/8007 is resolved. 
- var wrapped = ActionListener.notifyOnce(listener); - channel.writeAndFlush(reference, addPromise(wrapped, channel)); - if (channel.eventLoop().isShutdown()) { - wrapped.onFailure(new TransportException("Cannot send message, event loop is shutting down.")); - } + safeWriteAndFlush(channel, reference, listener); } public Channel getNettyChannel() { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java index 1279c50133643..13f691e6e0e5e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java @@ -16,6 +16,8 @@ import java.net.InetSocketAddress; +import static org.elasticsearch.transport.netty4.Netty4Utils.addListener; + public class Netty4TcpServerChannel implements TcpServerChannel { private final Channel channel; @@ -23,7 +25,7 @@ public class Netty4TcpServerChannel implements TcpServerChannel { Netty4TcpServerChannel(Channel channel) { this.channel = channel; - Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext); + addListener(this.channel.closeFuture(), closeContext); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index b9986dbf00d87..1025ad11ba05e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -11,22 +11,30 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.DefaultChannelPromise; import 
io.netty.util.NettyRuntime; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ImmediateEventExecutor; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Booleans; +import org.elasticsearch.transport.TransportException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; public class Netty4Utils { @@ -121,4 +129,57 @@ public static Recycler createRecycler(Settings settings) { setAvailableProcessors(EsExecutors.allocatedProcessors(settings)); return NettyAllocator.getRecycler(); } + + /** + * Calls {@link Channel#writeAndFlush} to write the given message to the given channel, but ensures that the listener is completed even + * if the event loop is concurrently shutting down since Netty does not offer this guarantee. + */ + public static void safeWriteAndFlush(Channel channel, Object message, ActionListener listener) { + // Use ImmediateEventExecutor.INSTANCE since we want to be able to complete this promise, and any waiting listeners, even if the + // channel's event loop has shut down. Normally this completion will happen on the channel's event loop anyway because the write op + // can only be completed by some network event from this point on. However... 
+ final var promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); + addListener(promise, listener); + assert assertCorrectPromiseListenerThreading(channel, promise); + channel.writeAndFlush(message, promise); + if (channel.eventLoop().isShuttingDown()) { + // ... if we get here then the event loop may already have terminated, and https://github.com/netty/netty/issues/8007 means that + // we cannot know if the preceding writeAndFlush made it onto its queue before shutdown or whether it will just vanish without a + // trace, so to avoid a leak we must double-check that the final listener is completed. + channel.eventLoop().terminationFuture().addListener(ignored -> + // NB the promise executor is ImmediateEventExecutor.INSTANCE which means this call to tryFailure() will ensure its completion, + // and the completion of any waiting listeners, without forking away from the current thread. The current thread might be the + // thread that was running the event loop since that's where the terminationFuture is completed, or it might be a thread which + // called (and is still calling) safeWriteAndFlush. + promise.tryFailure(new TransportException("Cannot send network message, event loop is shutting down."))); + } + } + + private static boolean assertCorrectPromiseListenerThreading(Channel channel, Future promise) { + final var eventLoop = channel.eventLoop(); + promise.addListener(future -> { + assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || channel.eventLoop().isTerminated() + : future.cause(); + }); + return true; + } + + /** + * Subscribes the given {@link ActionListener} to the given {@link Future}. 
+ */ + public static void addListener(Future future, ActionListener listener) { + future.addListener(f -> { + if (f.isSuccess()) { + listener.onResponse(null); + } else { + final Throwable cause = f.cause(); + ExceptionsHelper.maybeDieOnAnotherThread(cause); + if (cause instanceof Exception exception) { + listener.onFailure(exception); + } else { + listener.onFailure(new Exception(cause)); + } + } + }); + } } diff --git a/server/src/main/java/org/elasticsearch/http/HttpChannel.java b/server/src/main/java/org/elasticsearch/http/HttpChannel.java index 635e6c095c4d0..cf3dbcf9fd029 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpChannel.java +++ b/server/src/main/java/org/elasticsearch/http/HttpChannel.java @@ -20,7 +20,9 @@ public interface HttpChannel extends CloseableChannel { * completed. * * @param response to send to channel - * @param listener to execute upon send completion + * @param listener to execute upon send completion. Note that this listener is usually completed on a network thread in a context in + * which there's a risk of stack overflows if it then calls back into the network layer in a manner that might end + * up nesting too deeply. When in doubt, dispatch any further work onto a separate thread.
*/ void sendResponse(HttpResponse response, ActionListener listener); diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index e3cc95b65a165..d197fe50d60d5 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -885,6 +885,7 @@ public void close() { void addChunkLength(long chunkLength) { assert chunkLength >= 0L : chunkLength; assert Transports.assertTransportThread(); // always called on the transport worker, no need for sync + assert get() != null : "already closed"; responseLength += chunkLength; } } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index 3261a6c9678b5..0bb8ddeb80c5f 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -71,6 +71,14 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) { this.slowLogThresholdMs = slowLogThreshold.getMillis(); } + /** + * Send a raw message over the given channel. + * + * @param listener completed when the message has been sent, on the network thread (unless the network thread has shut down). Take care + * if calling back into the network layer from this listener without dispatching to a new thread since if we do that + * too many times in a row it can cause a stack overflow. When in doubt, dispatch any follow-up work onto a separate + * thread. 
+ */ void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener listener) { internalSend(channel, bytes, null, listener); } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java index d5257bf2840f3..f07cce5a731f8 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java @@ -51,6 +51,13 @@ public boolean tryIncRef() { return true; } + /** + * {@inheritDoc} + * + * Note that the lifetime of an outbound {@link TransportMessage} lasts at least until it has been fully sent over the network, and it + * may be closed on a network thread in a context in which there's a risk of stack overflows if on close it calls back into the network + * layer in a manner that might end up nesting too deeply. When in doubt, dispatch any further work onto a separate thread. + */ @Override public boolean decRef() { // noop, override to manage the life-cycle of resources held by a transport message