Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/105486.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 105486
summary: Fix use-after-free at event-loop shutdown
area: Network
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,26 @@
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.netty4.Netty4TcpChannel;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

import static org.elasticsearch.transport.netty4.Netty4Utils.addListener;
import static org.elasticsearch.transport.netty4.Netty4Utils.safeWriteAndFlush;

public class Netty4HttpChannel implements HttpChannel {

private final Channel channel;
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();

Netty4HttpChannel(Channel channel) {
this.channel = channel;
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
addListener(this.channel.closeFuture(), closeContext);
}

@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
// We need to both guard against double resolving the listener and not resolving it in case of event loop shutdown so we need to
// use #notifyOnce here until https://github.com/netty/netty/issues/8007 is resolved.
var wrapped = ActionListener.notifyOnce(listener);
channel.writeAndFlush(response, Netty4TcpChannel.addPromise(wrapped, channel));
if (channel.eventLoop().isShutdown()) {
wrapped.onFailure(new TransportException("Cannot send HTTP response, event loop is shutting down."));
}
safeWriteAndFlush(channel, response, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.transport.netty4.Netty4TcpChannel;

import java.net.InetSocketAddress;

import static org.elasticsearch.transport.netty4.Netty4Utils.addListener;

public class Netty4HttpServerChannel implements HttpServerChannel {

private final Channel channel;
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();

Netty4HttpServerChannel(Channel channel) {
this.channel = channel;
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
addListener(this.channel.closeFuture(), closeContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportException;

import java.net.InetSocketAddress;

import static org.elasticsearch.transport.netty4.Netty4Utils.addListener;
import static org.elasticsearch.transport.netty4.Netty4Utils.safeWriteAndFlush;

public class Netty4TcpChannel implements TcpChannel {

private final Channel channel;
Expand All @@ -44,52 +44,6 @@ public class Netty4TcpChannel implements TcpChannel {
addListener(connectFuture, connectContext);
}

/**
* Adds a listener that completes the given {@link ListenableFuture} to the given {@link ChannelFuture}.
* @param channelFuture Channel future
* @param listener Listener to complete
*/
public static void addListener(ChannelFuture channelFuture, ListenableFuture<Void> listener) {
channelFuture.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
listener.onFailure(new Exception(cause));
} else {
listener.onFailure((Exception) cause);
}
}
});
}

/**
* Creates a {@link ChannelPromise} for the given {@link Channel} and adds a listener that invokes the given {@link ActionListener}
* on its completion.
* @param listener lister to invoke
* @param channel channel
* @return write promise
*/
public static ChannelPromise addPromise(ActionListener<Void> listener, Channel channel) {
ChannelPromise writePromise = channel.newPromise();
writePromise.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(null);
} else {
final Throwable cause = f.cause();
ExceptionsHelper.maybeDieOnAnotherThread(cause);
if (cause instanceof Error) {
listener.onFailure(new Exception(cause));
} else {
listener.onFailure((Exception) cause);
}
}
});
return writePromise;
}

