-
Notifications
You must be signed in to change notification settings - Fork 25.9k
Pausable chunked HTTP responses #104851
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pausable chunked HTTP responses #104851
Changes from all commits
c92a7ec
f7250b8
ff24f3c
e957bc7
44a4482
18e65e2
1fd16fb
33fa0cd
1d927ca
b0e3ff8
708fa52
4954c8e
aa52c18
43584be
2022d7a
ffd6cac
1777bc5
befb016
3fab7ce
bbcbf5a
7b896e0
0b91c7f
d5e4031
751ce75
cf74253
38d55af
048194d
6fac79d
64ad627
e4a34ac
b3b7399
d4d1401
221b5b6
b01bda4
39b71dd
23c1a1f
30795e0
4570c92
0d923f4
ee1a4a4
a2c385e
344f75c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,38 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
| * in compliance with, at your election, the Elastic License 2.0 or the Server | ||
| * Side Public License, v 1. | ||
| */ | ||
|
|
||
| package org.elasticsearch.http.netty4; | ||
|
|
||
| import io.netty.util.concurrent.PromiseCombiner; | ||
|
|
||
| import org.elasticsearch.rest.ChunkedRestResponseBody; | ||
|
|
||
| final class Netty4ChunkedHttpContinuation implements Netty4HttpResponse { | ||
| private final int sequence; | ||
| private final ChunkedRestResponseBody body; | ||
| private final PromiseCombiner combiner; | ||
|
|
||
| Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBody body, PromiseCombiner combiner) { | ||
| this.sequence = sequence; | ||
| this.body = body; | ||
| this.combiner = combiner; | ||
| } | ||
|
|
||
| @Override | ||
| public int getSequence() { | ||
| return sequence; | ||
| } | ||
|
|
||
| public ChunkedRestResponseBody body() { | ||
| return body; | ||
| } | ||
|
|
||
| public PromiseCombiner combiner() { | ||
| return combiner; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.elasticsearch.ExceptionsHelper; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.bytes.ReleasableBytesReference; | ||
| import org.elasticsearch.core.Booleans; | ||
|
|
@@ -148,6 +149,8 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann | |
| } | ||
|
|
||
| private void enqueuePipelinedResponse(ChannelHandlerContext ctx, Netty4HttpResponse restResponse, ChannelPromise promise) { | ||
| assert restResponse instanceof Netty4ChunkedHttpContinuation == false | ||
| : "received out-of-order continuation at [" + restResponse.getSequence() + "], expecting [" + writeSequence + "]"; | ||
| assert restResponse.getSequence() > writeSequence | ||
| : "response sequence [" + restResponse.getSequence() + "] was below write sequence [" + writeSequence + "]"; | ||
| if (outboundHoldingQueue.size() >= maxEventsHeld) { | ||
|
|
@@ -187,6 +190,8 @@ private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse | |
| doWriteFullResponse(ctx, fullResponse, promise); | ||
| } else if (readyResponse instanceof Netty4ChunkedHttpResponse chunkedResponse) { | ||
| doWriteChunkedResponse(ctx, chunkedResponse, promise); | ||
| } else if (readyResponse instanceof Netty4ChunkedHttpContinuation chunkedContinuation) { | ||
| doWriteChunkedContinuation(ctx, chunkedContinuation, promise); | ||
| } else { | ||
| assert false : readyResponse.getClass().getCanonicalName(); | ||
| throw new IllegalStateException("illegal message type: " + readyResponse.getClass().getCanonicalName()); | ||
|
|
@@ -224,16 +229,75 @@ private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttp | |
| } | ||
| } | ||
|
|
||
| private void doWriteChunkedContinuation(ChannelHandlerContext ctx, Netty4ChunkedHttpContinuation continuation, ChannelPromise promise) { | ||
| final PromiseCombiner combiner = continuation.combiner(); | ||
| assert currentChunkedWrite == null; | ||
| final var responseBody = continuation.body(); | ||
| assert responseBody.isDone() == false : "response with continuations must have at least one (possibly-empty) chunk in each part"; | ||
| currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody); | ||
| // NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel. | ||
| while (ctx.channel().isWritable()) { | ||
| if (writeChunk(ctx, currentChunkedWrite)) { | ||
| finishChunkedWrite(); | ||
| return; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private void finishChunkedWrite() { | ||
| if (currentChunkedWrite == null) { | ||
| // failure during chunked response serialization, we're closing the channel | ||
| return; | ||
| } | ||
| assert currentChunkedWrite.responseBody().isDone(); | ||
| final var finishingWrite = currentChunkedWrite; | ||
| currentChunkedWrite = null; | ||
| writeSequence++; | ||
| finishingWrite.combiner().finish(finishingWrite.onDone()); | ||
| final var finishingWriteBody = finishingWrite.responseBody(); | ||
| assert finishingWriteBody.isDone(); | ||
| final var endOfResponse = finishingWriteBody.isEndOfResponse(); | ||
| if (endOfResponse) { | ||
| writeSequence++; | ||
| finishingWrite.combiner().finish(finishingWrite.onDone()); | ||
| } else { | ||
| final var channel = finishingWrite.onDone().channel(); | ||
| ActionListener.run(ActionListener.assertOnce(new ActionListener<>() { | ||
| @Override | ||
| public void onResponse(ChunkedRestResponseBody continuation) { | ||
| channel.writeAndFlush( | ||
| new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()), | ||
| finishingWrite.onDone() // pass the terminal listener/promise along the line | ||
| ); | ||
|
ywangd marked this conversation as resolved.
Comment on lines
+265
to
+268
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder whether similar treatment (#105306) is needed here?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so, this code runs on the event loop already so these writes will still either complete or fail properly even if we're in the process of shutting down.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it always the case even when
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yes you're quite right, thanks
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've applied a fix in 6fac79d but it had to be a little different from #105306 since we don't have a listener to wrap any more. But then I realised we could do the same thing at the outer layer too, and extract a utility, and in fact what we do today at the outer layer is also a little questionable too. I made the relevant changes in #105486 and will migrate to that utility once it's available here. |
||
| checkShutdown(); | ||
| } | ||
|
|
||
| @Override | ||
| public void onFailure(Exception e) { | ||
| logger.error( | ||
| Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel), | ||
| e | ||
| ); | ||
| channel.close().addListener(ignored -> { | ||
| finishingWrite.combiner().add(channel.newFailedFuture(e)); | ||
| finishingWrite.combiner().finish(finishingWrite.onDone()); | ||
|
Comment on lines
+279
to
+280
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly to the other PR, I wonder whether this could be
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather wait for the whole combiner to complete (and wonder whether we should do that in other places too). And |
||
| }); | ||
| checkShutdown(); | ||
| } | ||
|
|
||
| private void checkShutdown() { | ||
| if (channel.eventLoop().isShuttingDown()) { | ||
| // The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know if the | ||
| // preceding activity made it onto its queue before shutdown or whether it will just vanish without a trace, so | ||
| // to avoid a leak we must double-check that the final listener is completed once the event loop is terminated. | ||
| // Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an ImmediateEventExecutor | ||
| // which means this completion is not subject to the same issue, it still works even if the event loop has already | ||
| // terminated. | ||
| channel.eventLoop() | ||
| .terminationFuture() | ||
| .addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException())); | ||
| } | ||
| } | ||
|
|
||
| }), finishingWriteBody::getContinuation); | ||
|
henningandersen marked this conversation as resolved.
|
||
| } | ||
| } | ||
|
|
||
| private void splitAndWrite(ChannelHandlerContext ctx, Netty4FullHttpResponse msg, ChannelPromise promise) { | ||
|
|
@@ -321,7 +385,8 @@ private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite) | |
| } | ||
| final ByteBuf content = Netty4Utils.toByteBuf(bytes); | ||
| final boolean done = body.isDone(); | ||
| final ChannelFuture f = ctx.write(done ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content)); | ||
| final boolean lastChunk = done && body.isEndOfResponse(); | ||
| final ChannelFuture f = ctx.write(lastChunk ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content)); | ||
| f.addListener(ignored -> bytes.close()); | ||
| combiner.add(f); | ||
| return done; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,8 @@ | |
| package org.elasticsearch.rest; | ||
|
|
||
| import org.apache.lucene.util.BytesRef; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.client.internal.Client; | ||
| import org.elasticsearch.common.bytes.ReleasableBytesReference; | ||
| import org.elasticsearch.common.io.stream.BytesStream; | ||
| import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; | ||
|
|
@@ -20,6 +22,8 @@ | |
| import org.elasticsearch.core.Streams; | ||
| import org.elasticsearch.logging.LogManager; | ||
| import org.elasticsearch.logging.Logger; | ||
| import org.elasticsearch.tasks.CancellableTask; | ||
| import org.elasticsearch.tasks.Task; | ||
| import org.elasticsearch.xcontent.ToXContent; | ||
| import org.elasticsearch.xcontent.XContentBuilder; | ||
|
|
||
|
|
@@ -31,18 +35,51 @@ | |
| import java.util.Iterator; | ||
|
|
||
| /** | ||
| * The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and | ||
| * instead serialize only as much of the response as can be flushed to the network right away. | ||
| * <p>A body (or a part thereof) of an HTTP response that uses the {@code chunked} transfer-encoding. This allows Elasticsearch to avoid | ||
| * materializing the full response into on-heap buffers up front, instead serializing only as much of the response as can be flushed to the | ||
| * network right away.</p> | ||
| * | ||
| * <p>Each {@link ChunkedRestResponseBody} represents a sequence of chunks that are ready for immediate transmission: if {@link #isDone} | ||
| * returns {@code false} then {@link #encodeChunk} can be called at any time and must synchronously return the next chunk to be sent. | ||
| * Many HTTP responses will be a single such sequence. However, if an implementation's {@link #isEndOfResponse} returns {@code false} at the | ||
| * end of the sequence then the transmission is paused and {@link #getContinuation} is called to compute the next sequence of chunks | ||
| * asynchronously.</p> | ||
| */ | ||
| public interface ChunkedRestResponseBody { | ||
|
|
||
| Logger logger = LogManager.getLogger(ChunkedRestResponseBody.class); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should change the name of this interface to I think the javadoc above could also be improved to mention that it can be multi-part.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ++ I will do this in a follow-up to avoid introducing too much noise here. I pushed 4570c92 to improve the Javadocs here tho. |
||
|
|
||
| /** | ||
| * @return true once this response has been written fully. | ||
| * @return {@code true} if this body contains no more chunks and the REST layer should check for a possible continuation by calling | ||
| * {@link #isEndOfResponse}, or {@code false} if the REST layer should request another chunk from this body using {@link #encodeChunk}. | ||
| */ | ||
| boolean isDone(); | ||
|
ywangd marked this conversation as resolved.
|
||
|
|
||
| /** | ||
| * @return {@code true} if this is the last chunked body in the response, or {@code false} if the REST layer should request further | ||
| * chunked bodies by calling {@link #getContinuation}. | ||
| */ | ||
| boolean isEndOfResponse(); | ||
|
|
||
| /** | ||
| * <p>Asynchronously retrieves the next part of the body. Called if {@link #isEndOfResponse} returns {@code false}.</p> | ||
| * | ||
| * <p>Note that this is called on a transport thread, so implementations must take care to dispatch any nontrivial work elsewhere.</p> | ||
|
|
||
| * <p>Note that the {@link Task} corresponding to any invocation of {@link Client#execute} completes as soon as the client action | ||
| * returns its response, so it no longer exists when this method is called and cannot be used to receive cancellation notifications. | ||
| * Instead, if the HTTP channel is closed while sending a response then the REST layer will invoke {@link RestResponse#close}. If the | ||
| * HTTP channel is closed while the REST layer is waiting for a continuation then the {@link RestResponse} will not be closed until the | ||
| * continuation listener is completed. Implementations will typically explicitly create a {@link CancellableTask} to represent the | ||
| * computation and transmission of the entire {@link RestResponse}, and will cancel this task if the {@link RestResponse} is closed | ||
| * prematurely.</p> | ||
| * | ||
| * @param listener Listener to complete with the next part of the body. By the point this is called we have already started to send | ||
| * the body of the response, so there's no good ways to handle an exception here. Completing the listener exceptionally | ||
| * will log an error, abort sending the response, and close the HTTP connection. | ||
| */ | ||
| void getContinuation(ActionListener<ChunkedRestResponseBody> listener); | ||
|
nik9000 marked this conversation as resolved.
|
||
|
|
||
| /** | ||
| * Serializes approximately as many bytes of the response as request by {@code sizeHint} to a {@link ReleasableBytesReference} that | ||
| * is created from buffers backed by the given {@code recycler}. | ||
|
|
@@ -102,6 +139,17 @@ public boolean isDone() { | |
| return serialization.hasNext() == false; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isEndOfResponse() { | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) { | ||
| assert false : "no continuations"; | ||
|
ywangd marked this conversation as resolved.
|
||
| listener.onFailure(new IllegalStateException("no continuations available")); | ||
| } | ||
|
|
||
| @Override | ||
| public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException { | ||
| try { | ||
|
|
@@ -180,6 +228,17 @@ public boolean isDone() { | |
| return chunkIterator.hasNext() == false; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isEndOfResponse() { | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) { | ||
| assert false : "no continuations"; | ||
|
ywangd marked this conversation as resolved.
|
||
| listener.onFailure(new IllegalStateException("no continuations available")); | ||
| } | ||
|
|
||
| @Override | ||
| public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) throws IOException { | ||
| try { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.