From 6e00966f26771157baf178e8f1ff66c7c23fa4f0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 25 May 2024 13:27:22 +0100 Subject: [PATCH 1/3] Rename to `ChunkedRestResponseBodyPart` Follow-up to #104851 to rename some symbols to reflect that the class formerly known as a `ChunkedRestResponseBody` may now only be _part_ of the whole response body. --- .../netty4/Netty4ChunkedContinuationsIT.java | 37 ++++++------ .../http/netty4/Netty4ChunkedEncodingIT.java | 10 ++-- .../http/netty4/Netty4PipeliningIT.java | 10 ++-- .../netty4/Netty4ChunkedHttpContinuation.java | 12 ++-- .../netty4/Netty4ChunkedHttpResponse.java | 12 ++-- .../netty4/Netty4HttpPipeliningHandler.java | 39 ++++++------- .../http/netty4/Netty4HttpRequest.java | 6 +- .../Netty4HttpPipeliningHandlerTests.java | 12 ++-- .../Netty4HttpServerTransportTests.java | 6 +- .../elasticsearch/rest/RestControllerIT.java | 2 +- .../http/DefaultRestChannel.java | 8 +-- .../org/elasticsearch/http/HttpRequest.java | 4 +- ....java => ChunkedRestResponseBodyPart.java} | 56 +++++++++---------- ...> LoggingChunkedRestResponseBodyPart.java} | 18 +++--- .../elasticsearch/rest/RestController.java | 26 ++++----- .../org/elasticsearch/rest/RestResponse.java | 12 ++-- .../action/RestChunkedToXContentListener.java | 4 +- .../cluster/RestNodesHotThreadsAction.java | 2 +- .../rest/action/cat/RestTable.java | 6 +- .../action/info/RestClusterInfoAction.java | 4 +- .../http/DefaultRestChannelTests.java | 42 +++++++------- .../elasticsearch/http/TestHttpRequest.java | 4 +- ... => ChunkedRestResponseBodyPartTests.java} | 19 ++++--- .../rest/RestControllerTests.java | 2 +- .../elasticsearch/rest/RestResponseTests.java | 2 +- .../rest/action/cat/RestTableTests.java | 7 ++- .../elasticsearch/rest/RestResponseUtils.java | 10 ++-- .../test/rest/FakeRestRequest.java | 4 +- .../esql/action/EsqlResponseListener.java | 6 +- 29 files changed, 195 insertions(+), 187 deletions(-) rename server/src/main/java/org/elasticsearch/rest/{ChunkedRestResponseBody.java => ChunkedRestResponseBodyPart.java} (79%) rename server/src/main/java/org/elasticsearch/rest/{LoggingChunkedRestResponseBody.java => LoggingChunkedRestResponseBodyPart.java} (68%) rename server/src/test/java/org/elasticsearch/rest/{ChunkedRestResponseBodyTests.java => ChunkedRestResponseBodyPartTests.java} (81%) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index 25195a1176fb8..77333677120a9 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -60,7 +60,7 @@ import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; @@ -370,31 +370,31 @@ public void writeTo(StreamOutput out) { TransportAction.localOnly(); } - public ChunkedRestResponseBody getChunkedBody() { - return getChunkBatch(0); + public ChunkedRestResponseBodyPart getFirstResponseBodyPart() { + return getResponseBodyPart(0); } - private ChunkedRestResponseBody getChunkBatch(int batchIndex) { + private ChunkedRestResponseBodyPart getResponseBodyPart(int batchIndex) { if (batchIndex == failIndex && randomBoolean()) { throw new ElasticsearchException("simulated failure creating next batch"); } - return new ChunkedRestResponseBody() { + return new ChunkedRestResponseBodyPart() { private final Iterator lines = Iterators.forRange(0, 3, i -> "batch-" + batchIndex + "-chunk-" + i + "\n"); @Override - public boolean isDone() { + public boolean isPartComplete() { return lines.hasNext() == false; } @Override - public boolean isEndOfResponse() { + public boolean isLastPart() { return batchIndex == 2; } @Override - public void getContinuation(ActionListener listener) { - executor.execute(ActionRunnable.supply(listener, () -> getChunkBatch(batchIndex + 1))); + public void getNextPart(ActionListener listener) { + executor.execute(ActionRunnable.supply(listener, () -> getResponseBodyPart(batchIndex + 1))); } @Override @@ -486,11 +486,12 @@ public void accept(RestChannel channel) { @Override protected void processResponse(Response response) { try { - final var responseBody = response.getChunkedBody(); // might fail, so do this before acquiring ref + final var responseBody = response.getFirstResponseBodyPart(); + // preceding line might fail, so needs to be done before acquiring the sendResponse ref refs.mustIncRef(); channel.sendResponse(RestResponse.chunked(RestStatus.OK, responseBody, refs::decRef)); } finally { - refs.decRef(); + refs.decRef(); // release the ref acquired at the top of accept() } } }); @@ -534,26 +535,26 @@ public void writeTo(StreamOutput out) { TransportAction.localOnly(); } - public ChunkedRestResponseBody getChunkedBody() { - return new ChunkedRestResponseBody() { + public ChunkedRestResponseBodyPart getResponseBodyPart() { + return new ChunkedRestResponseBodyPart() { private final Iterator lines = Iterators.single("infinite response\n"); @Override - public boolean isDone() { + public boolean isPartComplete() { return lines.hasNext() == false; } @Override - public boolean isEndOfResponse() { + public boolean isLastPart() { return false; } @Override - public void getContinuation(ActionListener listener) { + public void getNextPart(ActionListener listener) { computingContinuation = true; executor.execute(ActionRunnable.supply(listener, () -> { computingContinuation = false; - return getChunkedBody(); + return getResponseBodyPart(); })); } @@ -628,7 +629,7 @@ public void accept(RestChannel channel) { client.execute(TYPE, new Request(), new RestActionListener<>(channel) { @Override protected void processResponse(Response response) { - channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), () -> { + channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getResponseBodyPart(), () -> { // cancellation notification only happens while processing a continuation, not while computing // the next one; prompt cancellation requires use of something like RestCancellableNodeClient assertFalse(response.computingContinuation); diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java index b2a54e2027308..e3f60ea7a48e0 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java @@ -37,7 +37,7 @@ import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; @@ -245,19 +245,19 @@ public BytesReference next() { private static void sendChunksResponse(RestChannel channel, Iterator chunkIterator) { final var localRefs = refs; // single volatile read if (localRefs != null && localRefs.tryIncRef()) { - channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() { + channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBodyPart() { @Override - public boolean isDone() { + public boolean isPartComplete() { return chunkIterator.hasNext() == false; } @Override - public boolean isEndOfResponse() { + public boolean isLastPart() { return true; } @Override - public void getContinuation(ActionListener listener) { + public void getNextPart(ActionListener listener) { assert false : "no continuations"; } diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java index ce8da0c08af54..89a76dd26e285 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java @@ -34,7 +34,7 @@ import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.RestRequest; @@ -243,21 +243,21 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli throw new IllegalArgumentException("[" + FAIL_AFTER_BYTES_PARAM + "] must be present and non-negative"); } return channel -> randomExecutor(client.threadPool()).execute( - () -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() { + () -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBodyPart() { int bytesRemaining = failAfterBytes; @Override - public boolean isDone() { + public boolean isPartComplete() { return false; } @Override - public boolean isEndOfResponse() { + public boolean isLastPart() { return true; } @Override - public void getContinuation(ActionListener listener) { + public void getNextPart(ActionListener listener) { fail("no continuations here"); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java index 156f1c27aa67c..cde0249216981 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java @@ -10,16 +10,16 @@ import io.netty.util.concurrent.PromiseCombiner; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; final class Netty4ChunkedHttpContinuation implements Netty4HttpResponse { private final int sequence; - private final ChunkedRestResponseBody body; + private final ChunkedRestResponseBodyPart bodyPart; private final PromiseCombiner combiner; - Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBody body, PromiseCombiner combiner) { + Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBodyPart bodyPart, PromiseCombiner combiner) { this.sequence = sequence; - this.body = body; + this.bodyPart = bodyPart; this.combiner = combiner; } @@ -28,8 +28,8 @@ public int getSequence() { return sequence; } - public ChunkedRestResponseBody body() { - return body; + public ChunkedRestResponseBodyPart bodyPart() { + return bodyPart; } public PromiseCombiner combiner() { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java index 783c02da0bbcc..3abab9fa2526f 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java @@ -13,7 +13,7 @@ import io.netty.handler.codec.http.HttpVersion; import org.elasticsearch.http.HttpResponse; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestStatus; /** @@ -23,16 +23,16 @@ final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Net private final int sequence; - private final ChunkedRestResponseBody body; + private final ChunkedRestResponseBodyPart firstBodyPart; - Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBody body) { + Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) { super(version, HttpResponseStatus.valueOf(status.getStatus())); this.sequence = sequence; - this.body = body; + this.firstBodyPart = firstBodyPart; } - public ChunkedRestResponseBody body() { - return body; + public ChunkedRestResponseBodyPart firstBodyPart() { + return firstBodyPart; } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 8280c438613a2..9cf210c2a8aab 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -34,7 +34,7 @@ import org.elasticsearch.core.Booleans; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.netty4.Netty4Utils; import org.elasticsearch.transport.netty4.Netty4WriteThrottlingHandler; @@ -58,7 +58,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler { private final int maxEventsHeld; private final PriorityQueue> outboundHoldingQueue; - private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBody responseBody) {} + private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBodyPart responseBodyPart) {} /** * The current {@link ChunkedWrite} if a chunked write is executed at the moment. @@ -214,9 +214,9 @@ private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttp final PromiseCombiner combiner = new PromiseCombiner(ctx.executor()); final ChannelPromise first = ctx.newPromise(); combiner.add((Future) first); - final var responseBody = readyResponse.body(); + final var firstBodyPart = readyResponse.firstBodyPart(); assert currentChunkedWrite == null; - currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody); + currentChunkedWrite = new ChunkedWrite(combiner, promise, firstBodyPart); if (enqueueWrite(ctx, readyResponse, first)) { // We were able to write out the first chunk directly, try writing out subsequent chunks until the channel becomes unwritable. // NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel. @@ -232,9 +232,10 @@ private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttp private void doWriteChunkedContinuation(ChannelHandlerContext ctx, Netty4ChunkedHttpContinuation continuation, ChannelPromise promise) { final PromiseCombiner combiner = continuation.combiner(); assert currentChunkedWrite == null; - final var responseBody = continuation.body(); - assert responseBody.isDone() == false : "response with continuations must have at least one (possibly-empty) chunk in each part"; - currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody); + final var bodyPart = continuation.bodyPart(); + assert bodyPart.isPartComplete() == false + : "response with continuations must have at least one (possibly-empty) chunk in each part"; + currentChunkedWrite = new ChunkedWrite(combiner, promise, bodyPart); // NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel. while (ctx.channel().isWritable()) { if (writeChunk(ctx, currentChunkedWrite)) { @@ -251,9 +252,9 @@ private void finishChunkedWrite() { } final var finishingWrite = currentChunkedWrite; currentChunkedWrite = null; - final var finishingWriteBody = finishingWrite.responseBody(); - assert finishingWriteBody.isDone(); - final var endOfResponse = finishingWriteBody.isEndOfResponse(); + final var finishingWriteBodyPart = finishingWrite.responseBodyPart(); + assert finishingWriteBodyPart.isPartComplete(); + final var endOfResponse = finishingWriteBodyPart.isLastPart(); if (endOfResponse) { writeSequence++; finishingWrite.combiner().finish(finishingWrite.onDone()); @@ -261,7 +262,7 @@ private void finishChunkedWrite() { final var channel = finishingWrite.onDone().channel(); ActionListener.run(ActionListener.assertOnce(new ActionListener<>() { @Override - public void onResponse(ChunkedRestResponseBody continuation) { + public void onResponse(ChunkedRestResponseBodyPart continuation) { channel.writeAndFlush( new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()), finishingWrite.onDone() // pass the terminal listener/promise along the line @@ -296,7 +297,7 @@ private void checkShutdown() { } } - }), finishingWriteBody::getContinuation); + }), finishingWriteBodyPart::getNextPart); } } @@ -374,22 +375,22 @@ private boolean doFlush(ChannelHandlerContext ctx) throws IOException { } private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite) { - final var body = chunkedWrite.responseBody(); + final var bodyPart = chunkedWrite.responseBodyPart(); final var combiner = chunkedWrite.combiner(); - assert body.isDone() == false : "should not continue to try and serialize once done"; + assert bodyPart.isPartComplete() == false : "should not continue to try and serialize once done"; final ReleasableBytesReference bytes; try { - bytes = body.encodeChunk(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE, serverTransport.recycler()); + bytes = bodyPart.encodeChunk(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE, serverTransport.recycler()); } catch (Exception e) { return handleChunkingFailure(ctx, chunkedWrite, e); } final ByteBuf content = Netty4Utils.toByteBuf(bytes); - final boolean done = body.isDone(); - final boolean lastChunk = done && body.isEndOfResponse(); - final ChannelFuture f = ctx.write(lastChunk ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content)); + final boolean isPartComplete = bodyPart.isPartComplete(); + final boolean isBodyComplete = isPartComplete && bodyPart.isLastPart(); + final ChannelFuture f = ctx.write(isBodyComplete ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content)); f.addListener(ignored -> bytes.close()); combiner.add(f); - return done; + return isPartComplete; } private boolean handleChunkingFailure(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite, Exception e) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 0e1bb527fed9d..1e35f084c87ec 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -22,7 +22,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.http.HttpResponse; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.netty4.Netty4Utils; @@ -176,8 +176,8 @@ public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference c } @Override - public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) { - return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, content); + public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) { + return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, firstBodyPart); } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java index bb4a0939c98f0..4dca3d17bf072 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java @@ -36,7 +36,7 @@ import org.elasticsearch.common.bytes.ZeroBytesReference; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.http.HttpResponse; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.netty4.Netty4Utils; @@ -502,23 +502,23 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque }; } - private static ChunkedRestResponseBody getRepeatedChunkResponseBody(int chunkCount, BytesReference chunk) { - return new ChunkedRestResponseBody() { + private static ChunkedRestResponseBodyPart getRepeatedChunkResponseBody(int chunkCount, BytesReference chunk) { + return new ChunkedRestResponseBodyPart() { private int remaining = chunkCount; @Override - public boolean isDone() { + public boolean isPartComplete() { return remaining == 0; } @Override - public boolean isEndOfResponse() { + public boolean isLastPart() { return true; } @Override - public void getContinuation(ActionListener listener) { + public void getNextPart(ActionListener listener) { fail("no continuations here"); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index d2be4212cf41e..bc6e5fef834e8 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -71,7 +71,7 @@ import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils; import org.elasticsearch.http.netty4.internal.HttpValidator; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; @@ -692,7 +692,7 @@ public void testHeadRequestToChunkedApi() throws InterruptedException { public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) { try { channel.sendResponse( - RestResponse.chunked(OK, ChunkedRestResponseBody.fromXContent(ignored -> Iterators.single((builder, params) -> { + RestResponse.chunked(OK, ChunkedRestResponseBodyPart.fromXContent(ignored -> Iterators.single((builder, params) -> { throw new AssertionError("should not be called for HEAD REQUEST"); }), ToXContent.EMPTY_PARAMS, channel), null) ); @@ -1048,7 +1048,7 @@ public void dispatchRequest(final RestRequest request, final RestChannel channel assertEquals(request.uri(), url); final var response = RestResponse.chunked( OK, - ChunkedRestResponseBody.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator()), + ChunkedRestResponseBodyPart.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator()), responseReleasedLatch::countDown ); transportClosedFuture.addListener(ActionListener.running(() -> channel.sendResponse(response))); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/rest/RestControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/rest/RestControllerIT.java index 809ecbc858706..b76bec0652732 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/rest/RestControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/rest/RestControllerIT.java @@ -82,7 +82,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli return channel -> { final var response = RestResponse.chunked( RestStatus.OK, - ChunkedRestResponseBody.fromXContent( + ChunkedRestResponseBodyPart.fromXContent( params -> Iterators.single((b, p) -> b.startObject().endObject()), request, channel diff --git a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java index 9719716c57ce4..f04b8f13bfe7e 100644 --- a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java @@ -21,8 +21,8 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.rest.AbstractRestChannel; -import org.elasticsearch.rest.ChunkedRestResponseBody; -import org.elasticsearch.rest.LoggingChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; +import org.elasticsearch.rest.LoggingChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; @@ -113,7 +113,7 @@ public void sendResponse(RestResponse restResponse) { try { final HttpResponse httpResponse; if (isHeadRequest == false && restResponse.isChunked()) { - ChunkedRestResponseBody chunkedContent = restResponse.chunkedContent(); + ChunkedRestResponseBodyPart chunkedContent = restResponse.chunkedContent(); if (httpLogger != null && httpLogger.isBodyTracerEnabled()) { final var loggerStream = httpLogger.openResponseBodyLoggingStream(request.getRequestId()); toClose.add(() -> { @@ -123,7 +123,7 @@ public void sendResponse(RestResponse restResponse) { assert false : e; // nothing much to go wrong here } }); - chunkedContent = new LoggingChunkedRestResponseBody(chunkedContent, loggerStream); + chunkedContent = new LoggingChunkedRestResponseBodyPart(chunkedContent, loggerStream); } httpResponse = httpRequest.createResponse(restResponse.status(), chunkedContent); diff --git a/server/src/main/java/org/elasticsearch/http/HttpRequest.java b/server/src/main/java/org/elasticsearch/http/HttpRequest.java index b82947e42308b..2757fa15ce477 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpRequest.java +++ b/server/src/main/java/org/elasticsearch/http/HttpRequest.java @@ -10,7 +10,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.Nullable; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestStatus; import java.util.List; @@ -40,7 +40,7 @@ enum HttpVersion { */ HttpResponse createResponse(RestStatus status, BytesReference content); - HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content); + HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart); @Nullable Exception getInboundException(); diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBodyPart.java similarity index 79% rename from server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java rename to server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBodyPart.java index 2f7fc458ca020..4888b59f19561 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBodyPart.java @@ -39,32 +39,32 @@ * materializing the full response into on-heap buffers up front, instead serializing only as much of the response as can be flushed to the * network right away.

