Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/105486.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 105486
summary: Fix use-after-free at event-loop shutdown
area: Network
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,26 @@
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.netty4.Netty4TcpChannel;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

import static org.elasticsearch.transport.netty4.Netty4Utils.addListener;
import static org.elasticsearch.transport.netty4.Netty4Utils.safeWriteAndFlush;

public class Netty4HttpChannel implements HttpChannel {

private final Channel channel;
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();

Netty4HttpChannel(Channel channel) {
this.channel = channel;
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
addListener(this.channel.closeFuture(), closeContext);
}

@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
// We need to both guard against double resolving the listener and not resolving it in case of event loop shutdown so we need to
// use #notifyOnce here until https://github.com/netty/netty/issues/8007 is resolved.
var wrapped = ActionListener.notifyOnce(listener);
channel.writeAndFlush(response, Netty4TcpChannel.addPromise(wrapped, channel));
if (channel.eventLoop().isShutdown()) {
wrapped.onFailure(new TransportException("Cannot send HTTP response, event loop is shutting down."));
}
Comment on lines -37 to -41
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for being slow on this issue. Could you please help me some more to understand what the issue was with the previous solution? You said

We could still be manipulating a network message when the event loop shuts down, causing us to close the message while it's still in use.

What do you mean by "manipulating a network message"? Is it releasing the response (or "close the message")? I fail to see how we can "close the message while it's still in use" given the listener is notifyOnce?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that isShutdown() returns true while the event loop is still running its last few tasks, and one of those tasks might still be doing something with the message while we release it. To be sure we're releasing the message after all possible uses on the event loop we need to subscribe to the terminationFuture instead.

Replacing the releaseOnce with Promise#tryFailure is not strictly necessary to fix this.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah OK. That makes sense. I am curious how you found out about it. Did you just read into the Netty source code?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, your review comment here prompted me to look a little deeper at the shutdown behaviour which made me realise we were doing it wrong in these places too.

safeWriteAndFlush(channel, response, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.transport.netty4.Netty4TcpChannel;

import java.net.InetSocketAddress;

import static org.elasticsearch.transport.netty4.Netty4Utils.addListener;

public class Netty4HttpServerChannel implements HttpServerChannel {

private final Channel channel;
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();

Netty4HttpServerChannel(Channel channel) {
this.channel = channel;
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
addListener(this.channel.closeFuture(), closeContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,19 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportException;

import java.net.InetSocketAddress;

import static org.elasticsearch.transport.netty4.Netty4Utils.addListener;
import static org.elasticsearch.transport.netty4.Netty4Utils.safeWriteAndFlush;

public class Netty4TcpChannel implements TcpChannel {

private final Channel channel;
Expand All @@ -44,52 +44,6 @@ public class Netty4TcpChannel implements TcpChannel {
addListener(connectFuture, connectContext);
}

/**
 * Completes the given {@link ListenableFuture} when the given {@link ChannelFuture} completes, converting a failure cause into an
 * {@link Exception} (and dying on fatal {@link Error}s via {@link ExceptionsHelper#maybeDieOnAnotherThread}).
 *
 * @param channelFuture the channel future to observe
 * @param listener the future to complete once the channel future completes
 */
public static void addListener(ChannelFuture channelFuture, ListenableFuture<Void> listener) {
    channelFuture.addListener(completedFuture -> {
        if (completedFuture.isSuccess() == false) {
            final Throwable cause = completedFuture.cause();
            if (cause instanceof Error error) {
                // fatal errors must not be swallowed; trigger the uncaught-exception handling before notifying the listener
                ExceptionsHelper.maybeDieOnAnotherThread(error);
                listener.onFailure(new Exception(error));
            } else {
                listener.onFailure((Exception) cause);
            }
        } else {
            listener.onResponse(null);
        }
    });
}

/**
 * Creates a {@link ChannelPromise} for the given {@link Channel} whose completion invokes the given {@link ActionListener}.
 *
 * @param listener listener to invoke when the promise completes
 * @param channel channel on which to create the promise
 * @return the newly-created write promise
 */
public static ChannelPromise addPromise(ActionListener<Void> listener, Channel channel) {
    final ChannelPromise writePromise = channel.newPromise();
    writePromise.addListener(completedFuture -> {
        if (completedFuture.isSuccess() == false) {
            final Throwable cause = completedFuture.cause();
            // fatal errors must not be swallowed; trigger the uncaught-exception handling before notifying the listener
            ExceptionsHelper.maybeDieOnAnotherThread(cause);
            listener.onFailure(cause instanceof Error ? new Exception(cause) : (Exception) cause);
        } else {
            listener.onResponse(null);
        }
    });
    return writePromise;
}

@Override
public void close() {
if (rstOnClose) {
Expand Down Expand Up @@ -162,13 +116,7 @@ public InetSocketAddress getRemoteAddress() {

@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
// We need to both guard against double resolving the listener and not resolving it in case of event loop shutdown so we need to
// use #notifyOnce here until https://github.com/netty/netty/issues/8007 is resolved.
var wrapped = ActionListener.notifyOnce(listener);
channel.writeAndFlush(reference, addPromise(wrapped, channel));
if (channel.eventLoop().isShutdown()) {
wrapped.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
}
safeWriteAndFlush(channel, reference, listener);
}

public Channel getNettyChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

import java.net.InetSocketAddress;

import static org.elasticsearch.transport.netty4.Netty4Utils.addListener;

public class Netty4TcpServerChannel implements TcpServerChannel {

private final Channel channel;
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();

Netty4TcpServerChannel(Channel channel) {
this.channel = channel;
Netty4TcpChannel.addListener(this.channel.closeFuture(), closeContext);
addListener(this.channel.closeFuture(), closeContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,30 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.transport.TransportException;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

public class Netty4Utils {
Expand Down Expand Up @@ -121,4 +129,57 @@ public static Recycler<BytesRef> createRecycler(Settings settings) {
setAvailableProcessors(EsExecutors.allocatedProcessors(settings));
return NettyAllocator.getRecycler();
}

/**
* Calls {@link Channel#writeAndFlush} to write the given message to the given channel, but ensures that the listener is completed even
* if the event loop is concurrently shutting down since Netty does not offer this guarantee.
*/
public static void safeWriteAndFlush(Channel channel, Object message, ActionListener<Void> listener) {
// Use ImmediateEventExecutor.INSTANCE since we want to be able to complete this promise, and any waiting listeners, even if the
// channel's event loop has shut down. Normally this completion will happen on the channel's event loop anyway because the write op
// can only be completed by some network event from this point on. However...
final var promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean this does fundamentally change how things work doesn't it? Normally, Netty will queue the result handling of the operation on its task queue, now this gets executed inline with the write. I have a very hard time understanding the precise effects of this but it makes some sense to assume that it will make IO event handling less predictable?
Also, I wonder if this even provides a full fix? We use this thing in a couple spots: new PromiseCombiner(ctx.executor()), aren't we still subject to the same short-comings just less likely to run into this now? The point of ctx.executor() executor isn't really forking but rather queuing.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally, Netty will queue the result handling of the operation on its task queue, now this gets executed inline with the write.

I don't think that's true. If we're 8+ levels deep then it'll do that, but we don't seem to nest these things that much. The rest of the time if you complete the promise on the event loop (which is how we do it pretty much everywhere anyway) then it executes the listeners inline with the write.

If I can't convince you that stack overflows aren't a big deal here, we could reasonably make safeWriteAndFlush fork its listener onto a proper shutdown-sensitive threadpool. That still seems better than what we have today.

We use this thing in a couple spots: new PromiseCombiner(ctx.executor()), aren't we still subject to the same short-comings

I don't think so, all those spots don't look like they'll do anything outside the event loop anyway. We're not relying on all those subsidiary promises completing if we have the protection at the outer level.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm you're right this should actually be ok because all our deep nesting from the combiner is on the event loop indeed so we only really add one more extra level worst case :) Sorry slow to wake up today :)

addListener(promise, listener);
assert assertCorrectPromiseListenerThreading(channel, promise);
channel.writeAndFlush(message, promise);
if (channel.eventLoop().isShuttingDown()) {
// ... if we get here then the event loop may already have terminated, and https://github.com/netty/netty/issues/8007 means that
// we cannot know if the preceding writeAndFlush made it onto its queue before shutdown or whether it will just vanish without a
// trace, so to avoid a leak we must double-check that the final listener is completed.
channel.eventLoop().terminationFuture().addListener(ignored ->
// NB the promise executor is ImmediateEventExecutor.INSTANCE which means this call to tryFailure() will ensure its completion,
// and the completion of any waiting listeners, without forking away from the current thread. The current thread might be the
// thread that was running the event loop since that's where the terminationFuture is completed, or it might be a thread which
// called (and is still calling) safeWriteAndFlush.
promise.tryFailure(new TransportException("Cannot send network message, event loop is shutting down.")));
}
}

/**
 * Installs an assertion that listeners on the given promise are notified either on the channel's event loop, or only once the
 * event loop has terminated or rejected the notification (RejectedExecutionException cause). Always returns {@code true} so it
 * can itself be used inside an {@code assert} statement.
 */
private static boolean assertCorrectPromiseListenerThreading(Channel channel, Future<?> promise) {
    final var eventLoop = channel.eventLoop();
    promise.addListener(future -> {
        // Normally notification happens on the event loop; otherwise we expect the loop to be shut down, in which case the
        // failure cause is a RejectedExecutionException or the loop has already fully terminated.
        assert eventLoop.inEventLoop() || future.cause() instanceof RejectedExecutionException || channel.eventLoop().isTerminated()
            : future.cause();
    });
    return true;
}

/**
 * Subscribes the given {@link ActionListener} to the given {@link Future}.
 */
public static void addListener(Future<Void> future, ActionListener<Void> listener) {
    future.addListener(completedFuture -> {
        if (completedFuture.isSuccess() == false) {
            final Throwable cause = completedFuture.cause();
            // fatal errors must not be swallowed; trigger the uncaught-exception handling before notifying the listener
            ExceptionsHelper.maybeDieOnAnotherThread(cause);
            listener.onFailure(cause instanceof Exception exception ? exception : new Exception(cause));
        } else {
            listener.onResponse(null);
        }
    });
}
}
4 changes: 3 additions & 1 deletion server/src/main/java/org/elasticsearch/http/HttpChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ public interface HttpChannel extends CloseableChannel {
* completed.
*
* @param response to send to channel
* @param listener to execute upon send completion
* @param listener to execute upon send completion. Note that this listener is usually completed on a network thread in a context in
* which there's a risk of stack overflows if on close it calls back into the network layer in a manner that might end
* up nesting too deeply. When in doubt, dispatch any further work onto a separate thread.
*/
void sendResponse(HttpResponse response, ActionListener<Void> listener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ public void close() {
/**
 * Accumulates the length of a response chunk into the running total response length.
 *
 * @param chunkLength length of the chunk just handled; must be non-negative
 */
void addChunkLength(long chunkLength) {
    assert chunkLength >= 0L : chunkLength;
    assert Transports.assertTransportThread(); // always called on the transport worker, no need for sync
    // NOTE(review): get() presumably returns a resource that is nulled out on close — confirm against the enclosing class
    assert get() != null : "already closed";
    responseLength += chunkLength;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ void setSlowLogThreshold(TimeValue slowLogThreshold) {
this.slowLogThresholdMs = slowLogThreshold.getMillis();
}

/**
* Send a raw message over the given channel.
*
* @param listener completed when the message has been sent, on the network thread (unless the network thread has shut down). Take care
* if calling back into the network layer from this listener without dispatching to a new thread since if we do that
* too many times in a row it can cause a stack overflow. When in doubt, dispatch any follow-up work onto a separate
* thread.
*/
void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
    // Delegates to internalSend; the third argument is deliberately null here — its meaning is not visible in this chunk,
    // TODO confirm against internalSend's signature.
    internalSend(channel, bytes, null, listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ public boolean tryIncRef() {
return true;
}

/**
* {@inheritDoc}
*
* Note that the lifetime of an outbound {@link TransportMessage} lasts at least until it has been fully sent over the network, and it
* may be closed on a network thread in a context in which there's a risk of stack overflows if on close it calls back into the network
* layer in a manner that might end up nesting too deeply. When in doubt, dispatch any further work onto a separate thread.
*/
@Override
public boolean decRef() {
// noop, override to manage the life-cycle of resources held by a transport message
Expand Down