diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java new file mode 100644 index 0000000000000..25195a1176fb8 --- /dev/null +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -0,0 +1,713 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ESNetty4IntegTestCase; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; +import org.elasticsearch.action.support.CountDownActionListener; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseListener; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.ChunkedLoggingStreamTestUtils; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.http.HttpBodyTracer; +import org.elasticsearch.http.HttpRouteStats; +import org.elasticsearch.http.HttpRouteStatsTracker; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestActionListener; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.netty4.Netty4Utils; +import org.elasticsearch.xcontent.ToXContentObject; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestResponse.TEXT_CONTENT_TYPE; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; + +public class Netty4ChunkedContinuationsIT extends ESNetty4IntegTestCase { + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.concatLists( + List.of(YieldsContinuationsPlugin.class, InfiniteContinuationsPlugin.class, CountDown3Plugin.class), + super.nodePlugins() + ); + } + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + private static final String expectedBody = """ + batch-0-chunk-0 + batch-0-chunk-1 + batch-0-chunk-2 + batch-1-chunk-0 + batch-1-chunk-1 + batch-1-chunk-2 + batch-2-chunk-0 + batch-2-chunk-1 + batch-2-chunk-2 + """; + + public void testBasic() throws IOException { + try (var ignored = withResourceTracker()) { + final var response = getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertThat(response.getEntity().getContentType().toString(), containsString(TEXT_CONTENT_TYPE)); + assertTrue(response.getEntity().isChunked()); + final String body; + try (var reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)) { + body = Streams.copyToString(reader); + } + assertEquals(expectedBody, body); + } + } + + @TestLogging( + reason = "testing TRACE logging", + value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE" + ) + public void testTraceLogging() { + + // slightly awkward test, we can't use ChunkedLoggingStreamTestUtils.getDecodedLoggedBody directly because it asserts that we _only_ + // log one thing and we can't easily separate the request body from the response body logging, so instead we capture the body log + // message and then log it again with a different logger. + final var resources = new ArrayList(); + try (var ignored = Releasables.wrap(resources)) { + resources.add(withResourceTracker()); + final var executor = EsExecutors.newFixed( + "test", + 1, + -1, + EsExecutors.daemonThreadFactory(Settings.EMPTY, "test"), + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ); + resources.add(() -> assertTrue(ThreadPool.terminate(executor, 10, TimeUnit.SECONDS))); + var loggingFinishedLatch = new CountDownLatch(1); + MockLog.assertThatLogger( + () -> assertEquals( + expectedBody, + ChunkedLoggingStreamTestUtils.getDecodedLoggedBody( + logger, + Level.INFO, + "response body", + ReferenceDocs.HTTP_TRACER, + () -> { + final var request = new Request("GET", YieldsContinuationsPlugin.ROUTE); + request.addParameter("error_trace", "true"); + getRestClient().performRequest(request); + safeAwait(loggingFinishedLatch); + } + ).utf8ToString() + ), + HttpBodyTracer.class, + new MockLog.LoggingExpectation() { + final Pattern messagePattern = Pattern.compile("^\\[[1-9][0-9]*] (response body.*)"); + + @Override + public void match(LogEvent event) { + final var formattedMessage = event.getMessage().getFormattedMessage(); + final var matcher = messagePattern.matcher(formattedMessage); + if (matcher.matches()) { + executor.execute(() -> { + logger.info("{}", matcher.group(1)); + if (formattedMessage.contains(ReferenceDocs.HTTP_TRACER.toString())) { + loggingFinishedLatch.countDown(); + } + }); + } + } + + @Override + public void assertMatched() {} + } + ); + } + } + + public void testResponseBodySizeStats() throws IOException { + try (var ignored = withResourceTracker()) { + final var totalResponseSizeBefore = getTotalResponseSize(); + getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)); + final var totalResponseSizeAfter = getTotalResponseSize(); + assertEquals(expectedBody.length(), totalResponseSizeAfter - totalResponseSizeBefore); + } + } + + private static final HttpRouteStats EMPTY_ROUTE_STATS = new HttpRouteStatsTracker().getStats(); + + private long getTotalResponseSize() { + return client().admin() + .cluster() + .prepareNodesStats() + .clear() + .setHttp(true) + .get() + .getNodes() + .stream() + .mapToLong( + ns -> ns.getHttp().httpRouteStats().getOrDefault(YieldsContinuationsPlugin.ROUTE, EMPTY_ROUTE_STATS).totalResponseSize() + ) + .sum(); + } + + public void testPipelining() throws Exception { + try (var ignored = withResourceTracker(); var nettyClient = new Netty4HttpClient()) { + final var responses = nettyClient.get( + randomFrom(internalCluster().getInstance(HttpServerTransport.class).boundAddress().boundAddresses()).address(), + CountDown3Plugin.ROUTE, + YieldsContinuationsPlugin.ROUTE, + CountDown3Plugin.ROUTE, + YieldsContinuationsPlugin.ROUTE, + CountDown3Plugin.ROUTE + ); + + assertEquals("{}", Netty4Utils.toBytesReference(responses.get(0).content()).utf8ToString()); + assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(1).content()).utf8ToString()); + assertEquals("{}", Netty4Utils.toBytesReference(responses.get(2).content()).utf8ToString()); + assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(3).content()).utf8ToString()); + assertEquals("{}", Netty4Utils.toBytesReference(responses.get(4).content()).utf8ToString()); + } finally { + internalCluster().fullRestart(); // reset countdown listener + } + } + + public void testContinuationFailure() throws Exception { + try (var ignored = withResourceTracker(); var nettyClient = new Netty4HttpClient()) { + final var failIndex = between(0, 2); + final var responses = nettyClient.get( + randomFrom(internalCluster().getInstance(HttpServerTransport.class).boundAddress().boundAddresses()).address(), + YieldsContinuationsPlugin.ROUTE, + YieldsContinuationsPlugin.ROUTE + "?" + YieldsContinuationsPlugin.FAIL_INDEX_PARAM + "=" + failIndex + ); + + if (failIndex == 0) { + assertThat( + responses, + anyOf( + // might get a 500 response if the failure is early enough + hasSize(2), + // might get no response before channel closed + hasSize(1), + // might even close the channel before flushing the previous response + hasSize(0) + ) + ); + + if (responses.size() == 2) { + assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(0).content()).utf8ToString()); + assertEquals(500, responses.get(1).status().code()); + } + } else { + assertThat(responses, hasSize(1)); + } + + if (responses.size() > 0) { + assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(0).content()).utf8ToString()); + assertEquals(200, responses.get(0).status().code()); + } + } + } + + public void testClientCancellation() { + try (var ignored = withResourceTracker()) { + final var cancellable = getRestClient().performRequestAsync( + new Request("GET", InfiniteContinuationsPlugin.ROUTE), + new ResponseListener() { + @Override + public void onSuccess(Response response) { + fail("should not succeed"); + } + + @Override + public void onFailure(Exception exception) { + assertThat(exception, instanceOf(CancellationException.class)); + } + } + ); + if (randomBoolean()) { + safeSleep(scaledRandomIntBetween(10, 500)); // make it more likely the request started executing + } + cancellable.cancel(); + } // closing the request tracker ensures that everything is released, including all response chunks and the overall response + } + + private static Releasable withResourceTracker() { + assertNull(refs); + final var latch = new CountDownLatch(1); + refs = AbstractRefCounted.of(latch::countDown); + return () -> { + refs.decRef(); + try { + safeAwait(latch); + } finally { + refs = null; + } + }; + } + + private static volatile RefCounted refs = null; + + /** + * Adds a REST route which yields a sequence of continuations which are computed asynchronously, effectively pausing after each one.. + */ + public static class YieldsContinuationsPlugin extends Plugin implements ActionPlugin { + static final String ROUTE = "/_test/yields_continuations"; + static final String FAIL_INDEX_PARAM = "fail_index"; + + private static final ActionType TYPE = new ActionType<>("test:yields_continuations"); + + @Override + public Collection> getActions() { + return List.of(new ActionHandler<>(TYPE, TransportYieldsContinuationsAction.class)); + } + + public static class Request extends ActionRequest { + final int failIndex; + + public Request(int failIndex) { + this.failIndex = failIndex; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + public static class Response extends ActionResponse { + private final int failIndex; + private final Executor executor; + + public Response(int failIndex, Executor executor) { + this.failIndex = failIndex; + this.executor = executor; + } + + @Override + public void writeTo(StreamOutput out) { + TransportAction.localOnly(); + } + + public ChunkedRestResponseBody getChunkedBody() { + return getChunkBatch(0); + } + + private ChunkedRestResponseBody getChunkBatch(int batchIndex) { + if (batchIndex == failIndex && randomBoolean()) { + throw new ElasticsearchException("simulated failure creating next batch"); + } + return new ChunkedRestResponseBody() { + + private final Iterator lines = Iterators.forRange(0, 3, i -> "batch-" + batchIndex + "-chunk-" + i + "\n"); + + @Override + public boolean isDone() { + return lines.hasNext() == false; + } + + @Override + public boolean isEndOfResponse() { + return batchIndex == 2; + } + + @Override + public void getContinuation(ActionListener listener) { + executor.execute(ActionRunnable.supply(listener, () -> getChunkBatch(batchIndex + 1))); + } + + @Override + public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { + assertTrue(lines.hasNext()); + refs.mustIncRef(); + final var output = new RecyclerBytesStreamOutput(recycler); + boolean success = false; + try { + try (var writer = new OutputStreamWriter(Streams.flushOnCloseStream(output), StandardCharsets.UTF_8)) { + writer.write(lines.next()); + } + final var result = new ReleasableBytesReference(output.bytes(), Releasables.wrap(output, refs::decRef)); + if (batchIndex == failIndex) { + throw new ElasticsearchException("simulated failure encoding chunk"); + } + success = true; + return result; + } finally { + if (success == false) { + refs.decRef(); + output.close(); + } + } + } + + @Override + public String getResponseContentTypeString() { + assertEquals(0, batchIndex); + return TEXT_CONTENT_TYPE; + } + }; + } + } + + public static class TransportYieldsContinuationsAction extends TransportAction { + private final ExecutorService executor; + + @Inject + public TransportYieldsContinuationsAction(ActionFilters actionFilters, TransportService transportService) { + super(TYPE.name(), actionFilters, transportService.getTaskManager()); + executor = transportService.getThreadPool().executor(ThreadPool.Names.GENERIC); + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + executor.execute(ActionRunnable.supply(listener, () -> new Response(request.failIndex, executor))); + } + } + + @Override + public Collection getRestHandlers( + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster, + Predicate clusterSupportsFeature + ) { + return List.of(new BaseRestHandler() { + @Override + public String getName() { + return ROUTE; + } + + @Override + public List routes() { + return List.of(new Route(GET, ROUTE)); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + final var failIndex = request.paramAsInt(FAIL_INDEX_PARAM, Integer.MAX_VALUE); + refs.mustIncRef(); + return new RestChannelConsumer() { + + @Override + public void close() { + refs.decRef(); + } + + @Override + public void accept(RestChannel channel) { + refs.mustIncRef(); + client.execute(TYPE, new Request(failIndex), new RestActionListener<>(channel) { + @Override + protected void processResponse(Response response) { + try { + final var responseBody = response.getChunkedBody(); // might fail, so do this before acquiring ref + refs.mustIncRef(); + channel.sendResponse(RestResponse.chunked(RestStatus.OK, responseBody, refs::decRef)); + } finally { + refs.decRef(); + } + } + }); + } + }; + } + }); + } + } + + /** + * Adds a REST route which yields an infinite sequence of continuations which can only be stopped by the client closing the connection. + */ + public static class InfiniteContinuationsPlugin extends Plugin implements ActionPlugin { + static final String ROUTE = "/_test/infinite_continuations"; + + private static final ActionType TYPE = new ActionType<>("test:infinite_continuations"); + + @Override + public Collection> getActions() { + return List.of(new ActionHandler<>(TYPE, TransportInfiniteContinuationsAction.class)); + } + + public static class Request extends ActionRequest { + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + public static class Response extends ActionResponse { + private final Executor executor; + volatile boolean computingContinuation; + + public Response(Executor executor) { + this.executor = executor; + } + + @Override + public void writeTo(StreamOutput out) { + TransportAction.localOnly(); + } + + public ChunkedRestResponseBody getChunkedBody() { + return new ChunkedRestResponseBody() { + private final Iterator lines = Iterators.single("infinite response\n"); + + @Override + public boolean isDone() { + return lines.hasNext() == false; + } + + @Override + public boolean isEndOfResponse() { + return false; + } + + @Override + public void getContinuation(ActionListener listener) { + computingContinuation = true; + executor.execute(ActionRunnable.supply(listener, () -> { + computingContinuation = false; + return getChunkedBody(); + })); + } + + @Override + public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { + assertTrue(lines.hasNext()); + refs.mustIncRef(); + return new ReleasableBytesReference(new BytesArray(lines.next()), refs::decRef); + } + + @Override + public String getResponseContentTypeString() { + return TEXT_CONTENT_TYPE; + } + }; + } + } + + public static class TransportInfiniteContinuationsAction extends TransportAction { + private final ExecutorService executor; + + @Inject + public TransportInfiniteContinuationsAction(ActionFilters actionFilters, TransportService transportService) { + super(TYPE.name(), actionFilters, transportService.getTaskManager()); + this.executor = transportService.getThreadPool().executor(ThreadPool.Names.GENERIC); + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + executor.execute( + ActionRunnable.supply(ActionTestUtils.assertNoFailureListener(listener::onResponse), () -> new Response(executor)) + ); + } + } + + @Override + public Collection getRestHandlers( + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster, + Predicate clusterSupportsFeature + ) { + return List.of(new BaseRestHandler() { + @Override + public String getName() { + return ROUTE; + } + + @Override + public List routes() { + return List.of(new Route(GET, ROUTE)); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + final var localRefs = refs; // single volatile read + if (localRefs != null && localRefs.tryIncRef()) { + return new RestChannelConsumer() { + @Override + public void close() { + localRefs.decRef(); + } + + @Override + public void accept(RestChannel channel) { + localRefs.mustIncRef(); + client.execute(TYPE, new Request(), new RestActionListener<>(channel) { + @Override + protected void processResponse(Response response) { + channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), () -> { + // cancellation notification only happens while processing a continuation, not while computing + // the next one; prompt cancellation requires use of something like RestCancellableNodeClient + assertFalse(response.computingContinuation); + assertSame(localRefs, refs); + localRefs.decRef(); + })); + } + }); + } + }; + } else { + throw new TaskCancelledException("request cancelled"); + } + } + }); + } + } + + /** + * Adds an HTTP route that waits for 3 concurrent executions before returning any of them + */ + public static class CountDown3Plugin extends Plugin implements ActionPlugin { + + static final String ROUTE = "/_test/countdown_3"; + + @Override + public Collection getRestHandlers( + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster, + Predicate clusterSupportsFeature + ) { + return List.of(new BaseRestHandler() { + private final SubscribableListener subscribableListener = new SubscribableListener<>(); + private final CountDownActionListener countDownActionListener = new CountDownActionListener( + 3, + subscribableListener.map(v -> EMPTY_RESPONSE) + ); + + private void addListener(ActionListener listener) { + subscribableListener.addListener(listener); + countDownActionListener.onResponse(null); + } + + @Override + public String getName() { + return ROUTE; + } + + @Override + public List routes() { + return List.of(new Route(GET, ROUTE)); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + refs.mustIncRef(); + return new RestChannelConsumer() { + + @Override + public void close() { + refs.decRef(); + } + + @Override + public void accept(RestChannel channel) { + refs.mustIncRef(); + addListener(ActionListener.releaseAfter(new RestToXContentListener<>(channel), refs::decRef)); + } + }; + } + }); + } + } + + private static final ToXContentObject EMPTY_RESPONSE = (builder, params) -> builder.startObject().endObject(); +} diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java index 2f472dab23afa..b2a54e2027308 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java @@ -10,6 +10,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.ESNetty4IntegTestCase; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseListener; @@ -250,6 +251,16 @@ public boolean isDone() { return chunkIterator.hasNext() == false; } + @Override + public boolean isEndOfResponse() { + return true; + } + + @Override + public void getContinuation(ActionListener listener) { + assert false : "no continuations"; + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { localRefs.mustIncRef(); diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java index 130a1168d455c..ce8da0c08af54 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java @@ -251,6 +251,16 @@ public boolean isDone() { return false; } + @Override + public boolean isEndOfResponse() { + return true; + } + + @Override + public void getContinuation(ActionListener listener) { + fail("no continuations here"); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { assert bytesRemaining >= 0 : "already failed"; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java new file mode 100644 index 0000000000000..156f1c27aa67c --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.util.concurrent.PromiseCombiner; + +import org.elasticsearch.rest.ChunkedRestResponseBody; + +final class Netty4ChunkedHttpContinuation implements Netty4HttpResponse { + private final int sequence; + private final ChunkedRestResponseBody body; + private final PromiseCombiner combiner; + + Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBody body, PromiseCombiner combiner) { + this.sequence = sequence; + this.body = body; + this.combiner = combiner; + } + + @Override + public int getSequence() { + return sequence; + } + + public ChunkedRestResponseBody body() { + return body; + } + + public PromiseCombiner combiner() { + return combiner; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java index f5f32bf333779..783c02da0bbcc 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java @@ -19,7 +19,7 @@ /** * A http response that will be transferred via chunked encoding when handled by {@link Netty4HttpPipeliningHandler}. */ -public final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4HttpResponse, HttpResponse { +final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4HttpResponse, HttpResponse { private final int sequence; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index b86e168e2e620..8280c438613a2 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -28,6 +28,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.core.Booleans; @@ -148,6 +149,8 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann } private void enqueuePipelinedResponse(ChannelHandlerContext ctx, Netty4HttpResponse restResponse, ChannelPromise promise) { + assert restResponse instanceof Netty4ChunkedHttpContinuation == false + : "received out-of-order continuation at [" + restResponse.getSequence() + "], expecting [" + writeSequence + "]"; assert restResponse.getSequence() > writeSequence : "response sequence [" + restResponse.getSequence() + "] we below write sequence [" + writeSequence + "]"; if (outboundHoldingQueue.size() >= maxEventsHeld) { @@ -187,6 +190,8 @@ private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse doWriteFullResponse(ctx, fullResponse, promise); } else if (readyResponse instanceof Netty4ChunkedHttpResponse chunkedResponse) { doWriteChunkedResponse(ctx, chunkedResponse, promise); + } else if (readyResponse instanceof Netty4ChunkedHttpContinuation chunkedContinuation) { + doWriteChunkedContinuation(ctx, chunkedContinuation, promise); } else { assert false : readyResponse.getClass().getCanonicalName(); throw new IllegalStateException("illegal message type: " + readyResponse.getClass().getCanonicalName()); @@ -224,16 +229,75 @@ private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttp } } + private void doWriteChunkedContinuation(ChannelHandlerContext ctx, Netty4ChunkedHttpContinuation continuation, ChannelPromise promise) { + final PromiseCombiner combiner = continuation.combiner(); + assert currentChunkedWrite == null; + final var responseBody = continuation.body(); + assert responseBody.isDone() == false : "response with continuations must have at least one (possibly-empty) chunk in each part"; + currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody); + // NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel. + while (ctx.channel().isWritable()) { + if (writeChunk(ctx, currentChunkedWrite)) { + finishChunkedWrite(); + return; + } + } + } + private void finishChunkedWrite() { if (currentChunkedWrite == null) { // failure during chunked response serialization, we're closing the channel return; } - assert currentChunkedWrite.responseBody().isDone(); final var finishingWrite = currentChunkedWrite; currentChunkedWrite = null; - writeSequence++; - finishingWrite.combiner().finish(finishingWrite.onDone()); + final var finishingWriteBody = finishingWrite.responseBody(); + assert finishingWriteBody.isDone(); + final var endOfResponse = finishingWriteBody.isEndOfResponse(); + if (endOfResponse) { + writeSequence++; + finishingWrite.combiner().finish(finishingWrite.onDone()); + } else { + final var channel = finishingWrite.onDone().channel(); + ActionListener.run(ActionListener.assertOnce(new ActionListener<>() { + @Override + public void onResponse(ChunkedRestResponseBody continuation) { + channel.writeAndFlush( + new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()), + finishingWrite.onDone() // pass the terminal listener/promise along the line + ); + checkShutdown(); + } + + @Override + public void onFailure(Exception e) { + logger.error( + Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel), + e + ); + channel.close().addListener(ignored -> { + finishingWrite.combiner().add(channel.newFailedFuture(e)); + finishingWrite.combiner().finish(finishingWrite.onDone()); + }); + checkShutdown(); + } + + private void checkShutdown() { + if (channel.eventLoop().isShuttingDown()) { + // The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know if the + // preceding activity made it onto its queue before shutdown or whether it will just vanish without a trace, so + // to avoid a leak we must double-check that the final listener is completed once the event loop is terminated. + // Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an ImmediateEventExecutor + // which means this completion is not subject to the same issue, it still works even if the event loop has already + // terminated. + channel.eventLoop() + .terminationFuture() + .addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException())); + } + } + + }), finishingWriteBody::getContinuation); + } } private void splitAndWrite(ChannelHandlerContext ctx, Netty4FullHttpResponse msg, ChannelPromise promise) { @@ -321,7 +385,8 @@ private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite) } final ByteBuf content = Netty4Utils.toByteBuf(bytes); final boolean done = body.isDone(); - final ChannelFuture f = ctx.write(done ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content)); + final boolean lastChunk = done && body.isEndOfResponse(); + final ChannelFuture f = ctx.write(lastChunk ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content)); f.addListener(ignored -> bytes.close()); combiner.add(f); return done; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java index 3396b13cdab0f..80cf3469c00ca 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java @@ -11,7 +11,7 @@ /** * Super-interface for responses handled by the Netty4 HTTP transport. */ -public sealed interface Netty4HttpResponse permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse { +sealed interface Netty4HttpResponse permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse, Netty4ChunkedHttpContinuation { /** * @return The sequence number for the request which corresponds with this response, for making sure that we send responses to pipelined * requests in the correct order. diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java index 9e0f30caec755..bb4a0939c98f0 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java @@ -28,6 +28,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -511,6 +512,16 @@ public boolean isDone() { return remaining == 0; } + @Override + public boolean isEndOfResponse() { + return true; + } + + @Override + public void getContinuation(ActionListener listener) { + fail("no continuations here"); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { assertThat(remaining, greaterThan(0)); diff --git a/server/src/main/java/org/elasticsearch/http/HttpBodyTracer.java b/server/src/main/java/org/elasticsearch/http/HttpBodyTracer.java index 1773a4803f62a..1dd2868f7bfa6 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpBodyTracer.java +++ b/server/src/main/java/org/elasticsearch/http/HttpBodyTracer.java @@ -17,7 +17,7 @@ import java.io.OutputStream; -class HttpBodyTracer { +public class HttpBodyTracer { private static final Logger logger = LogManager.getLogger(HttpBodyTracer.class); public static boolean isEnabled() { diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java index 5c41be0fc9f9f..2f7fc458ca020 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java @@ -8,6 +8,8 @@ package org.elasticsearch.rest; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.stream.BytesStream; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; @@ -20,6 +22,8 @@ import org.elasticsearch.core.Streams; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -31,18 +35,51 @@ import java.util.Iterator; /** - * The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and - * instead serialize only as much of the response as can be flushed to the network right away. + *

A body (or a part thereof) of an HTTP response that uses the {@code chunked} transfer-encoding. This allows Elasticsearch to avoid + * materializing the full response into on-heap buffers up front, instead serializing only as much of the response as can be flushed to the + * network right away.

+ * + *

Each {@link ChunkedRestResponseBody} represents a sequence of chunks that are ready for immediate transmission: if {@link #isDone} + * returns {@code false} then {@link #encodeChunk} can be called at any time and must synchronously return the next chunk to be sent. + * Many HTTP responses will be a single such sequence. However, if an implementation's {@link #isEndOfResponse} returns {@code false} at the + * end of the sequence then the transmission is paused and {@link #getContinuation} is called to compute the next sequence of chunks + * asynchronously.

*/ public interface ChunkedRestResponseBody { Logger logger = LogManager.getLogger(ChunkedRestResponseBody.class); /** - * @return true once this response has been written fully. + * @return {@code true} if this body contains no more chunks and the REST layer should check for a possible continuation by calling + * {@link #isEndOfResponse}, or {@code false} if the REST layer should request another chunk from this body using {@link #encodeChunk}. */ boolean isDone(); + /** + * @return {@code true} if this is the last chunked body in the response, or {@code false} if the REST layer should request further + * chunked bodies by calling {@link #getContinuation}. + */ + boolean isEndOfResponse(); + + /** + *

Asynchronously retrieves the next part of the body. Called if {@link #isEndOfResponse} returns {@code false}.

+ * + *

Note that this is called on a transport thread, so implementations must take care to dispatch any nontrivial work elsewhere.

+ + *

Note that the {@link Task} corresponding to any invocation of {@link Client#execute} completes as soon as the client action + * returns its response, so it no longer exists when this method is called and cannot be used to receive cancellation notifications. + * Instead, if the HTTP channel is closed while sending a response then the REST layer will invoke {@link RestResponse#close}. If the + * HTTP channel is closed while the REST layer is waiting for a continuation then the {@link RestResponse} will not be closed until the + * continuation listener is completed. Implementations will typically explicitly create a {@link CancellableTask} to represent the + * computation and transmission of the entire {@link RestResponse}, and will cancel this task if the {@link RestResponse} is closed + * prematurely.

+ * + * @param listener Listener to complete with the next part of the body. By the point this is called we have already started to send + * the body of the response, so there's no good ways to handle an exception here. Completing the listener exceptionally + * will log an error, abort sending the response, and close the HTTP connection. + */ + void getContinuation(ActionListener listener); + /** * Serializes approximately as many bytes of the response as request by {@code sizeHint} to a {@link ReleasableBytesReference} that * is created from buffers backed by the given {@code recycler}. @@ -102,6 +139,17 @@ public boolean isDone() { return serialization.hasNext() == false; } + @Override + public boolean isEndOfResponse() { + return true; + } + + @Override + public void getContinuation(ActionListener listener) { + assert false : "no continuations"; + listener.onFailure(new IllegalStateException("no continuations available")); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { try { @@ -180,6 +228,17 @@ public boolean isDone() { return chunkIterator.hasNext() == false; } + @Override + public boolean isEndOfResponse() { + return true; + } + + @Override + public void getContinuation(ActionListener listener) { + assert false : "no continuations"; + listener.onFailure(new IllegalStateException("no continuations available")); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { try { diff --git a/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java index 0508828c70da1..865f433e25aa4 100644 --- a/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java @@ -9,6 +9,7 @@ package org.elasticsearch.rest; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.recycler.Recycler; @@ -30,6 +31,16 @@ public boolean isDone() { return inner.isDone(); } + @Override + public boolean isEndOfResponse() { + return inner.isEndOfResponse(); + } + + @Override + public void getContinuation(ActionListener listener) { + inner.getContinuation(listener.map(continuation -> new LoggingChunkedRestResponseBody(continuation, loggerStream))); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { var chunk = inner.encodeChunk(sizeHint, recycler); diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 16813f1141e12..0c08520a5dd0b 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -934,11 +934,23 @@ public boolean isDone() { return delegate.isDone(); } + @Override + public boolean isEndOfResponse() { + return delegate.isEndOfResponse(); + } + + @Override + public void getContinuation(ActionListener listener) { + delegate.getContinuation( + listener.map(continuation -> new EncodedLengthTrackingChunkedRestResponseBody(continuation, responseLengthRecorder)) + ); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { final ReleasableBytesReference bytesReference = delegate.encodeChunk(sizeHint, recycler); responseLengthRecorder.addChunkLength(bytesReference.length()); - if (isDone()) { + if (isDone() && isEndOfResponse()) { responseLengthRecorder.close(); } return bytesReference; diff --git a/server/src/main/java/org/elasticsearch/rest/RestResponse.java b/server/src/main/java/org/elasticsearch/rest/RestResponse.java index a4a44a5a65561..9862ab31bd53f 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestResponse.java +++ b/server/src/main/java/org/elasticsearch/rest/RestResponse.java @@ -86,6 +86,7 @@ private RestResponse(RestStatus status, String responseMediaType, BytesReference public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content, @Nullable Releasable releasable) { if (content.isDone()) { + assert content.isEndOfResponse() : "response with continuations must have at least one (possibly-empty) chunk in each part"; return new RestResponse(restStatus, content.getResponseContentTypeString(), BytesArray.EMPTY, releasable); } else { return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content, releasable); diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index 82eb88a90873f..f12d8ea5c631a 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.common.bytes.BytesArray; @@ -51,9 +52,11 @@ import org.mockito.ArgumentCaptor; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -534,6 +537,16 @@ public boolean isDone() { return false; } + @Override + public boolean isEndOfResponse() { + throw new AssertionError("should not check for end-of-response for HEAD request"); + } + + @Override + public void getContinuation(ActionListener listener) { + throw new AssertionError("should not get any continuations for HEAD request"); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { throw new AssertionError("should not try to serialize response body for HEAD request"); @@ -677,16 +690,24 @@ public void testResponseBodyTracing() { @Override public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) { try (var bso = new BytesStreamOutput()) { - while (content.isDone() == false) { - try (var bytes = content.encodeChunk(1 << 14, BytesRefRecycler.NON_RECYCLING_INSTANCE)) { - bytes.writeTo(bso); - } - } + writeContent(bso, content); return new TestHttpResponse(status, bso.bytes()); } catch (IOException e) { return fail(e); } } + + private static void writeContent(OutputStream bso, ChunkedRestResponseBody content) throws IOException { + while (content.isDone() == false) { + try (var bytes = content.encodeChunk(1 << 14, BytesRefRecycler.NON_RECYCLING_INSTANCE)) { + bytes.writeTo(bso); + } + } + if (content.isEndOfResponse()) { + return; + } + writeContent(bso, PlainActionFuture.get(content::getContinuation)); + } }; final RestRequest request = RestRequest.request(parserConfig(), httpRequest, httpChannel); @@ -714,7 +735,58 @@ public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody co ) ); + final var parts = new ArrayList(); + class TestBody implements ChunkedRestResponseBody { + boolean isDone; + final BytesReference thisChunk; + final BytesReference remainingChunks; + final int remainingContinuations; + + TestBody(BytesReference content, int remainingContinuations) { + if (remainingContinuations == 0) { + thisChunk = content; + remainingChunks = BytesArray.EMPTY; + } else { + var splitAt = between(0, content.length()); + thisChunk = content.slice(0, splitAt); + remainingChunks = content.slice(splitAt, content.length() - splitAt); + } + this.remainingContinuations = remainingContinuations; + } + + @Override + public boolean isDone() { + return isDone; + } + + @Override + public boolean isEndOfResponse() { + return remainingContinuations == 0; + } + + @Override + public void getContinuation(ActionListener listener) { + final var continuation = new TestBody(remainingChunks, remainingContinuations - 1); + parts.add(continuation); + listener.onResponse(continuation); + } + + @Override + public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { + assertFalse(isDone); + isDone = true; + return ReleasableBytesReference.wrap(thisChunk); + } + + @Override + public String getResponseContentTypeString() { + return RestResponse.TEXT_CONTENT_TYPE; + } + } + final var isClosed = new AtomicBoolean(); + final var firstPart = new TestBody(responseBody, between(0, 3)); + parts.add(firstPart); assertEquals( responseBody, ChunkedLoggingStreamTestUtils.getDecodedLoggedBody( @@ -722,27 +794,13 @@ public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody co Level.TRACE, "[" + request.getRequestId() + "] response body", ReferenceDocs.HTTP_TRACER, - () -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() { - - boolean isDone; - - @Override - public boolean isDone() { - return isDone; - } - - @Override - public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { - assertFalse(isDone); - isDone = true; - return ReleasableBytesReference.wrap(responseBody); - } - - @Override - public String getResponseContentTypeString() { - return RestResponse.TEXT_CONTENT_TYPE; + () -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, firstPart, () -> { + assertTrue(isClosed.compareAndSet(false, true)); + for (int i = 0; i < parts.size(); i++) { + assertTrue("isDone " + i, parts.get(i).isDone()); + assertEquals("isEndOfResponse " + i, i == parts.size() - 1, parts.get(i).isEndOfResponse()); } - }, () -> assertTrue(isClosed.compareAndSet(false, true)))) + })) ) ); diff --git a/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java b/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java index dfbd7266cc4a2..1b1331fe25bbf 100644 --- a/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java @@ -42,6 +42,7 @@ public static BytesReference getBodyContent(RestResponse restResponse) { chunk.writeTo(out); } } + assert chunkedRestResponseBody.isEndOfResponse() : "RestResponseUtils#getBodyContent does not support continuations (yet)"; out.flush(); return out.bytes();