diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 02e5139a0e3aa..6b4931a6b60e3 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -11,6 +11,7 @@ import io.netty.channel.Channel; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.concurrent.FutureListener; import org.elasticsearch.http.HttpBody; import org.elasticsearch.transport.netty4.Netty4Utils; @@ -33,9 +34,11 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private boolean closing = false; private HttpBody.ChunkHandler handler; + private final FutureListener closeListener = future -> doClose(); + public Netty4HttpRequestBodyStream(Channel channel) { this.channel = channel; - channel.closeFuture().addListener((f) -> doClose()); + channel.closeFuture().addListener(closeListener); channel.config().setAutoRead(false); } @@ -71,6 +74,7 @@ public void next() { } public void handleNettyContent(HttpContent httpContent) { + hasLast = httpContent instanceof LastHttpContent; if (closing) { httpContent.release(); return; @@ -81,10 +85,6 @@ public void handleNettyContent(HttpContent httpContent) { } else { chunkQueue.add(httpContent); } - if (httpContent instanceof LastHttpContent) { - hasLast = true; - channel.config().setAutoRead(true); - } } // visible for test @@ -108,6 +108,10 @@ private void sendChunk(HttpContent httpContent) { var bytesRef = Netty4Utils.toReleasableBytesReference(httpContent.content()); var isLast = httpContent instanceof LastHttpContent; handler.onNext(bytesRef, isLast); + if (isLast) { + channel.config().setAutoRead(true); + channel.closeFuture().removeListener(closeListener); + } } private void releaseQueuedChunks() {