@Override
public void close() {
if (rstOnClose) {
Expand Down Expand Up @@ -162,13 +116,7 @@ public InetSocketAddress getRemoteAddress() {

@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
// We need to both guard against double resolving the listener and not resolving it in case of event loop shutdown so we need to
// use #notifyOnce here until https://github.com/netty/netty/issues/8007 is resolved.
var wrapped = ActionListener.notifyOnce(listener);
channel.writeAndFlush(reference, addPromise(wrapped, channel));
if (channel.eventLoop().isShutdown()) {
wrapped.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
}
safeWriteAndFlush(channel, reference, listener);
}

public Channel getNettyChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

import java.net.InetSocketAddress;

import static org.elasticsearch.transport.netty4.Netty4Utils.addListener;

public class Netty4TcpServerChannel implements TcpServerChannel {

private final Channel channel;
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();

Netty4TcpServerChannel(Channel channel) {
this.channel = channel;
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
addListener(this.channel.closeFuture(), closeContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,30 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.transport.TransportException;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class Netty4Utils {
Expand Down Expand Up @@ -121,4 +129,57 @@ public static Recycler<BytesRef> createRecycler(Settings settings) {
setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
return NettyAllocator.getRecycler();
}

/**
* Calls {@link Channel#writeAndFlush} to write the given message to the given channel, but ensures that the listener is completed even
* if the event loop is concurrently shutting down since Netty does not offer this guarantee.
*/
public static void safeWriteAndFlush(Channel channel, Object message, ActionListener<Void> listener) {
// Use ImmediateEventExecutor.INSTANCE since we want to be able to complete this promise, and any waiting listeners, even if the
// channel's event loop has shut down. Normally this completion will happen on the channel's event loop anyway because the write op
// can only be completed by some network event from this point on. However...
final var promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
addListener(promise, listener);
assert assertCorrectPromiseListenerThreading(channel, promise);
channel.writeAndFlush(message, promise);
if (channel.eventLoop().isShuttingDown()) {
// ... if we get here then the event loop may already have terminated, and https://github.com/netty/netty/issues/8007 means that
// we cannot know if the preceding writeAndFlush made it onto its queue before shutdown or whether it will just vanish without a
// trace, so to avoid a leak we must double-check that the final listener is completed.
channel.eventLoop().terminationFuture().addListener(ignored ->
// NB the promise executor is ImmediateEventExecutor.INSTANCE which means this call to tryFailure() will ensure its completion,
// and the completion of any waiting listeners, without forking away from the current thread. The current thread might be the
// thread that was running the event loop since that's where the terminationFuture is completed, or it might be a thread which
// called (and is still calling) safeWriteAndFlush.
promise.tryFailure(new TransportException("Cannot send network message, event loop is shutting down.")));
}
}

private static boolean assertCorrectPromiseListenerThreading(Channel channel, Future<?> promise) {
final var eventLoop = channel.eventLoop();
promise.addListener(future -> {
assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || channel.eventLoop().isTerminated()
: future.cause();
});
return true;
}

/**
* Subscribes the given {@link ActionListener} to the given {@link Future}.
*/
public static void addListener(Future<Void> future, ActionListener<Void> listener) {
future.addListener(f -> {
if (f.isSuccess()) {
listener.onResponse(null);
} else {
final Throwable cause = f.cause();
ExceptionsHelper.maybeDieOnAnotherThread(cause);
if (cause instanceof Exception exception) {
listener.onFailure(exception);
} else {
listener.onFailure(new Exception(cause));
}
}
});
}
}
4 changes: 3 additions & 1 deletion server/src/main/java/org/elasticsearch/http/HttpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public interface HttpChannel extends CloseableChannel {
* completed.
*
* @param response to send to channel
* @param listener to execute upon send completion
* @param listener to execute upon send completion. Note that this listener is usually completed on a network thread in a context in
* which there's a risk of stack overflows if on close it calls back into the network layer in a manner that might end
* up nesting too deeply. When in doubt, dispatch any further work onto a separate thread.
*/
void sendResponse(HttpResponse response, ActionListener<Void> listener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ public void close() {
void addChunkLength(long chunkLength) {
assert chunkLength >= 0L : chunkLength;
assert Transports.assertTransportThread(); // always called on the transport worker, no need for sync
assert get() != null : "already closed";
responseLength += chunkLength;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) {
this.slowLogThresholdMs = slowLogThreshold.getMillis();
}

/**
* Send a raw message over the given channel.
*
* @param listener completed when the message has been sent, on the network thread (unless the network thread has shut down). Take care
* if calling back into the network layer from this listener without dispatching to a new thread since if we do that
* too many times in a row it can cause a stack overflow. When in doubt, dispatch any follow-up work onto a separate
* thread.
*/
void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
internalSend(channel, bytes, null, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public boolean tryIncRef() {
return true;
}

/**
* {@inheritDoc}
*
* Note that the lifetime of an outbound {@link TransportMessage} lasts at least until it has been fully sent over the network, and it
* may be closed on a network thread in a context in which there's a risk of stack overflows if on close it calls back into the network
* layer in a manner that might end up nesting too deeply. When in doubt, dispatch any further work onto a separate thread.
*/
@Override
public boolean decRef() {
// noop, override to manage the life-cycle of resources held by a transport message
Expand Down