Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/105293.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 105293
summary: Fix leaked HTTP response sent after close
area: Network
type: bug
issues:
- 104651
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;

public class Netty4HttpChannel implements HttpChannel {

Expand All @@ -31,7 +32,13 @@ public class Netty4HttpChannel implements HttpChannel {

@Override
public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel));
if (isOpen()) {
channel.writeAndFlush(response, Netty4TcpChannel.addPromise(listener, channel));
} else {
// No need to dispatch to the event loop just to fail this listener; moreover the channel might be closed because the whole
// node is shutting down, in which case the event loop might not exist any more so the channel promise cannot be completed.
listener.onFailure(new ClosedChannelException());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,16 @@
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;

import org.apache.http.HttpHost;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchWrapperException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionTestUtils;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.network.NetworkAddress;
Expand Down Expand Up @@ -956,6 +962,58 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
}
}

public void testRespondAfterClose() throws Exception {
final String url = "/thing";
final CountDownLatch responseReleasedLatch = new CountDownLatch(1);
final SubscribableListener<Void> transportClosedFuture = new SubscribableListener<>();
final CountDownLatch handlingRequestLatch = new CountDownLatch(1);

final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
assertEquals(request.uri(), url);
final var response = RestResponse.chunked(
OK,
ChunkedRestResponseBody.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator()),
responseReleasedLatch::countDown
);
transportClosedFuture.addListener(ActionListener.running(() -> channel.sendResponse(response)));
handlingRequestLatch.countDown();
}

@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
fail(cause, "--> Unexpected bad request [%s]", FakeRestRequest.requestToString(channel.request()));
}
};

try (
Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
Settings.EMPTY,
networkService,
threadPool,
xContentRegistry(),
dispatcher,
clusterSettings,
new SharedGroupFactory(Settings.EMPTY),
Tracer.NOOP,
TLSConfig.noTLS(),
null,
randomFrom((httpPreRequest, channel, listener) -> listener.onResponse(null), null)
)
) {
transport.start();
final var address = randomFrom(transport.boundAddress().boundAddresses()).address();
try (var client = RestClient.builder(new HttpHost(address.getAddress(), address.getPort())).build()) {
client.performRequestAsync(new Request("GET", url), ActionTestUtils.wrapAsRestResponseListener(ActionListener.noop()));
safeAwait(handlingRequestLatch);
transport.close();
transportClosedFuture.onResponse(null);
safeAwait(responseReleasedLatch);
}
}
}

private Netty4HttpServerTransport getTestNetty4HttpServerTransport(
HttpServerTransport.Dispatcher dispatcher,
HttpValidator httpValidator,
Expand Down