* - *

Each {@link ChunkedRestResponseBody} represents a sequence of chunks that are ready for immediate transmission: if {@link #isDone} - * returns {@code false} then {@link #encodeChunk} can be called at any time and must synchronously return the next chunk to be sent. - * Many HTTP responses will be a single such sequence. However, if an implementation's {@link #isEndOfResponse} returns {@code false} at the - * end of the sequence then the transmission is paused and {@link #getContinuation} is called to compute the next sequence of chunks + *

Each {@link ChunkedRestResponseBodyPart} represents a sequence of chunks that are ready for immediate transmission: if + * {@link #isPartComplete} returns {@code false} then {@link #encodeChunk} can be called at any time and must synchronously return the next + * chunk to be sent. Many HTTP responses will be a single part, but if an implementation's {@link #isLastPart} returns {@code false} at the + * end of the part then the transmission is paused and {@link #getNextPart} is called to compute the next sequence of chunks * asynchronously.

*/ -public interface ChunkedRestResponseBody { +public interface ChunkedRestResponseBodyPart { - Logger logger = LogManager.getLogger(ChunkedRestResponseBody.class); + Logger logger = LogManager.getLogger(ChunkedRestResponseBodyPart.class); /** - * @return {@code true} if this body contains no more chunks and the REST layer should check for a possible continuation by calling - * {@link #isEndOfResponse}, or {@code false} if the REST layer should request another chunk from this body using {@link #encodeChunk}. + * @return {@code true} if this body part contains no more chunks and the REST layer should check for a possible continuation by calling + * {@link #isLastPart}, or {@code false} if the REST layer should request another chunk from this body using {@link #encodeChunk}. */ - boolean isDone(); + boolean isPartComplete(); /** - * @return {@code true} if this is the last chunked body in the response, or {@code false} if the REST layer should request further - * chunked bodies by calling {@link #getContinuation}. + * @return {@code true} if this is the last chunked body part in the response, or {@code false} if the REST layer should request further + * chunked bodies by calling {@link #getNextPart}. */ - boolean isEndOfResponse(); + boolean isLastPart(); /** - *

Asynchronously retrieves the next part of the body. Called if {@link #isEndOfResponse} returns {@code false}.

+ *

Asynchronously retrieves the next part of the response body. Called if {@link #isLastPart} returns {@code false}.

* - *

Note that this is called on a transport thread, so implementations must take care to dispatch any nontrivial work elsewhere.

+ *

Note that this is called on a transport thread: implementations must take care to dispatch any nontrivial work elsewhere.

*

Note that the {@link Task} corresponding to any invocation of {@link Client#execute} completes as soon as the client action * returns its response, so it no longer exists when this method is called and cannot be used to receive cancellation notifications. @@ -78,7 +78,7 @@ public interface ChunkedRestResponseBody { * the body of the response, so there's no good ways to handle an exception here. Completing the listener exceptionally * will log an error, abort sending the response, and close the HTTP connection. */ - void getContinuation(ActionListener listener); + void getNextPart(ActionListener listener); /** * Serializes approximately as many bytes of the response as request by {@code sizeHint} to a {@link ReleasableBytesReference} that @@ -97,17 +97,17 @@ public interface ChunkedRestResponseBody { String getResponseContentTypeString(); /** - * Create a chunked response body to be written to a specific {@link RestChannel} from a {@link ChunkedToXContent}. + * Create a one-part chunked response body to be written to a specific {@link RestChannel} from a {@link ChunkedToXContent}. * * @param chunkedToXContent chunked x-content instance to serialize * @param params parameters to use for serialization * @param channel channel the response will be written to * @return chunked rest response body */ - static ChunkedRestResponseBody fromXContent(ChunkedToXContent chunkedToXContent, ToXContent.Params params, RestChannel channel) + static ChunkedRestResponseBodyPart fromXContent(ChunkedToXContent chunkedToXContent, ToXContent.Params params, RestChannel channel) throws IOException { - return new ChunkedRestResponseBody() { + return new ChunkedRestResponseBodyPart() { private final OutputStream out = new OutputStream() { @Override @@ -135,17 +135,17 @@ public void write(byte[] b, int off, int len) throws IOException { private BytesStream target; @Override - public boolean isDone() { + public boolean isPartComplete() { return serialization.hasNext() == false; } @Override - public boolean isEndOfResponse() { + public boolean isLastPart() { return true; } @Override - public void getContinuation(ActionListener listener) { + public void getNextPart(ActionListener listener) { assert false : "no continuations"; listener.onFailure(new IllegalStateException("no continuations available")); } @@ -191,11 +191,11 @@ public String getResponseContentTypeString() { } /** - * Create a chunked response body to be written to a specific {@link RestChannel} from a stream of text chunks, each represented as a - * consumer of a {@link Writer}. + * Create a one-part chunked response body to be written to a specific {@link RestChannel} from a stream of UTF-8-encoded text chunks, + * each represented as a consumer of a {@link Writer}. */ - static ChunkedRestResponseBody fromTextChunks(String contentType, Iterator> chunkIterator) { - return new ChunkedRestResponseBody() { + static ChunkedRestResponseBodyPart fromTextChunks(String contentType, Iterator> chunkIterator) { + return new ChunkedRestResponseBodyPart() { private RecyclerBytesStreamOutput currentOutput; private final Writer writer = new OutputStreamWriter(new OutputStream() { @Override @@ -224,17 +224,17 @@ public void close() { }, StandardCharsets.UTF_8); @Override - public boolean isDone() { + public boolean isPartComplete() { return chunkIterator.hasNext() == false; } @Override - public boolean isEndOfResponse() { + public boolean isLastPart() { return true; } @Override - public void getContinuation(ActionListener listener) { + public void getNextPart(ActionListener listener) { assert false : "no continuations"; listener.onFailure(new IllegalStateException("no continuations available")); } diff --git a/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBodyPart.java similarity index 68% rename from server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java rename to server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBodyPart.java index 865f433e25aa4..f7a018eaacf7e 100644 --- a/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBodyPart.java @@ -16,29 +16,29 @@ import java.io.IOException; import java.io.OutputStream; -public class LoggingChunkedRestResponseBody implements ChunkedRestResponseBody { +public class LoggingChunkedRestResponseBodyPart implements ChunkedRestResponseBodyPart { - private final ChunkedRestResponseBody inner; + private final ChunkedRestResponseBodyPart inner; private final OutputStream loggerStream; - public LoggingChunkedRestResponseBody(ChunkedRestResponseBody inner, OutputStream loggerStream) { + public LoggingChunkedRestResponseBodyPart(ChunkedRestResponseBodyPart inner, OutputStream loggerStream) { this.inner = inner; this.loggerStream = loggerStream; } @Override - public boolean isDone() { - return inner.isDone(); + public boolean isPartComplete() { + return inner.isPartComplete(); } @Override - public boolean isEndOfResponse() { - return inner.isEndOfResponse(); + public boolean isLastPart() { + return inner.isLastPart(); } @Override - public void getContinuation(ActionListener listener) { - inner.getContinuation(listener.map(continuation -> new LoggingChunkedRestResponseBody(continuation, loggerStream))); + public void getNextPart(ActionListener listener) { + inner.getNextPart(listener.map(continuation -> new LoggingChunkedRestResponseBodyPart(continuation, loggerStream))); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 0c08520a5dd0b..b08f6ed81017a 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -857,7 +857,7 @@ public void sendResponse(RestResponse response) { final var headers = response.getHeaders(); response = RestResponse.chunked( response.status(), - new EncodedLengthTrackingChunkedRestResponseBody(response.chunkedContent(), responseLengthRecorder), + new EncodedLengthTrackingChunkedRestResponseBodyPart(response.chunkedContent(), responseLengthRecorder), Releasables.wrap(responseLengthRecorder, response) ); for (final var header : headers.entrySet()) { @@ -916,13 +916,13 @@ void addChunkLength(long chunkLength) { } } - private static class EncodedLengthTrackingChunkedRestResponseBody implements ChunkedRestResponseBody { + private static class EncodedLengthTrackingChunkedRestResponseBodyPart implements ChunkedRestResponseBodyPart { - private final ChunkedRestResponseBody delegate; + private final ChunkedRestResponseBodyPart delegate; private final ResponseLengthRecorder responseLengthRecorder; - private EncodedLengthTrackingChunkedRestResponseBody( - ChunkedRestResponseBody delegate, + private EncodedLengthTrackingChunkedRestResponseBodyPart( + ChunkedRestResponseBodyPart delegate, ResponseLengthRecorder responseLengthRecorder ) { this.delegate = delegate; @@ -930,19 +930,19 @@ private EncodedLengthTrackingChunkedRestResponseBody( } @Override - public boolean isDone() { - return delegate.isDone(); + public boolean isPartComplete() { + return delegate.isPartComplete(); } @Override - public boolean isEndOfResponse() { - return delegate.isEndOfResponse(); + public boolean isLastPart() { + return delegate.isLastPart(); } @Override - public void getContinuation(ActionListener listener) { - delegate.getContinuation( - listener.map(continuation -> new EncodedLengthTrackingChunkedRestResponseBody(continuation, responseLengthRecorder)) + public void getNextPart(ActionListener listener) { + delegate.getNextPart( + listener.map(continuation -> new EncodedLengthTrackingChunkedRestResponseBodyPart(continuation, responseLengthRecorder)) ); } @@ -950,7 +950,7 @@ public void getContinuation(ActionListener listener) { public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { final ReleasableBytesReference bytesReference = delegate.encodeChunk(sizeHint, recycler); responseLengthRecorder.addChunkLength(bytesReference.length()); - if (isDone() && isEndOfResponse()) { + if (isPartComplete() && isLastPart()) { responseLengthRecorder.close(); } return bytesReference; diff --git a/server/src/main/java/org/elasticsearch/rest/RestResponse.java b/server/src/main/java/org/elasticsearch/rest/RestResponse.java index 9862ab31bd53f..8cc0e35a64802 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestResponse.java +++ b/server/src/main/java/org/elasticsearch/rest/RestResponse.java @@ -48,7 +48,7 @@ public final class RestResponse implements Releasable { private final BytesReference content; @Nullable - private final ChunkedRestResponseBody chunkedResponseBody; + private final ChunkedRestResponseBodyPart chunkedResponseBody; private final String responseMediaType; private Map> customHeaders; @@ -84,9 +84,9 @@ private RestResponse(RestStatus status, String responseMediaType, BytesReference this(status, responseMediaType, content, null, releasable); } - public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content, @Nullable Releasable releasable) { - if (content.isDone()) { - assert content.isEndOfResponse() : "response with continuations must have at least one (possibly-empty) chunk in each part"; + public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBodyPart content, @Nullable Releasable releasable) { + if (content.isPartComplete()) { + assert content.isLastPart() : "response with continuations must have at least one (possibly-empty) chunk in each part"; return new RestResponse(restStatus, content.getResponseContentTypeString(), BytesArray.EMPTY, releasable); } else { return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content, releasable); @@ -100,7 +100,7 @@ private RestResponse( RestStatus status, String responseMediaType, @Nullable BytesReference content, - @Nullable ChunkedRestResponseBody chunkedResponseBody, + @Nullable ChunkedRestResponseBodyPart chunkedResponseBody, @Nullable Releasable releasable ) { this.status = status; @@ -162,7 +162,7 @@ public BytesReference content() { } @Nullable - public ChunkedRestResponseBody chunkedContent() { + public ChunkedRestResponseBodyPart chunkedContent() { return chunkedResponseBody; } diff --git a/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java b/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java index 3798f2b6b6fb1..ef2aa8418eef3 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java +++ b/server/src/main/java/org/elasticsearch/rest/action/RestChunkedToXContentListener.java @@ -10,7 +10,7 @@ import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.Releasable; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; @@ -40,7 +40,7 @@ protected void processResponse(Response response) throws IOException { channel.sendResponse( RestResponse.chunked( getRestStatus(response), - ChunkedRestResponseBody.fromXContent(response, params, channel), + ChunkedRestResponseBodyPart.fromXContent(response, params, channel), releasableFromResponse(response) ) ); diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java index bcf0d99325594..9cf2d6a2ed395 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesHotThreadsAction.java @@ -27,7 +27,7 @@ import java.util.List; import java.util.Locale; -import static org.elasticsearch.rest.ChunkedRestResponseBody.fromTextChunks; +import static org.elasticsearch.rest.ChunkedRestResponseBodyPart.fromTextChunks; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestResponse.TEXT_CONTENT_TYPE; import static org.elasticsearch.rest.RestUtils.getTimeout; diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java index 5999d1b81da47..2f94e3ab90cbf 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestTable.java @@ -17,7 +17,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.Booleans; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; @@ -63,7 +63,7 @@ public static RestResponse buildXContentBuilder(Table table, RestChannel channel return RestResponse.chunked( RestStatus.OK, - ChunkedRestResponseBody.fromXContent( + ChunkedRestResponseBodyPart.fromXContent( ignored -> Iterators.concat( Iterators.single((builder, params) -> builder.startArray()), Iterators.map(rowOrder.iterator(), row -> (builder, params) -> { @@ -94,7 +94,7 @@ public static RestResponse buildTextPlainResponse(Table table, RestChannel chann return RestResponse.chunked( RestStatus.OK, - ChunkedRestResponseBody.fromTextChunks( + ChunkedRestResponseBodyPart.fromTextChunks( RestResponse.TEXT_CONTENT_TYPE, Iterators.concat( // optional header diff --git a/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java b/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java index 8be023bb4a182..0a38d59d29729 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java @@ -19,7 +19,7 @@ import org.elasticsearch.http.HttpStats; import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.rest.BaseRestHandler; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; @@ -122,7 +122,7 @@ public RestResponse buildResponse(NodesStatsResponse response) throws Exception return RestResponse.chunked( RestStatus.OK, - ChunkedRestResponseBody.fromXContent( + ChunkedRestResponseBodyPart.fromXContent( outerParams -> Iterators.concat( ChunkedToXContentHelper.startObject(), Iterators.single((builder, params) -> builder.field("cluster_name", response.getClusterName().value())), diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index f12d8ea5c631a..437276a01f341 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -31,7 +31,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasable; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; @@ -530,20 +530,20 @@ public void testHandleHeadRequest() { { // chunked response final var isClosed = new AtomicBoolean(); - channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() { + channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBodyPart() { @Override - public boolean isDone() { + public boolean isPartComplete() { return false; } @Override - public boolean isEndOfResponse() { + public boolean isLastPart() { throw new AssertionError("should not check for end-of-response for HEAD request"); } @Override - public void getContinuation(ActionListener listener) { + public void getNextPart(ActionListener listener) { throw new AssertionError("should not get any continuations for HEAD request"); } @@ -688,25 +688,25 @@ public void testResponseBodyTracing() { HttpRequest httpRequest = new TestHttpRequest(HttpRequest.HttpVersion.HTTP_1_1, RestRequest.Method.GET, "/") { @Override - public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) { + public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) { try (var bso = new BytesStreamOutput()) { - writeContent(bso, content); + writeContent(bso, firstBodyPart); return new TestHttpResponse(status, bso.bytes()); } catch (IOException e) { return fail(e); } } - private static void writeContent(OutputStream bso, ChunkedRestResponseBody content) throws IOException { - while (content.isDone() == false) { + private static void writeContent(OutputStream bso, ChunkedRestResponseBodyPart content) throws IOException { + while (content.isPartComplete() == false) { try (var bytes = content.encodeChunk(1 << 14, BytesRefRecycler.NON_RECYCLING_INSTANCE)) { bytes.writeTo(bso); } } - if (content.isEndOfResponse()) { + if (content.isLastPart()) { return; } - writeContent(bso, PlainActionFuture.get(content::getContinuation)); + writeContent(bso, PlainActionFuture.get(content::getNextPart)); } }; @@ -735,14 +735,14 @@ private static void writeContent(OutputStream bso, ChunkedRestResponseBody conte ) ); - final var parts = new ArrayList(); - class TestBody implements ChunkedRestResponseBody { + final var parts = new ArrayList(); + class TestBodyPart implements ChunkedRestResponseBodyPart { boolean isDone; final BytesReference thisChunk; final BytesReference remainingChunks; final int remainingContinuations; - TestBody(BytesReference content, int remainingContinuations) { + TestBodyPart(BytesReference content, int remainingContinuations) { if (remainingContinuations == 0) { thisChunk = content; remainingChunks = BytesArray.EMPTY; @@ -755,18 +755,18 @@ class TestBody implements ChunkedRestResponseBody { } @Override - public boolean isDone() { + public boolean isPartComplete() { return isDone; } @Override - public boolean isEndOfResponse() { + public boolean isLastPart() { return remainingContinuations == 0; } @Override - public void getContinuation(ActionListener listener) { - final var continuation = new TestBody(remainingChunks, remainingContinuations - 1); + public void getNextPart(ActionListener listener) { + final var continuation = new TestBodyPart(remainingChunks, remainingContinuations - 1); parts.add(continuation); listener.onResponse(continuation); } @@ -785,7 +785,7 @@ public String getResponseContentTypeString() { } final var isClosed = new AtomicBoolean(); - final var firstPart = new TestBody(responseBody, between(0, 3)); + final var firstPart = new TestBodyPart(responseBody, between(0, 3)); parts.add(firstPart); assertEquals( responseBody, @@ -797,8 +797,8 @@ public String getResponseContentTypeString() { () -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, firstPart, () -> { assertTrue(isClosed.compareAndSet(false, true)); for (int i = 0; i < parts.size(); i++) { - assertTrue("isDone " + i, parts.get(i).isDone()); - assertEquals("isEndOfResponse " + i, i == parts.size() - 1, parts.get(i).isEndOfResponse()); + assertTrue("isDone " + i, parts.get(i).isPartComplete()); + assertEquals("isEndOfResponse " + i, i == parts.size() - 1, parts.get(i).isLastPart()); } })) ) diff --git a/server/src/test/java/org/elasticsearch/http/TestHttpRequest.java b/server/src/test/java/org/elasticsearch/http/TestHttpRequest.java index 4e30dde5e5e7e..e7b0232afa245 100644 --- a/server/src/test/java/org/elasticsearch/http/TestHttpRequest.java +++ b/server/src/test/java/org/elasticsearch/http/TestHttpRequest.java @@ -10,7 +10,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; @@ -78,7 +78,7 @@ public HttpResponse createResponse(RestStatus status, BytesReference content) { } @Override - public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) { + public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) { throw new UnsupportedOperationException("chunked responses not supported"); } diff --git a/server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyTests.java b/server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyPartTests.java similarity index 81% rename from server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyTests.java rename to server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyPartTests.java index cce2a8db25c8e..9c703d83e7d0a 100644 --- a/server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyTests.java +++ b/server/src/test/java/org/elasticsearch/rest/ChunkedRestResponseBodyPartTests.java @@ -30,7 +30,7 @@ import java.util.List; import java.util.Map; -public class ChunkedRestResponseBodyTests extends ESTestCase { +public class ChunkedRestResponseBodyPartTests extends ESTestCase { public void testEncodesChunkedXContentCorrectly() throws IOException { final ChunkedToXContent chunkedToXContent = (ToXContent.Params outerParams) -> Iterators.forArray( @@ -50,7 +50,7 @@ public void testEncodesChunkedXContentCorrectly() throws IOException { } final var bytesDirect = BytesReference.bytes(builderDirect); - var chunkedResponse = ChunkedRestResponseBody.fromXContent( + var firstBodyPart = ChunkedRestResponseBodyPart.fromXContent( chunkedToXContent, ToXContent.EMPTY_PARAMS, new FakeRestChannel( @@ -61,20 +61,25 @@ public void testEncodesChunkedXContentCorrectly() throws IOException { ); final List refsGenerated = new ArrayList<>(); - while (chunkedResponse.isDone() == false) { - refsGenerated.add(chunkedResponse.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE)); + while (firstBodyPart.isPartComplete() == false) { + refsGenerated.add(firstBodyPart.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE)); } + assertTrue(firstBodyPart.isLastPart()); assertEquals(bytesDirect, CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0]))); } public void testFromTextChunks() throws IOException { final var chunks = randomList(1000, () -> randomUnicodeOfLengthBetween(1, 100)); - var body = ChunkedRestResponseBody.fromTextChunks("text/plain", Iterators.map(chunks.iterator(), s -> w -> w.write(s))); + var firstBodyPart = ChunkedRestResponseBodyPart.fromTextChunks( + "text/plain", + Iterators.map(chunks.iterator(), s -> w -> w.write(s)) + ); final List refsGenerated = new ArrayList<>(); - while (body.isDone() == false) { - refsGenerated.add(body.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE)); + while (firstBodyPart.isPartComplete() == false) { + refsGenerated.add(firstBodyPart.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE)); } + assertTrue(firstBodyPart.isLastPart()); final BytesReference chunkedBytes = CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0])); try (var outputStream = new ByteArrayOutputStream(); var writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) { diff --git a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java index 37300f1c19b1c..10ea83e59c0ad 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -733,7 +733,7 @@ public HttpResponse createResponse(RestStatus status, BytesReference content) { } @Override - public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) { + public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) { throw new AssertionError("should not be called"); } diff --git a/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java b/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java index 41a54ac580a55..eaef60e15822d 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestResponseTests.java @@ -97,7 +97,7 @@ public void testWithHeaders() throws Exception { public void testEmptyChunkedBody() { RestResponse response = RestResponse.chunked( RestStatus.OK, - ChunkedRestResponseBody.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator()), + ChunkedRestResponseBodyPart.fromTextChunks(RestResponse.TEXT_CONTENT_TYPE, Collections.emptyIterator()), null ); assertFalse(response.isChunked()); diff --git a/server/src/test/java/org/elasticsearch/rest/action/cat/RestTableTests.java b/server/src/test/java/org/elasticsearch/rest/action/cat/RestTableTests.java index dff6b52e470df..cb98eaddb77cd 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/cat/RestTableTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/cat/RestTableTests.java @@ -432,14 +432,15 @@ public int pageSize() { }; final var bodyChunks = new ArrayList(); - final var chunkedRestResponseBody = response.chunkedContent(); + final var firstBodyPart = response.chunkedContent(); - while (chunkedRestResponseBody.isDone() == false) { - try (var chunk = chunkedRestResponseBody.encodeChunk(pageSize, recycler)) { + while (firstBodyPart.isPartComplete() == false) { + try (var chunk = firstBodyPart.encodeChunk(pageSize, recycler)) { assertThat(chunk.length(), greaterThan(0)); bodyChunks.add(chunk.utf8ToString()); } } + assertTrue(firstBodyPart.isLastPart()); assertEquals(0, openPages.get()); return bodyChunks; } diff --git a/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java b/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java index 1b1331fe25bbf..fe2df39b21591 100644 --- a/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java @@ -28,8 +28,8 @@ public static BytesReference getBodyContent(RestResponse restResponse) { return restResponse.content(); } - final var chunkedRestResponseBody = restResponse.chunkedContent(); - assert chunkedRestResponseBody.isDone() == false; + final var firstResponseBodyPart = restResponse.chunkedContent(); + assert firstResponseBodyPart.isPartComplete() == false; final int pageSize; try (var page = NON_RECYCLING_INSTANCE.obtain()) { @@ -37,12 +37,12 @@ public static BytesReference getBodyContent(RestResponse restResponse) { } try (var out = new BytesStreamOutput()) { - while (chunkedRestResponseBody.isDone() == false) { - try (var chunk = chunkedRestResponseBody.encodeChunk(pageSize, NON_RECYCLING_INSTANCE)) { + while (firstResponseBodyPart.isPartComplete() == false) { + try (var chunk = firstResponseBodyPart.encodeChunk(pageSize, NON_RECYCLING_INSTANCE)) { chunk.writeTo(out); } } - assert chunkedRestResponseBody.isEndOfResponse() : "RestResponseUtils#getBodyContent does not support continuations (yet)"; + assert firstResponseBodyPart.isLastPart() : "RestResponseUtils#getBodyContent does not support continuations (yet)"; out.flush(); return out.bytes(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index 726d2ec0d963d..3a9c4b371c9da 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -16,7 +16,7 @@ import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.http.HttpResponse; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.xcontent.NamedXContentRegistry; @@ -129,7 +129,7 @@ public boolean containsHeader(String name) { } @Override - public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) { + public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) { return createResponse(status, BytesArray.EMPTY); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java index 34f2906d003ae..0ed77b624f5b0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java @@ -12,7 +12,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; -import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; @@ -132,13 +132,13 @@ private RestResponse buildResponse(EsqlQueryResponse esqlResponse) throws IOExce if (mediaType instanceof TextFormat format) { restResponse = RestResponse.chunked( RestStatus.OK, - ChunkedRestResponseBody.fromTextChunks(format.contentType(restRequest), format.format(restRequest, esqlResponse)), + ChunkedRestResponseBodyPart.fromTextChunks(format.contentType(restRequest), format.format(restRequest, esqlResponse)), releasable ); } else { restResponse = RestResponse.chunked( RestStatus.OK, - ChunkedRestResponseBody.fromXContent(esqlResponse, channel.request(), channel), + ChunkedRestResponseBodyPart.fromXContent(esqlResponse, channel.request(), channel), releasable ); } From 0fab6074b63e2a80f3383fafb69a08db883cf38c Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 27 May 2024 09:59:22 +0100 Subject: [PATCH 2/3] =?UTF-8?q?=F0=9F=A6=85=F0=9F=91=81%?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/elasticsearch/http/DefaultRestChannelTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index 437276a01f341..d49347a0dd3fc 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -797,8 +797,8 @@ public String getResponseContentTypeString() { () -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, firstPart, () -> { assertTrue(isClosed.compareAndSet(false, true)); for (int i = 0; i < parts.size(); i++) { - assertTrue("isDone " + i, parts.get(i).isPartComplete()); - assertEquals("isEndOfResponse " + i, i == parts.size() - 1, parts.get(i).isLastPart()); + assertTrue("isPartComplete " + i, parts.get(i).isPartComplete()); + assertEquals("isLastPart " + i, i == parts.size() - 1, parts.get(i).isLastPart()); } })) ) From 1a8144d8dc0f3bffae5576d88272d5111abd60e8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 27 May 2024 11:28:04 +0100 Subject: [PATCH 3/3] CI poke