From c92a7ec5dcc12273b28c379d3aed68fbe5bea460 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 29 Jan 2024 10:02:18 +0000 Subject: [PATCH 01/25] Pausable chunked HTTP responses Today we must collect together in memory everything needed to send a HTTP response before starting to send it to the client, even if we're using the `chunked` transfer encoding to bound the memory needed for the final serialization step. To properly bound the memory usage on the coordinating node we must instead be able to start sending the response to the client before we have collected everything needed to finish it. If we do this then we must be able to handle the case where we run out of data to send by pausing the transmission and resuming it once there's more data to send. This commit extends the `ChunkedRestResponseBody` interface to allow it to express that it has run out of chunks for immediate transmission, but that the body will continue at a later point. --- .../netty4/Netty4ChunkedContinuationsIT.java | 523 ++++++++++++++++++ .../http/netty4/Netty4ChunkedEncodingIT.java | 11 + .../netty4/Netty4ChunkedHttpContinuation.java | 38 ++ .../netty4/Netty4HttpPipeliningHandler.java | 52 +- .../http/netty4/Netty4HttpResponse.java | 2 +- .../http/netty4/Netty4HttpClient.java | 14 +- .../Netty4HttpPipeliningHandlerTests.java | 11 + .../rest/ChunkedRestResponseBody.java | 36 ++ .../rest/LoggingChunkedRestResponseBody.java | 11 + .../elasticsearch/rest/RestController.java | 14 +- .../http/DefaultRestChannelTests.java | 104 +++- 11 files changed, 784 insertions(+), 32 deletions(-) create mode 100644 modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java new file mode 100644 index 0000000000000..15f600225a477 --- /dev/null +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -0,0 +1,523 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ESNetty4IntegTestCase; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.CountDownActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.ChunkedLoggingStreamTestUtils; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.RefCounted; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.http.HttpRouteStats; +import org.elasticsearch.http.HttpRouteStatsTracker; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.node.PluginComponentBinding; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.ChunkedRestResponseBody; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestActionListener; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.netty4.Netty4Utils; +import org.elasticsearch.xcontent.ToXContentObject; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestResponse.TEXT_CONTENT_TYPE; +import static org.hamcrest.Matchers.containsString; + +public class Netty4ChunkedContinuationsIT extends ESNetty4IntegTestCase { + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.concatLists( + List.of(YieldsContinuationsPlugin.class, CountDown3Plugin.class, ExposesRequestRefs.class), + super.nodePlugins() + ); + } + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + private static final String expectedBody = """ + batch-0-chunk-0 + batch-0-chunk-1 + batch-0-chunk-2 + batch-1-chunk-0 + batch-1-chunk-1 + batch-1-chunk-2 + batch-2-chunk-0 + batch-2-chunk-1 + batch-2-chunk-2 + """; + + public void testBasic() throws IOException { + try (var ignored = withRequestTracker()) { + final var response = getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertThat(response.getEntity().getContentType().toString(), containsString(TEXT_CONTENT_TYPE)); + assertTrue(response.getEntity().isChunked()); + final String body; + try (var reader = new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)) { + body = Streams.copyToString(reader); + } + assertEquals(expectedBody, body); + } + } + + @TestLogging( + reason = "testing TRACE logging", + value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE" + ) + public void testTraceLogging() throws Exception { + + // slightly awkward test, we can't use ChunkedLoggingStreamTestUtils.getDecodedLoggedBody directly because it asserts that we _only_ + // log one thing and we can't easily separate the request body from the response body logging, so instead we capture the body log + // message and then log it again in isolation. + + var loggedResponseMessageFuture = new PlainActionFuture(); + var requestIdFuture = new PlainActionFuture(); + var mockLogAppender = new MockLogAppender(); + mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() { + final Pattern messagePattern = Pattern.compile("^\\[([1-9][0-9]*)] response body.*"); + + @Override + public void match(LogEvent event) { + final var formattedMessage = event.getMessage().getFormattedMessage(); + final var matcher = messagePattern.matcher(formattedMessage); + if (matcher.matches()) { + assertFalse(loggedResponseMessageFuture.isDone()); + loggedResponseMessageFuture.onResponse(formattedMessage); + requestIdFuture.onResponse(Integer.valueOf(matcher.group(1))); + } + } + + @Override + public void assertMatched() {} + }); + mockLogAppender.start(); + final var bodyTracerLogger = LogManager.getLogger("org.elasticsearch.http.HttpBodyTracer"); + Loggers.addAppender(bodyTracerLogger, mockLogAppender); + + try (var ignored = withRequestTracker()) { + getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)); + final var loggedResponseMessage = loggedResponseMessageFuture.get(10, TimeUnit.SECONDS); + final var loggedBody = ChunkedLoggingStreamTestUtils.getDecodedLoggedBody( + logger, + Level.INFO, + "[" + requestIdFuture.get(10, TimeUnit.SECONDS) + "] response body", + ReferenceDocs.HTTP_TRACER, + () -> logger.info(loggedResponseMessage) + ); + mockLogAppender.assertAllExpectationsMatched(); + assertEquals(expectedBody, loggedBody.utf8ToString()); + } finally { + Loggers.removeAppender(bodyTracerLogger, mockLogAppender); + mockLogAppender.stop(); + } + } + + public void testResponseBodySizeStats() throws IOException { + try (var ignored = withRequestTracker()) { + final var totalResponseSizeBefore = getTotalResponseSize(); + getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)); + final var totalResponseSizeAfter = getTotalResponseSize(); + assertEquals(expectedBody.length(), totalResponseSizeAfter - totalResponseSizeBefore); + } + } + + private static final HttpRouteStats EMPTY_ROUTE_STATS = new HttpRouteStatsTracker().getStats(); + + private long getTotalResponseSize() { + return client().admin() + .cluster() + .prepareNodesStats() + .clear() + .setHttp(true) + .get() + .getNodes() + .stream() + .mapToLong( + ns -> ns.getHttp().httpRouteStats().getOrDefault(YieldsContinuationsPlugin.ROUTE, EMPTY_ROUTE_STATS).totalResponseSize() + ) + .sum(); + } + + public void testPipelining() throws Exception { + try (var ignored = withRequestTracker(); var nettyClient = new Netty4HttpClient()) { + final var responses = nettyClient.get( + randomFrom(internalCluster().getInstance(HttpServerTransport.class).boundAddress().boundAddresses()).address(), + CountDown3Plugin.ROUTE, + YieldsContinuationsPlugin.ROUTE, + CountDown3Plugin.ROUTE, + YieldsContinuationsPlugin.ROUTE, + CountDown3Plugin.ROUTE + ); + + assertEquals("{}", Netty4Utils.toBytesReference(responses.get(0).content()).utf8ToString()); + assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(1).content()).utf8ToString()); + assertEquals("{}", Netty4Utils.toBytesReference(responses.get(2).content()).utf8ToString()); + assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(3).content()).utf8ToString()); + assertEquals("{}", Netty4Utils.toBytesReference(responses.get(4).content()).utf8ToString()); + } + } + + private static Releasable withRequestTracker() { + final var latch = new CountDownLatch(1); + final var refCounted = AbstractRefCounted.of(latch::countDown); + setPluginRequestRefs(refCounted); + return () -> { + setPluginRequestRefs(RefCounted.ALWAYS_REFERENCED); + refCounted.decRef(); + safeAwait(latch); + }; + } + + private static void setPluginRequestRefs(RefCounted refCounted) { + Iterators.flatMap( + internalCluster().getInstances(PluginsService.class).iterator(), + pluginsService -> pluginsService.filterPlugins(HasRequestRefs.class).iterator() + ).forEachRemaining(p -> p.setRequestRefs(refCounted)); + } + + interface HasRequestRefs { + void setRequestRefs(RefCounted requestRefs); + } + + interface TestRefCounted extends RefCounted {} + + public static class ExposesRequestRefs extends Plugin implements HasRequestRefs, TestRefCounted { + RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; + + @Override + public void setRequestRefs(RefCounted requestRefs) { + this.requestRefs = requestRefs; + } + + @Override + public void incRef() { + requestRefs.incRef(); + } + + @Override + public boolean tryIncRef() { + return requestRefs.tryIncRef(); + } + + @Override + public boolean decRef() { + return requestRefs.decRef(); + } + + @Override + public boolean hasReferences() { + return requestRefs.hasReferences(); + } + + @Override + public Collection createComponents(PluginServices services) { + return List.of(new PluginComponentBinding(TestRefCounted.class, this)); + } + } + + public static class YieldsContinuationsPlugin extends Plugin implements ActionPlugin, HasRequestRefs { + static final String ROUTE = "/_test/yields_continuations"; + + private static final ActionType TYPE = new ActionType<>("test:yields_continuations"); + + RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; + + @Override + public Collection> getActions() { + return List.of(new ActionHandler<>(TYPE, TransportYieldsContinuationsAction.class)); + } + + @Override + public void setRequestRefs(RefCounted requestRefs) { + this.requestRefs = requestRefs; + } + + public static class Request extends ActionRequest { + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + public static class Response extends ActionResponse { + private final Executor executor; + private final TestRefCounted requestRefs; + + public Response(Executor executor, TestRefCounted requestRefs) { + this.executor = executor; + this.requestRefs = requestRefs; + } + + @Override + public void writeTo(StreamOutput out) { + TransportAction.localOnly(); + } + + public ChunkedRestResponseBody getChunkedBody() { + return getChunkBatch(0); + } + + private ChunkedRestResponseBody getChunkBatch(int batchIndex) { + return new ChunkedRestResponseBody() { + + private final Iterator lines = Iterators.forRange(0, 3, i -> "batch-" + batchIndex + "-chunk-" + i + "\n"); + + @Override + public boolean isDone() { + return lines.hasNext() == false; + } + + @Override + public boolean isEndOfResponse() { + return batchIndex == 2; + } + + @Override + public void getContinuation(ActionListener listener) { + executor.execute(ActionRunnable.supply(listener, () -> getChunkBatch(batchIndex + 1))); + } + + @Override + public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { + assertTrue(lines.hasNext()); + requestRefs.mustIncRef(); + final var output = new RecyclerBytesStreamOutput(recycler); + boolean success = false; + try { + try (var writer = new OutputStreamWriter(Streams.flushOnCloseStream(output), StandardCharsets.UTF_8)) { + writer.write(lines.next()); + } + final var result = new ReleasableBytesReference(output.bytes(), Releasables.wrap(output, requestRefs::decRef)); + success = true; + return result; + } finally { + if (success == false) { + requestRefs.decRef(); + output.close(); + } + } + } + + @Override + public String getResponseContentTypeString() { + assertEquals(0, batchIndex); + return TEXT_CONTENT_TYPE; + } + }; + } + } + + public static class TransportYieldsContinuationsAction extends TransportAction { + private final ExecutorService executor; + private final TestRefCounted requestRefs; + + @Inject + public TransportYieldsContinuationsAction( + ActionFilters actionFilters, + TransportService transportService, + TestRefCounted requestRefs + ) { + super(TYPE.name(), actionFilters, transportService.getTaskManager()); + executor = transportService.getThreadPool().executor(ThreadPool.Names.GENERIC); + this.requestRefs = requestRefs; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + executor.execute(ActionRunnable.supply(listener, () -> new Response(executor, requestRefs))); + } + } + + @Override + public Collection getRestHandlers( + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return List.of(new BaseRestHandler() { + @Override + public String getName() { + return ROUTE; + } + + @Override + public List routes() { + return List.of(new Route(GET, ROUTE)); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + final var localRequestRefs = requestRefs; + localRequestRefs.mustIncRef(); + return new RestChannelConsumer() { + + @Override + public void close() { + localRequestRefs.decRef(); + } + + @Override + public void accept(RestChannel channel) { + localRequestRefs.mustIncRef(); + client.execute(TYPE, new Request(), new RestActionListener<>(channel) { + @Override + protected void processResponse(Response response) { + channel.sendResponse( + RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), localRequestRefs::decRef) + ); + } + }); + } + }; + } + }); + } + } + + /** + * Adds an HTTP route that waits for 3 concurrent executions before returning any of them + */ + public static class CountDown3Plugin extends Plugin implements ActionPlugin, HasRequestRefs { + + static final String ROUTE = "/_test/countdown_3"; + + RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; + + @Override + public void setRequestRefs(RefCounted requestRefs) { + this.requestRefs = requestRefs; + } + + @Override + public Collection getRestHandlers( + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return List.of(new BaseRestHandler() { + private final SubscribableListener subscribableListener = new SubscribableListener<>(); + private final CountDownActionListener countDownActionListener = new CountDownActionListener( + 3, + subscribableListener.map(v -> EMPTY_RESPONSE) + ); + + private void addListener(ActionListener listener) { + subscribableListener.addListener(listener); + countDownActionListener.onResponse(null); + } + + @Override + public String getName() { + return ROUTE; + } + + @Override + public List routes() { + return List.of(new Route(GET, ROUTE)); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + requestRefs.mustIncRef(); + return new RestChannelConsumer() { + + @Override + public void close() { + requestRefs.decRef(); + } + + @Override + public void accept(RestChannel channel) { + requestRefs.mustIncRef(); + addListener(ActionListener.releaseAfter(new RestToXContentListener<>(channel), requestRefs::decRef)); + } + }; + } + }); + } + } + + private static final ToXContentObject EMPTY_RESPONSE = (builder, params) -> builder.startObject().endObject(); +} diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java index 8cd68abdcce42..00108f52dbb22 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedEncodingIT.java @@ -10,6 +10,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.ESNetty4IntegTestCase; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Request; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -158,6 +159,16 @@ public boolean isDone() { return chunkIterator.hasNext() == false; } + @Override + public boolean isEndOfResponse() { + return true; + } + + @Override + public void getContinuation(ActionListener listener) { + assert false : "no continuations"; + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { final var page = recycler.obtain(); // just to ensure nothing is leaked diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java new file mode 100644 index 0000000000000..68ae3da98052e --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.util.concurrent.PromiseCombiner; + +import org.elasticsearch.rest.ChunkedRestResponseBody; + +public final class Netty4ChunkedHttpContinuation implements Netty4HttpResponse { + private final int sequence; + private final ChunkedRestResponseBody body; + private final PromiseCombiner combiner; + + public Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBody body, PromiseCombiner combiner) { + this.sequence = sequence; + this.body = body; + this.combiner = combiner; + } + + @Override + public int getSequence() { + return sequence; + } + + public ChunkedRestResponseBody body() { + return body; + } + + public PromiseCombiner combiner() { + return combiner; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index b2947b32ebdde..6eef0009f6117 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -28,6 +28,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.core.Booleans; import org.elasticsearch.core.Nullable; @@ -198,6 +200,8 @@ private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse doWriteFullResponse(ctx, fullResponse, promise); } else if (readyResponse instanceof Netty4ChunkedHttpResponse chunkedResponse) { doWriteChunkedResponse(ctx, chunkedResponse, promise); + } else if (readyResponse instanceof Netty4ChunkedHttpContinuation chunkedContinuation) { + doWriteChunkedContinuation(ctx, chunkedContinuation, promise); } else { assert false : readyResponse.getClass().getCanonicalName(); throw new IllegalStateException("illegal message type: " + readyResponse.getClass().getCanonicalName()); @@ -236,13 +240,54 @@ private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttp } } + private void doWriteChunkedContinuation(ChannelHandlerContext ctx, Netty4ChunkedHttpContinuation continuation, ChannelPromise promise) + throws IOException { + final PromiseCombiner combiner = continuation.combiner(); + assert currentChunkedWrite == null; + final var responseBody = continuation.body(); + currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody); + // NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel. + while (ctx.channel().isWritable()) { + if (writeChunk(ctx, combiner, responseBody)) { + finishChunkedWrite(); + return; + } + } + } + private void finishChunkedWrite() { assert currentChunkedWrite != null; assert currentChunkedWrite.responseBody().isDone(); final var finishingWrite = currentChunkedWrite; + final var endOfResponse = finishingWrite.responseBody().isEndOfResponse(); currentChunkedWrite = null; - writeSequence++; - finishingWrite.combiner.finish(finishingWrite.onDone()); + if (endOfResponse) { + writeSequence++; + finishingWrite.combiner.finish(finishingWrite.onDone()); + } else { + final var channel = finishingWrite.onDone().channel(); + ActionListener.run(ActionListener.assertOnce(new ActionListener<>() { + @Override + public void onResponse(ChunkedRestResponseBody continuation) { + channel.writeAndFlush( + new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()), + finishingWrite.onDone() // pass the terminal listener/promise along the line + ); + } + + @Override + public void onFailure(Exception e) { + // TODO tests of this case + logger.error( + Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel), + e + ); + finishingWrite.combiner().add(channel.newFailedFuture(e)); + finishingWrite.combiner().finish(finishingWrite.onDone()); + channel.close(); + } + }), finishingWrite.responseBody()::getContinuation); + } } private void splitAndWrite(ChannelHandlerContext ctx, Netty4FullHttpResponse msg, ChannelPromise promise) { @@ -326,7 +371,8 @@ private boolean writeChunk(ChannelHandlerContext ctx, PromiseCombiner combiner, ); final ByteBuf content = Netty4Utils.toByteBuf(bytes); final boolean done = body.isDone(); - final ChannelFuture f = ctx.write(done ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content)); + final boolean lastChunk = done && body.isEndOfResponse(); + final ChannelFuture f = ctx.write(lastChunk ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content)); f.addListener(ignored -> bytes.close()); combiner.add(f); return done; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java index 3396b13cdab0f..366955f9b194c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java @@ -11,7 +11,7 @@ /** * Super-interface for responses handled by the Netty4 HTTP transport. */ -public sealed interface Netty4HttpResponse permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse { +public sealed interface Netty4HttpResponse permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse, Netty4ChunkedHttpContinuation { /** * @return The sequence number for the request which corresponds with this response, for making sure that we send responses to pipelined * requests in the correct order. diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java index 2524be154414e..887c0b1b2f3e5 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.netty4.NettyAllocator; import java.io.Closeable; @@ -137,9 +138,20 @@ private synchronized List sendRequests(final SocketAddress rem channelFuture = clientBootstrap.connect(remoteAddress); channelFuture.sync(); + boolean needsFinalFlush = false; for (HttpRequest request : requests) { - channelFuture.channel().writeAndFlush(request); + if (ESTestCase.randomBoolean()) { + channelFuture.channel().writeAndFlush(request); + needsFinalFlush = false; + } else { + channelFuture.channel().write(request); + needsFinalFlush = true; + } + } + if (needsFinalFlush) { + channelFuture.channel().flush(); } + if (latch.await(30L, TimeUnit.SECONDS) == false) { fail("Failed to get all expected responses."); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java index 9e0f30caec755..bb4a0939c98f0 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java @@ -28,6 +28,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -511,6 +512,16 @@ public boolean isDone() { return remaining == 0; } + @Override + public boolean isEndOfResponse() { + return true; + } + + @Override + public void getContinuation(ActionListener listener) { + fail("no continuations here"); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { assertThat(remaining, greaterThan(0)); diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java index 5c41be0fc9f9f..d7a408b8cd275 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java @@ -8,6 +8,7 @@ package org.elasticsearch.rest; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.stream.BytesStream; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; @@ -43,6 +44,21 @@ public interface ChunkedRestResponseBody { */ boolean isDone(); + /** + * @return true if this is the last chunked body in the response. + */ + boolean isEndOfResponse(); + + /** + * Asynchronously retrieves the next part of the body. Note that this is called on a transport thread, so implementations must take care + * to dispatch any nontrivial work elsewhere. + * + * @param listener Listener to complete with the next part of the body. By the point this is called we have already started to send + * the body of the response, so there's no good ways to handle an exception here. Completing the listener exceptionally + * will log an error, abort sending the response, and close the HTTP connection. + */ + void getContinuation(ActionListener listener); + /** * Serializes approximately as many bytes of the response as request by {@code sizeHint} to a {@link ReleasableBytesReference} that * is created from buffers backed by the given {@code recycler}. @@ -102,6 +118,16 @@ public boolean isDone() { return serialization.hasNext() == false; } + @Override + public boolean isEndOfResponse() { + return true; + } + + @Override + public void getContinuation(ActionListener listener) { + assert false : "no continuations"; + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { try { @@ -180,6 +206,16 @@ public boolean isDone() { return chunkIterator.hasNext() == false; } + @Override + public boolean isEndOfResponse() { + return true; + } + + @Override + public void getContinuation(ActionListener listener) { + assert false : "no continuations"; + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { try { diff --git a/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java index 0508828c70da1..865f433e25aa4 100644 --- a/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/LoggingChunkedRestResponseBody.java @@ -9,6 +9,7 @@ package org.elasticsearch.rest; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.recycler.Recycler; @@ -30,6 +31,16 @@ public boolean isDone() { return inner.isDone(); } + @Override + public boolean isEndOfResponse() { + return inner.isEndOfResponse(); + } + + @Override + public void getContinuation(ActionListener listener) { + inner.getContinuation(listener.map(continuation -> new LoggingChunkedRestResponseBody(continuation, loggerStream))); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { var chunk = inner.encodeChunk(sizeHint, recycler); diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 2ebee9c59482e..ee2c1750d90b6 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -907,11 +907,23 @@ public boolean isDone() { return delegate.isDone(); } + @Override + public boolean isEndOfResponse() { + return delegate.isEndOfResponse(); + } + + @Override + public void getContinuation(ActionListener listener) { + delegate.getContinuation( + listener.map(continuation -> new EncodedLengthTrackingChunkedRestResponseBody(continuation, responseLengthRecorder)) + ); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { final ReleasableBytesReference bytesReference = delegate.encodeChunk(sizeHint, recycler); responseLengthRecorder.addChunkLength(bytesReference.length()); - if (isDone()) { + if (isDone() && isEndOfResponse()) { responseLengthRecorder.close(); } return bytesReference; diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index c49da619d7630..a93486b2d7e8e 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.common.bytes.BytesArray; @@ -51,6 +52,7 @@ import org.mockito.ArgumentCaptor; import java.io.IOException; +import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; @@ -534,6 +536,16 @@ public boolean isDone() { return false; } + @Override + public boolean isEndOfResponse() { + throw new AssertionError("should not check for end-of-response for HEAD request"); + } + + @Override + public void getContinuation(ActionListener listener) { + throw new AssertionError("should not get any continuations for HEAD request"); + } + @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { throw new AssertionError("should not try to serialize response body for HEAD request"); @@ -680,16 +692,24 @@ public void testResponseBodyTracing() { @Override public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) { try (var bso = new BytesStreamOutput()) { - while (content.isDone() == false) { - try (var bytes = content.encodeChunk(1 << 14, BytesRefRecycler.NON_RECYCLING_INSTANCE)) { - bytes.writeTo(bso); - } - } + writeContent(bso, content); return new TestHttpResponse(status, bso.bytes()); } catch (IOException e) { return fail(e); } } + + private static void writeContent(OutputStream bso, ChunkedRestResponseBody content) throws IOException { + while (content.isDone() == false) { + try (var bytes = content.encodeChunk(1 << 14, BytesRefRecycler.NON_RECYCLING_INSTANCE)) { + bytes.writeTo(bso); + } + } + if (content.isEndOfResponse()) { + return; + } + writeContent(bso, PlainActionFuture.get(content::getContinuation)); + } }; final RestRequest request = RestRequest.request(parserConfig(), httpRequest, httpChannel); @@ -717,6 +737,52 @@ public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody co ) ); + class TestBody implements ChunkedRestResponseBody { + boolean isDone; + final BytesReference thisChunk; + final BytesReference remainingChunks; + final int remainingContinuations; + + TestBody(BytesReference content, int remainingContinuations) { + if (remainingContinuations == 0) { + thisChunk = content; + remainingChunks = BytesArray.EMPTY; + } else { + var splitAt = between(0, content.length()); + thisChunk = content.slice(0, splitAt); + remainingChunks = content.slice(splitAt, content.length() - splitAt); + } + this.remainingContinuations = remainingContinuations; + } + + @Override + public boolean isDone() { + return isDone; + } + + @Override + public boolean isEndOfResponse() { + return remainingContinuations == 0; + } + + @Override + public void getContinuation(ActionListener listener) { + listener.onResponse(new TestBody(remainingChunks, remainingContinuations - 1)); + } + + @Override + public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { + assertFalse(isDone); + isDone = true; + return ReleasableBytesReference.wrap(thisChunk); + } + + @Override + public String getResponseContentTypeString() { + return RestResponse.TEXT_CONTENT_TYPE; + } + } + final var isClosed = new AtomicBoolean(); assertEquals( responseBody, @@ -725,27 +791,13 @@ public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody co Level.TRACE, "[" + request.getRequestId() + "] response body", ReferenceDocs.HTTP_TRACER, - () -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() { - - boolean isDone; - - @Override - public boolean isDone() { - return isDone; - } - - @Override - public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { - assertFalse(isDone); - isDone = true; - return ReleasableBytesReference.wrap(responseBody); - } - - @Override - public String getResponseContentTypeString() { - return RestResponse.TEXT_CONTENT_TYPE; - } - }, () -> assertTrue(isClosed.compareAndSet(false, true)))) + () -> channel.sendResponse( + RestResponse.chunked( + RestStatus.OK, + new TestBody(responseBody, between(0, 3)), + () -> assertTrue(isClosed.compareAndSet(false, true)) + ) + ) ) ); From ff24f3ce45690122c2d75ada2ea8bf7ec680f52c Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 29 Jan 2024 12:13:25 +0000 Subject: [PATCH 02/25] Add test for the failure path --- .../netty4/Netty4ChunkedContinuationsIT.java | 40 +++++++++++++++++-- .../netty4/Netty4HttpPipeliningHandler.java | 7 ++-- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index 15f600225a477..a7772b057b1b2 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.core.LogEvent; import org.apache.lucene.util.BytesRef; import org.elasticsearch.ESNetty4IntegTestCase; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; @@ -226,6 +227,26 @@ public void testPipelining() throws Exception { assertEquals("{}", Netty4Utils.toBytesReference(responses.get(2).content()).utf8ToString()); assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(3).content()).utf8ToString()); assertEquals("{}", Netty4Utils.toBytesReference(responses.get(4).content()).utf8ToString()); + } finally { + internalCluster().fullRestart(); // reset countdown listener + } + } + + public void testContinuationFailure() throws Exception { + // TODO when https://github.com/netty/netty/issues/13816 addressed, verify that we see the failure properly and no later responses + try (var ignored = withRequestTracker(); var nettyClient = new Netty4HttpClient()) { + final var responses = nettyClient.get( + randomFrom(internalCluster().getInstance(HttpServerTransport.class).boundAddress().boundAddresses()).address(), + YieldsContinuationsPlugin.ROUTE, + YieldsContinuationsPlugin.ROUTE + "?" + YieldsContinuationsPlugin.FAIL_INDEX_PARAM + "=1" + ); + + assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(0).content()).utf8ToString()); + assertEquals(""" + batch-0-chunk-0 + batch-0-chunk-1 + batch-0-chunk-2 + """, Netty4Utils.toBytesReference(responses.get(1).content()).utf8ToString()); } } @@ -289,6 +310,7 @@ public Collection createComponents(PluginServices services) { public static class YieldsContinuationsPlugin extends Plugin implements ActionPlugin, HasRequestRefs { static final String ROUTE = "/_test/yields_continuations"; + static final String FAIL_INDEX_PARAM = "fail_index"; private static final ActionType TYPE = new ActionType<>("test:yields_continuations"); @@ -305,6 +327,12 @@ public void setRequestRefs(RefCounted requestRefs) { } public static class Request extends ActionRequest { + final int failIndex; + + public Request(int failIndex) { + this.failIndex = failIndex; + } + @Override public ActionRequestValidationException validate() { return null; @@ -312,10 +340,12 @@ public ActionRequestValidationException validate() { } public static class Response extends ActionResponse { + private final int failIndex; private final Executor executor; private final TestRefCounted requestRefs; - public Response(Executor executor, TestRefCounted requestRefs) { + public Response(int failIndex, Executor executor, TestRefCounted requestRefs) { + this.failIndex = failIndex; this.executor = executor; this.requestRefs = requestRefs; } @@ -330,6 +360,9 @@ public ChunkedRestResponseBody getChunkedBody() { } private ChunkedRestResponseBody getChunkBatch(int batchIndex) { + if (batchIndex == failIndex) { + throw new ElasticsearchException("simulated failure"); + } return new ChunkedRestResponseBody() { private final Iterator lines = Iterators.forRange(0, 3, i -> "batch-" + batchIndex + "-chunk-" + i + "\n"); @@ -396,7 +429,7 @@ public TransportYieldsContinuationsAction( @Override protected void doExecute(Task task, Request request, ActionListener listener) { - executor.execute(ActionRunnable.supply(listener, () -> new Response(executor, requestRefs))); + executor.execute(ActionRunnable.supply(listener, () -> new Response(request.failIndex, executor, requestRefs))); } } @@ -424,6 +457,7 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + final var failIndex = request.paramAsInt(FAIL_INDEX_PARAM, Integer.MAX_VALUE); final var localRequestRefs = requestRefs; localRequestRefs.mustIncRef(); return new RestChannelConsumer() { @@ -436,7 +470,7 @@ public void close() { @Override public void accept(RestChannel channel) { localRequestRefs.mustIncRef(); - client.execute(TYPE, new Request(), new RestActionListener<>(channel) { + client.execute(TYPE, new Request(failIndex), new RestActionListener<>(channel) { @Override protected void processResponse(Response response) { channel.sendResponse( diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 6eef0009f6117..6a892806e5dbc 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -282,9 +282,10 @@ public void onFailure(Exception e) { Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel), e ); - finishingWrite.combiner().add(channel.newFailedFuture(e)); - finishingWrite.combiner().finish(finishingWrite.onDone()); - channel.close(); + channel.close().addListener(ignored -> { + finishingWrite.combiner().add(channel.newFailedFuture(e)); + finishingWrite.combiner().finish(finishingWrite.onDone()); + }); } }), finishingWrite.responseBody()::getContinuation); } From e957bc713a43b6b502cb3945677fc8593a51e817 Mon Sep 17 00:00:00 2001 From: David Turner Date: Tue, 30 Jan 2024 09:24:47 +0000 Subject: [PATCH 03/25] TODO is done --- .../elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 6a892806e5dbc..535324fd7e058 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -277,7 +277,6 @@ public void onResponse(ChunkedRestResponseBody continuation) { @Override public void onFailure(Exception e) { - // TODO tests of this case logger.error( Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel), e From 18e65e21fc37765c67694cbee6d0a2f894589a4b Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 31 Jan 2024 15:51:57 +0000 Subject: [PATCH 04/25] Use #104971 --- .../netty4/Netty4ChunkedContinuationsIT.java | 42 +++++++------------ 1 file changed, 14 insertions(+), 28 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index a7772b057b1b2..3e75f9c297d8c 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -9,7 +9,6 @@ package org.elasticsearch.http.netty4; import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LogEvent; import org.apache.lucene.util.BytesRef; import org.elasticsearch.ESNetty4IntegTestCase; @@ -22,7 +21,6 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.CountDownActionListener; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.Request; @@ -38,7 +36,6 @@ import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.logging.ChunkedLoggingStreamTestUtils; -import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -84,7 +81,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -137,51 +133,41 @@ public void testBasic() throws IOException { reason = "testing TRACE logging", value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE" ) - public void testTraceLogging() throws Exception { + public void testTraceLogging() { // slightly awkward test, we can't use ChunkedLoggingStreamTestUtils.getDecodedLoggedBody directly because it asserts that we _only_ // log one thing and we can't easily separate the request body from the response body logging, so instead we capture the body log - // message and then log it again in isolation. + // message and then log it again with a different logger. - var loggedResponseMessageFuture = new PlainActionFuture(); - var requestIdFuture = new PlainActionFuture(); var mockLogAppender = new MockLogAppender(); mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() { - final Pattern messagePattern = Pattern.compile("^\\[([1-9][0-9]*)] response body.*"); + final Pattern messagePattern = Pattern.compile("^\\[[1-9][0-9]*] (response body.*)"); @Override public void match(LogEvent event) { final var formattedMessage = event.getMessage().getFormattedMessage(); final var matcher = messagePattern.matcher(formattedMessage); if (matcher.matches()) { - assertFalse(loggedResponseMessageFuture.isDone()); - loggedResponseMessageFuture.onResponse(formattedMessage); - requestIdFuture.onResponse(Integer.valueOf(matcher.group(1))); + logger.info("{}", matcher.group(1)); } } @Override public void assertMatched() {} }); - mockLogAppender.start(); - final var bodyTracerLogger = LogManager.getLogger("org.elasticsearch.http.HttpBodyTracer"); - Loggers.addAppender(bodyTracerLogger, mockLogAppender); - try (var ignored = withRequestTracker()) { - getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)); - final var loggedResponseMessage = loggedResponseMessageFuture.get(10, TimeUnit.SECONDS); - final var loggedBody = ChunkedLoggingStreamTestUtils.getDecodedLoggedBody( - logger, - Level.INFO, - "[" + requestIdFuture.get(10, TimeUnit.SECONDS) + "] response body", - ReferenceDocs.HTTP_TRACER, - () -> logger.info(loggedResponseMessage) + try (var ignored = withRequestTracker(); var ignored2 = mockLogAppender.capturing("org.elasticsearch.http.HttpBodyTracer")) { + assertEquals( + expectedBody, + ChunkedLoggingStreamTestUtils.getDecodedLoggedBody( + logger, + Level.INFO, + "response body", + ReferenceDocs.HTTP_TRACER, + () -> getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)) + ).utf8ToString() ); mockLogAppender.assertAllExpectationsMatched(); - assertEquals(expectedBody, loggedBody.utf8ToString()); - } finally { - Loggers.removeAppender(bodyTracerLogger, mockLogAppender); - mockLogAppender.stop(); } } From 33fa0cdb2448db09ca9791f097a70a0eb8f06f6d Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 5 Feb 2024 09:06:01 +0000 Subject: [PATCH 05/25] Fixup merge --- .../http/netty4/Netty4ChunkedContinuationsIT.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index 3e75f9c297d8c..b460b4c14edb9 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -46,6 +46,7 @@ import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.http.HttpRouteStats; import org.elasticsearch.http.HttpRouteStatsTracker; import org.elasticsearch.http.HttpServerTransport; @@ -81,6 +82,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -428,7 +430,8 @@ public Collection getRestHandlers( IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, - Supplier nodesInCluster + Supplier nodesInCluster, + Predicate clusterSupportsFeature ) { return List.of(new BaseRestHandler() { @Override @@ -494,7 +497,8 @@ public Collection getRestHandlers( IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, - Supplier nodesInCluster + Supplier nodesInCluster, + Predicate clusterSupportsFeature ) { return List.of(new BaseRestHandler() { private final SubscribableListener subscribableListener = new SubscribableListener<>(); From 1d927ca4d6395c01872f19d66ded90fd814bc07e Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 5 Feb 2024 09:49:25 +0000 Subject: [PATCH 06/25] Fix test race --- .../http/netty4/Netty4ChunkedContinuationsIT.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index b460b4c14edb9..5794a6bcd142a 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -141,6 +141,7 @@ public void testTraceLogging() { // log one thing and we can't easily separate the request body from the response body logging, so instead we capture the body log // message and then log it again with a different logger. + var loggingFinishedLatch = new CountDownLatch(1); var mockLogAppender = new MockLogAppender(); mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() { final Pattern messagePattern = Pattern.compile("^\\[[1-9][0-9]*] (response body.*)"); @@ -151,6 +152,9 @@ public void match(LogEvent event) { final var matcher = messagePattern.matcher(formattedMessage); if (matcher.matches()) { logger.info("{}", matcher.group(1)); + if (formattedMessage.contains(ReferenceDocs.HTTP_TRACER.toString())) { + loggingFinishedLatch.countDown(); + } } } @@ -161,13 +165,10 @@ public void assertMatched() {} try (var ignored = withRequestTracker(); var ignored2 = mockLogAppender.capturing("org.elasticsearch.http.HttpBodyTracer")) { assertEquals( expectedBody, - ChunkedLoggingStreamTestUtils.getDecodedLoggedBody( - logger, - Level.INFO, - "response body", - ReferenceDocs.HTTP_TRACER, - () -> getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)) - ).utf8ToString() + ChunkedLoggingStreamTestUtils.getDecodedLoggedBody(logger, Level.INFO, "response body", ReferenceDocs.HTTP_TRACER, () -> { + getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)); + safeAwait(loggingFinishedLatch); + }).utf8ToString() ); mockLogAppender.assertAllExpectationsMatched(); } From 708fa524b95b1686b08fff1ab75c0d92af52808f Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 7 Feb 2024 08:30:23 +0000 Subject: [PATCH 07/25] Tighter visibility --- .../http/netty4/Netty4ChunkedHttpContinuation.java | 4 ++-- .../elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java | 2 +- .../org/elasticsearch/http/netty4/Netty4HttpResponse.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java index 68ae3da98052e..156f1c27aa67c 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpContinuation.java @@ -12,12 +12,12 @@ import org.elasticsearch.rest.ChunkedRestResponseBody; -public final class Netty4ChunkedHttpContinuation implements Netty4HttpResponse { +final class Netty4ChunkedHttpContinuation implements Netty4HttpResponse { private final int sequence; private final ChunkedRestResponseBody body; private final PromiseCombiner combiner; - public Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBody body, PromiseCombiner combiner) { + Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBody body, PromiseCombiner combiner) { this.sequence = sequence; this.body = body; this.combiner = combiner; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java index f5f32bf333779..783c02da0bbcc 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java @@ -19,7 +19,7 @@ /** * A http response that will be transferred via chunked encoding when handled by {@link Netty4HttpPipeliningHandler}. */ -public final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4HttpResponse, HttpResponse { +final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4HttpResponse, HttpResponse { private final int sequence; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java index 366955f9b194c..80cf3469c00ca 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java @@ -11,7 +11,7 @@ /** * Super-interface for responses handled by the Netty4 HTTP transport. */ -public sealed interface Netty4HttpResponse permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse, Netty4ChunkedHttpContinuation { +sealed interface Netty4HttpResponse permits Netty4FullHttpResponse, Netty4ChunkedHttpResponse, Netty4ChunkedHttpContinuation { /** * @return The sequence number for the request which corresponds with this response, for making sure that we send responses to pipelined * requests in the correct order. From 4954c8e2fdd9bbfa9c47fb0631d1bd75f6d9ef09 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 7 Feb 2024 08:31:29 +0000 Subject: [PATCH 08/25] Assert continuations received in order --- .../elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 535324fd7e058..c701bff046482 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -142,6 +142,8 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann try { final Netty4HttpResponse restResponse = (Netty4HttpResponse) msg; if (restResponse.getSequence() != writeSequence) { + assert restResponse instanceof Netty4ChunkedHttpContinuation == false + : "received out-of-order continuation at [" + restResponse.getSequence() + "], expecting [" + writeSequence + "]"; assert restResponse.getSequence() > writeSequence : "response sequence [" + restResponse.getSequence() + "] we below write sequence [" + writeSequence + "]"; if (outboundHoldingQueue.size() >= maxEventsHeld) { From aa52c18d9fe859e543e2151ee6bf71fbd0c043ec Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 7 Feb 2024 09:47:14 +0000 Subject: [PATCH 09/25] Comments about cancellation --- .../netty4/Netty4ChunkedContinuationsIT.java | 2 ++ .../rest/ChunkedRestResponseBody.java | 18 +++++++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index 5794a6bcd142a..86944f91f7e1e 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -239,6 +239,8 @@ public void testContinuationFailure() throws Exception { } } + // TODO add a test showing that we call RestResponse#close even while waiting for a continuation if the HTTP channel closes + private static Releasable withRequestTracker() { final var latch = new CountDownLatch(1); final var refCounted = AbstractRefCounted.of(latch::countDown); diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java index d7a408b8cd275..a797c5fb257bc 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java @@ -9,6 +9,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.stream.BytesStream; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; @@ -21,6 +22,8 @@ import org.elasticsearch.core.Streams; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -45,13 +48,22 @@ public interface ChunkedRestResponseBody { boolean isDone(); /** - * @return true if this is the last chunked body in the response. + * @return {@code true} if this is the last chunked body in the response, or {@code false} if the REST layer should request further + * chunked bodies by calling {@link #getContinuation}. */ boolean isEndOfResponse(); /** - * Asynchronously retrieves the next part of the body. Note that this is called on a transport thread, so implementations must take care - * to dispatch any nontrivial work elsewhere. + *

Asynchronously retrieves the next part of the body. Called if {@link #isEndOfResponse} returns {@code false}.

+ * + *

Note that this is called on a transport thread, so implementations must take care to dispatch any nontrivial work elsewhere.

+ + *

Note that the {@link Task} corresponding to any invocation of {@link Client#execute} completes as soon as the client action + * returns its response, so it no longer exists when this method is called and cannot be used to receive cancellation notifications. + * Instead, if the HTTP channel is closed while sending a response (including while waiting for a continuation) then the REST layer + * will invoke {@link RestResponse#close}. Implementations will typically explicitly create a {@link CancellableTask} to represent the + * computation and transmission of the entire {@link RestResponse}, and will cancel this task if the {@link RestResponse} is closed + * prematurely.

* * @param listener Listener to complete with the next part of the body. By the point this is called we have already started to send * the body of the response, so there's no good ways to handle an exception here. Completing the listener exceptionally From 2022d7a5f424b79a1e2674920cac2d4634014b4e Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 7 Feb 2024 18:48:31 +0000 Subject: [PATCH 10/25] Add test for client cancellation --- .../netty4/Netty4ChunkedContinuationsIT.java | 247 ++++++++++++++---- .../netty4/Netty4HttpPipeliningHandler.java | 5 + 2 files changed, 199 insertions(+), 53 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index 86944f91f7e1e..912b52e76897f 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -20,14 +20,18 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.CountDownActionListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.ReferenceDocs; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.inject.Inject; @@ -50,7 +54,6 @@ import org.elasticsearch.http.HttpRouteStats; import org.elasticsearch.http.HttpRouteStatsTracker; import org.elasticsearch.http.HttpServerTransport; -import org.elasticsearch.node.PluginComponentBinding; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; @@ -79,6 +82,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; @@ -89,13 +93,14 @@ import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestResponse.TEXT_CONTENT_TYPE; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; public class Netty4ChunkedContinuationsIT extends ESNetty4IntegTestCase { @Override protected Collection> nodePlugins() { return CollectionUtils.concatLists( - List.of(YieldsContinuationsPlugin.class, CountDown3Plugin.class, ExposesRequestRefs.class), + List.of(YieldsContinuationsPlugin.class, InfiniteContinuationsPlugin.class, CountDown3Plugin.class), super.nodePlugins() ); } @@ -239,7 +244,28 @@ public void testContinuationFailure() throws Exception { } } - // TODO add a test showing that we call RestResponse#close even while waiting for a continuation if the HTTP channel closes + public void testClientCancellation() { + try (var ignored = withRequestTracker()) { + final var cancellable = getRestClient().performRequestAsync( + new Request("GET", InfiniteContinuationsPlugin.ROUTE), + new ResponseListener() { + @Override + public void onSuccess(Response response) { + fail("should not succeed"); + } + + @Override + public void onFailure(Exception exception) { + assertThat(exception, instanceOf(CancellationException.class)); + } + } + ); + if (randomBoolean()) { + safeSleep(10); // make it more likely the request started executing + } + cancellable.cancel(); + } // closing the request tracker ensures that everything is released, including all response chunks and the overall response + } private static Releasable withRequestTracker() { final var latch = new CountDownLatch(1); @@ -263,49 +289,16 @@ interface HasRequestRefs { void setRequestRefs(RefCounted requestRefs); } - interface TestRefCounted extends RefCounted {} - - public static class ExposesRequestRefs extends Plugin implements HasRequestRefs, TestRefCounted { - RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; - - @Override - public void setRequestRefs(RefCounted requestRefs) { - this.requestRefs = requestRefs; - } - - @Override - public void incRef() { - requestRefs.incRef(); - } - - @Override - public boolean tryIncRef() { - return requestRefs.tryIncRef(); - } - - @Override - public boolean decRef() { - return requestRefs.decRef(); - } - - @Override - public boolean hasReferences() { - return requestRefs.hasReferences(); - } - - @Override - public Collection createComponents(PluginServices services) { - return List.of(new PluginComponentBinding(TestRefCounted.class, this)); - } - } - + /** + * Adds a REST route which yields a sequence of continuations which are computed asynchronously, effectively pausing after each one.. + */ public static class YieldsContinuationsPlugin extends Plugin implements ActionPlugin, HasRequestRefs { static final String ROUTE = "/_test/yields_continuations"; static final String FAIL_INDEX_PARAM = "fail_index"; private static final ActionType TYPE = new ActionType<>("test:yields_continuations"); - RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; + private RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; @Override public Collection> getActions() { @@ -318,9 +311,11 @@ public void setRequestRefs(RefCounted requestRefs) { } public static class Request extends ActionRequest { + final RefCounted requestRefs; final int failIndex; - public Request(int failIndex) { + public Request(RefCounted requestRefs, int failIndex) { + this.requestRefs = requestRefs; this.failIndex = failIndex; } @@ -333,9 +328,9 @@ public ActionRequestValidationException validate() { public static class Response extends ActionResponse { private final int failIndex; private final Executor executor; - private final TestRefCounted requestRefs; + private final RefCounted requestRefs; - public Response(int failIndex, Executor executor, TestRefCounted requestRefs) { + public Response(int failIndex, Executor executor, RefCounted requestRefs) { this.failIndex = failIndex; this.executor = executor; this.requestRefs = requestRefs; @@ -405,22 +400,16 @@ public String getResponseContentTypeString() { public static class TransportYieldsContinuationsAction extends TransportAction { private final ExecutorService executor; - private final TestRefCounted requestRefs; @Inject - public TransportYieldsContinuationsAction( - ActionFilters actionFilters, - TransportService transportService, - TestRefCounted requestRefs - ) { + public TransportYieldsContinuationsAction(ActionFilters actionFilters, TransportService transportService) { super(TYPE.name(), actionFilters, transportService.getTaskManager()); executor = transportService.getThreadPool().executor(ThreadPool.Names.GENERIC); - this.requestRefs = requestRefs; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { - executor.execute(ActionRunnable.supply(listener, () -> new Response(request.failIndex, executor, requestRefs))); + executor.execute(ActionRunnable.supply(listener, () -> new Response(request.failIndex, executor, request.requestRefs))); } } @@ -462,7 +451,159 @@ public void close() { @Override public void accept(RestChannel channel) { localRequestRefs.mustIncRef(); - client.execute(TYPE, new Request(failIndex), new RestActionListener<>(channel) { + client.execute(TYPE, new Request(localRequestRefs, failIndex), new RestActionListener<>(channel) { + @Override + protected void processResponse(Response response) { + channel.sendResponse( + RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), localRequestRefs::decRef) + ); + } + }); + } + }; + } + }); + } + } + + /** + * Adds a REST route which yields an infinite sequence of continuations which can only be stopped by the client closing the connection. + */ + public static class InfiniteContinuationsPlugin extends Plugin implements ActionPlugin, HasRequestRefs { + static final String ROUTE = "/_test/infinite_continuations"; + + private static final ActionType TYPE = new ActionType<>("test:infinite_continuations"); + + private RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; + + @Override + public Collection> getActions() { + return List.of(new ActionHandler<>(TYPE, TransportInfiniteContinuationsAction.class)); + } + + @Override + public void setRequestRefs(RefCounted requestRefs) { + this.requestRefs = requestRefs; + } + + public static class Request extends ActionRequest { + final RefCounted requestRefs; + + public Request(RefCounted requestRefs) { + this.requestRefs = requestRefs; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + public static class Response extends ActionResponse { + private final Executor executor; + private final RefCounted requestRefs; + + public Response(Executor executor, RefCounted requestRefs) { + this.executor = executor; + this.requestRefs = requestRefs; + } + + @Override + public void writeTo(StreamOutput out) { + TransportAction.localOnly(); + } + + public ChunkedRestResponseBody getChunkedBody() { + return new ChunkedRestResponseBody() { + private final Iterator lines = Iterators.single("infinite response\n"); + + @Override + public boolean isDone() { + return lines.hasNext() == false; + } + + @Override + public boolean isEndOfResponse() { + return false; + } + + @Override + public void getContinuation(ActionListener listener) { + executor.execute(ActionRunnable.supply(listener, () -> getChunkedBody())); + } + + @Override + public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { + assertTrue(lines.hasNext()); + requestRefs.mustIncRef(); + return new ReleasableBytesReference(new BytesArray(lines.next()), requestRefs::decRef); + } + + @Override + public String getResponseContentTypeString() { + return TEXT_CONTENT_TYPE; + } + }; + } + } + + public static class TransportInfiniteContinuationsAction extends TransportAction { + private final ExecutorService executor; + + @Inject + public TransportInfiniteContinuationsAction(ActionFilters actionFilters, TransportService transportService) { + super(TYPE.name(), actionFilters, transportService.getTaskManager()); + this.executor = transportService.getThreadPool().executor(ThreadPool.Names.GENERIC); + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + executor.execute( + ActionRunnable.supply( + ActionTestUtils.assertNoFailureListener(listener::onResponse), + () -> new Response(executor, request.requestRefs) + ) + ); + } + } + + @Override + public Collection getRestHandlers( + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster, + Predicate clusterSupportsFeature + ) { + return List.of(new BaseRestHandler() { + @Override + public String getName() { + return ROUTE; + } + + @Override + public List routes() { + return List.of(new Route(GET, ROUTE)); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + final var localRequestRefs = requestRefs; + localRequestRefs.mustIncRef(); + return new RestChannelConsumer() { + @Override + public void close() { + localRequestRefs.decRef(); + } + + @Override + public void accept(RestChannel channel) { + localRequestRefs.mustIncRef(); + client.execute(TYPE, new Request(localRequestRefs), new RestActionListener<>(channel) { @Override protected void processResponse(Response response) { channel.sendResponse( @@ -484,7 +625,7 @@ public static class CountDown3Plugin extends Plugin implements ActionPlugin, Has static final String ROUTE = "/_test/countdown_3"; - RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; + private RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; @Override public void setRequestRefs(RefCounted requestRefs) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index c701bff046482..ab82cc0ac961f 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -385,6 +385,11 @@ private void failQueuedWrites() { while ((queuedWrite = queuedWrites.poll()) != null) { queuedWrite.failAsClosedChannel(); } + if (currentChunkedWrite != null) { + final var finishingWrite = currentChunkedWrite; + currentChunkedWrite = null; + finishingWrite.combiner.finish(finishingWrite.onDone()); + } } @Override From befb0165813665df519e67ed189282bf3495c366 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 8 Feb 2024 12:23:38 +0000 Subject: [PATCH 11/25] Simplify leak detection in tests --- .../netty4/Netty4ChunkedContinuationsIT.java | 164 +++++++----------- 1 file changed, 60 insertions(+), 104 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index 912b52e76897f..659a18f498151 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -56,7 +56,6 @@ import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.ChunkedRestResponseBody; import org.elasticsearch.rest.RestChannel; @@ -68,6 +67,7 @@ import org.elasticsearch.rest.action.RestActionListener; import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; @@ -123,7 +123,7 @@ protected boolean addMockHttpTransport() { """; public void testBasic() throws IOException { - try (var ignored = withRequestTracker()) { + try (var ignored = withResourceTracker()) { final var response = getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)); assertEquals(200, response.getStatusLine().getStatusCode()); assertThat(response.getEntity().getContentType().toString(), containsString(TEXT_CONTENT_TYPE)); @@ -167,7 +167,7 @@ public void match(LogEvent event) { public void assertMatched() {} }); - try (var ignored = withRequestTracker(); var ignored2 = mockLogAppender.capturing("org.elasticsearch.http.HttpBodyTracer")) { + try (var ignored = withResourceTracker(); var ignored2 = mockLogAppender.capturing("org.elasticsearch.http.HttpBodyTracer")) { assertEquals( expectedBody, ChunkedLoggingStreamTestUtils.getDecodedLoggedBody(logger, Level.INFO, "response body", ReferenceDocs.HTTP_TRACER, () -> { @@ -180,7 +180,7 @@ public void assertMatched() {} } public void testResponseBodySizeStats() throws IOException { - try (var ignored = withRequestTracker()) { + try (var ignored = withResourceTracker()) { final var totalResponseSizeBefore = getTotalResponseSize(); getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)); final var totalResponseSizeAfter = getTotalResponseSize(); @@ -206,7 +206,7 @@ private long getTotalResponseSize() { } public void testPipelining() throws Exception { - try (var ignored = withRequestTracker(); var nettyClient = new Netty4HttpClient()) { + try (var ignored = withResourceTracker(); var nettyClient = new Netty4HttpClient()) { final var responses = nettyClient.get( randomFrom(internalCluster().getInstance(HttpServerTransport.class).boundAddress().boundAddresses()).address(), CountDown3Plugin.ROUTE, @@ -228,7 +228,7 @@ public void testPipelining() throws Exception { public void testContinuationFailure() throws Exception { // TODO when https://github.com/netty/netty/issues/13816 addressed, verify that we see the failure properly and no later responses - try (var ignored = withRequestTracker(); var nettyClient = new Netty4HttpClient()) { + try (var ignored = withResourceTracker(); var nettyClient = new Netty4HttpClient()) { final var responses = nettyClient.get( randomFrom(internalCluster().getInstance(HttpServerTransport.class).boundAddress().boundAddresses()).address(), YieldsContinuationsPlugin.ROUTE, @@ -245,7 +245,7 @@ public void testContinuationFailure() throws Exception { } public void testClientCancellation() { - try (var ignored = withRequestTracker()) { + try (var ignored = withResourceTracker()) { final var cancellable = getRestClient().performRequestAsync( new Request("GET", InfiniteContinuationsPlugin.ROUTE), new ResponseListener() { @@ -267,55 +267,40 @@ public void onFailure(Exception exception) { } // closing the request tracker ensures that everything is released, including all response chunks and the overall response } - private static Releasable withRequestTracker() { + private static Releasable withResourceTracker() { + assertNull(refs); final var latch = new CountDownLatch(1); - final var refCounted = AbstractRefCounted.of(latch::countDown); - setPluginRequestRefs(refCounted); + refs = AbstractRefCounted.of(latch::countDown); return () -> { - setPluginRequestRefs(RefCounted.ALWAYS_REFERENCED); - refCounted.decRef(); - safeAwait(latch); + refs.decRef(); + try { + safeAwait(latch); + } finally { + refs = null; + } }; } - private static void setPluginRequestRefs(RefCounted refCounted) { - Iterators.flatMap( - internalCluster().getInstances(PluginsService.class).iterator(), - pluginsService -> pluginsService.filterPlugins(HasRequestRefs.class).iterator() - ).forEachRemaining(p -> p.setRequestRefs(refCounted)); - } - - interface HasRequestRefs { - void setRequestRefs(RefCounted requestRefs); - } + private static volatile RefCounted refs = null; /** * Adds a REST route which yields a sequence of continuations which are computed asynchronously, effectively pausing after each one.. */ - public static class YieldsContinuationsPlugin extends Plugin implements ActionPlugin, HasRequestRefs { + public static class YieldsContinuationsPlugin extends Plugin implements ActionPlugin { static final String ROUTE = "/_test/yields_continuations"; static final String FAIL_INDEX_PARAM = "fail_index"; private static final ActionType TYPE = new ActionType<>("test:yields_continuations"); - private RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; - @Override public Collection> getActions() { return List.of(new ActionHandler<>(TYPE, TransportYieldsContinuationsAction.class)); } - @Override - public void setRequestRefs(RefCounted requestRefs) { - this.requestRefs = requestRefs; - } - public static class Request extends ActionRequest { - final RefCounted requestRefs; final int failIndex; - public Request(RefCounted requestRefs, int failIndex) { - this.requestRefs = requestRefs; + public Request(int failIndex) { this.failIndex = failIndex; } @@ -328,12 +313,10 @@ public ActionRequestValidationException validate() { public static class Response extends ActionResponse { private final int failIndex; private final Executor executor; - private final RefCounted requestRefs; - public Response(int failIndex, Executor executor, RefCounted requestRefs) { + public Response(int failIndex, Executor executor) { this.failIndex = failIndex; this.executor = executor; - this.requestRefs = requestRefs; } @Override @@ -371,19 +354,19 @@ public void getContinuation(ActionListener listener) { @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) throws IOException { assertTrue(lines.hasNext()); - requestRefs.mustIncRef(); + refs.mustIncRef(); final var output = new RecyclerBytesStreamOutput(recycler); boolean success = false; try { try (var writer = new OutputStreamWriter(Streams.flushOnCloseStream(output), StandardCharsets.UTF_8)) { writer.write(lines.next()); } - final var result = new ReleasableBytesReference(output.bytes(), Releasables.wrap(output, requestRefs::decRef)); + final var result = new ReleasableBytesReference(output.bytes(), Releasables.wrap(output, refs::decRef)); success = true; return result; } finally { if (success == false) { - requestRefs.decRef(); + refs.decRef(); output.close(); } } @@ -409,7 +392,7 @@ public TransportYieldsContinuationsAction(ActionFilters actionFilters, Transport @Override protected void doExecute(Task task, Request request, ActionListener listener) { - executor.execute(ActionRunnable.supply(listener, () -> new Response(request.failIndex, executor, request.requestRefs))); + executor.execute(ActionRunnable.supply(listener, () -> new Response(request.failIndex, executor))); } } @@ -439,24 +422,21 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { final var failIndex = request.paramAsInt(FAIL_INDEX_PARAM, Integer.MAX_VALUE); - final var localRequestRefs = requestRefs; - localRequestRefs.mustIncRef(); + refs.mustIncRef(); return new RestChannelConsumer() { @Override public void close() { - localRequestRefs.decRef(); + refs.decRef(); } @Override public void accept(RestChannel channel) { - localRequestRefs.mustIncRef(); - client.execute(TYPE, new Request(localRequestRefs, failIndex), new RestActionListener<>(channel) { + refs.mustIncRef(); + client.execute(TYPE, new Request(failIndex), new RestActionListener<>(channel) { @Override protected void processResponse(Response response) { - channel.sendResponse( - RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), localRequestRefs::decRef) - ); + channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), refs::decRef)); } }); } @@ -469,30 +449,17 @@ protected void processResponse(Response response) { /** * Adds a REST route which yields an infinite sequence of continuations which can only be stopped by the client closing the connection. */ - public static class InfiniteContinuationsPlugin extends Plugin implements ActionPlugin, HasRequestRefs { + public static class InfiniteContinuationsPlugin extends Plugin implements ActionPlugin { static final String ROUTE = "/_test/infinite_continuations"; private static final ActionType TYPE = new ActionType<>("test:infinite_continuations"); - private RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; - @Override public Collection> getActions() { return List.of(new ActionHandler<>(TYPE, TransportInfiniteContinuationsAction.class)); } - @Override - public void setRequestRefs(RefCounted requestRefs) { - this.requestRefs = requestRefs; - } - public static class Request extends ActionRequest { - final RefCounted requestRefs; - - public Request(RefCounted requestRefs) { - this.requestRefs = requestRefs; - } - @Override public ActionRequestValidationException validate() { return null; @@ -501,11 +468,9 @@ public ActionRequestValidationException validate() { public static class Response extends ActionResponse { private final Executor executor; - private final RefCounted requestRefs; - public Response(Executor executor, RefCounted requestRefs) { + public Response(Executor executor) { this.executor = executor; - this.requestRefs = requestRefs; } @Override @@ -535,8 +500,8 @@ public void getContinuation(ActionListener listener) { @Override public ReleasableBytesReference encodeChunk(int sizeHint, Recycler recycler) { assertTrue(lines.hasNext()); - requestRefs.mustIncRef(); - return new ReleasableBytesReference(new BytesArray(lines.next()), requestRefs::decRef); + refs.mustIncRef(); + return new ReleasableBytesReference(new BytesArray(lines.next()), refs::decRef); } @Override @@ -559,10 +524,7 @@ public TransportInfiniteContinuationsAction(ActionFilters actionFilters, Transpo @Override protected void doExecute(Task task, Request request, ActionListener listener) { executor.execute( - ActionRunnable.supply( - ActionTestUtils.assertNoFailureListener(listener::onResponse), - () -> new Response(executor, request.requestRefs) - ) + ActionRunnable.supply(ActionTestUtils.assertNoFailureListener(listener::onResponse), () -> new Response(executor)) ); } } @@ -592,27 +554,28 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - final var localRequestRefs = requestRefs; - localRequestRefs.mustIncRef(); - return new RestChannelConsumer() { - @Override - public void close() { - localRequestRefs.decRef(); - } + final var localRefs = refs; // single volatile read + if (localRefs != null && localRefs.tryIncRef()) { + return new RestChannelConsumer() { + @Override + public void close() { + refs.decRef(); + } - @Override - public void accept(RestChannel channel) { - localRequestRefs.mustIncRef(); - client.execute(TYPE, new Request(localRequestRefs), new RestActionListener<>(channel) { - @Override - protected void processResponse(Response response) { - channel.sendResponse( - RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), localRequestRefs::decRef) - ); - } - }); - } - }; + @Override + public void accept(RestChannel channel) { + refs.mustIncRef(); + client.execute(TYPE, new Request(), new RestActionListener<>(channel) { + @Override + protected void processResponse(Response response) { + channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), refs::decRef)); + } + }); + } + }; + } else { + throw new TaskCancelledException("request cancelled"); + } } }); } @@ -621,17 +584,10 @@ protected void processResponse(Response response) { /** * Adds an HTTP route that waits for 3 concurrent executions before returning any of them */ - public static class CountDown3Plugin extends Plugin implements ActionPlugin, HasRequestRefs { + public static class CountDown3Plugin extends Plugin implements ActionPlugin { static final String ROUTE = "/_test/countdown_3"; - private RefCounted requestRefs = RefCounted.ALWAYS_REFERENCED; - - @Override - public void setRequestRefs(RefCounted requestRefs) { - this.requestRefs = requestRefs; - } - @Override public Collection getRestHandlers( Settings settings, @@ -668,18 +624,18 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { - requestRefs.mustIncRef(); + refs.mustIncRef(); return new RestChannelConsumer() { @Override public void close() { - requestRefs.decRef(); + refs.decRef(); } @Override public void accept(RestChannel channel) { - requestRefs.mustIncRef(); - addListener(ActionListener.releaseAfter(new RestToXContentListener<>(channel), requestRefs::decRef)); + refs.mustIncRef(); + addListener(ActionListener.releaseAfter(new RestToXContentListener<>(channel), refs::decRef)); } }; } From bbcbf5ac65b039ec244079c5d1a9e93cf0f670d0 Mon Sep 17 00:00:00 2001 From: David Turner Date: Fri, 9 Feb 2024 13:23:47 +0000 Subject: [PATCH 12/25] Adjust tests to show cancellation behaviour & fix comment to match --- .../http/netty4/Netty4ChunkedContinuationsIT.java | 14 ++++++++++++-- .../rest/ChunkedRestResponseBody.java | 5 +++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index 659a18f498151..b846cee9b7631 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -468,6 +468,7 @@ public ActionRequestValidationException validate() { public static class Response extends ActionResponse { private final Executor executor; + volatile boolean computingContinuation; public Response(Executor executor) { this.executor = executor; @@ -494,7 +495,11 @@ public boolean isEndOfResponse() { @Override public void getContinuation(ActionListener listener) { - executor.execute(ActionRunnable.supply(listener, () -> getChunkedBody())); + computingContinuation = true; + executor.execute(ActionRunnable.supply(listener, () -> { + computingContinuation = false; + return getChunkedBody(); + })); } @Override @@ -568,7 +573,12 @@ public void accept(RestChannel channel) { client.execute(TYPE, new Request(), new RestActionListener<>(channel) { @Override protected void processResponse(Response response) { - channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), refs::decRef)); + channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), () -> { + // cancellation notification only happens while processing a continuation, not while computing + // the next one; prompt cancellation requires use of something like RestCancellableNodeClient + assertFalse(response.computingContinuation); + refs.decRef(); + })); } }); } diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java index a797c5fb257bc..215af875b25e0 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java @@ -60,8 +60,9 @@ public interface ChunkedRestResponseBody { *

Note that the {@link Task} corresponding to any invocation of {@link Client#execute} completes as soon as the client action * returns its response, so it no longer exists when this method is called and cannot be used to receive cancellation notifications. - * Instead, if the HTTP channel is closed while sending a response (including while waiting for a continuation) then the REST layer - * will invoke {@link RestResponse#close}. Implementations will typically explicitly create a {@link CancellableTask} to represent the + * Instead, if the HTTP channel is closed while sending a response then the REST layer will invoke {@link RestResponse#close}. If the + * HTTP channel is closed while the REST layer is waiting for a continuation then the {@link RestResponse} will not be closed until the + * continuation listener is completed. Implementations will typically explicitly create a {@link CancellableTask} to represent the * computation and transmission of the entire {@link RestResponse}, and will cancel this task if the {@link RestResponse} is closed * prematurely.

* From 0b91c7f85835f0fd90b01faef283f511ecdfca45 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Feb 2024 08:20:00 +0000 Subject: [PATCH 13/25] Complete listener even on impossible path --- .../java/org/elasticsearch/rest/ChunkedRestResponseBody.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java index 215af875b25e0..75f37aeca4231 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java @@ -139,6 +139,7 @@ public boolean isEndOfResponse() { @Override public void getContinuation(ActionListener listener) { assert false : "no continuations"; + listener.onFailure(new IllegalStateException("no continuations available")); } @Override @@ -227,6 +228,7 @@ public boolean isEndOfResponse() { @Override public void getContinuation(ActionListener listener) { assert false : "no continuations"; + listener.onFailure(new IllegalStateException("no continuations available")); } @Override From d5e4031ea2f9c1f84a15d9119131152814eda69f Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Feb 2024 08:20:20 +0000 Subject: [PATCH 14/25] Assertions to catch invalid usage --- .../elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java | 1 + server/src/main/java/org/elasticsearch/rest/RestResponse.java | 1 + .../src/main/java/org/elasticsearch/rest/RestResponseUtils.java | 1 + 3 files changed, 3 insertions(+) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index e15def622937b..79fd756d10d1f 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -247,6 +247,7 @@ private void doWriteChunkedContinuation(ChannelHandlerContext ctx, Netty4Chunked final PromiseCombiner combiner = continuation.combiner(); assert currentChunkedWrite == null; final var responseBody = continuation.body(); + assert responseBody.isDone() == false : "response with continuations must have at least one (possibly-empty) chunk in each part"; currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody); // NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel. while (ctx.channel().isWritable()) { diff --git a/server/src/main/java/org/elasticsearch/rest/RestResponse.java b/server/src/main/java/org/elasticsearch/rest/RestResponse.java index a4a44a5a65561..9862ab31bd53f 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestResponse.java +++ b/server/src/main/java/org/elasticsearch/rest/RestResponse.java @@ -86,6 +86,7 @@ private RestResponse(RestStatus status, String responseMediaType, BytesReference public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content, @Nullable Releasable releasable) { if (content.isDone()) { + assert content.isEndOfResponse() : "response with continuations must have at least one (possibly-empty) chunk in each part"; return new RestResponse(restStatus, content.getResponseContentTypeString(), BytesArray.EMPTY, releasable); } else { return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content, releasable); diff --git a/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java b/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java index dfbd7266cc4a2..1b1331fe25bbf 100644 --- a/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/rest/RestResponseUtils.java @@ -42,6 +42,7 @@ public static BytesReference getBodyContent(RestResponse restResponse) { chunk.writeTo(out); } } + assert chunkedRestResponseBody.isEndOfResponse() : "RestResponseUtils#getBodyContent does not support continuations (yet)"; out.flush(); return out.bytes(); From 751ce757272a740b072cb70945a6e006b60165ae Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 12 Feb 2024 08:28:04 +0000 Subject: [PATCH 15/25] isDone Javadoc --- .../java/org/elasticsearch/rest/ChunkedRestResponseBody.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java index 75f37aeca4231..6921e1a1bfe0c 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java @@ -43,7 +43,8 @@ public interface ChunkedRestResponseBody { Logger logger = LogManager.getLogger(ChunkedRestResponseBody.class); /** - * @return true once this response has been written fully. + * @return {@code true} if this body contains no more chunks and the REST layer should check for a possible continuation by calling + * {@link #isEndOfResponse}, or {@code false} if the REST layer should request another chunk from this body using {@link #encodeChunk}. */ boolean isDone(); From 38d55afdab0ba68a5b6ef47df34a59f1da8dc6aa Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 14 Feb 2024 08:58:26 +0000 Subject: [PATCH 16/25] scaledRandomIntBetween --- .../elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index b846cee9b7631..137a47b2128a2 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -261,7 +261,7 @@ public void onFailure(Exception exception) { } ); if (randomBoolean()) { - safeSleep(10); // make it more likely the request started executing + safeSleep(scaledRandomIntBetween(10, 500)); // make it more likely the request started executing } cancellable.cancel(); } // closing the request tracker ensures that everything is released, including all response chunks and the overall response From 048194db905085def673003174addf939c8efd43 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 14 Feb 2024 08:58:30 +0000 Subject: [PATCH 17/25] localRefs --- .../http/netty4/Netty4ChunkedContinuationsIT.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index 137a47b2128a2..467ebf9a7f712 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -564,12 +564,12 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli return new RestChannelConsumer() { @Override public void close() { - refs.decRef(); + localRefs.decRef(); } @Override public void accept(RestChannel channel) { - refs.mustIncRef(); + localRefs.mustIncRef(); client.execute(TYPE, new Request(), new RestActionListener<>(channel) { @Override protected void processResponse(Response response) { @@ -577,7 +577,8 @@ protected void processResponse(Response response) { // cancellation notification only happens while processing a continuation, not while computing // the next one; prompt cancellation requires use of something like RestCancellableNodeClient assertFalse(response.computingContinuation); - refs.decRef(); + assertSame(localRefs, refs); + localRefs.decRef(); })); } }); From 6fac79de44bf4370837c76442b48e2b404beb5f6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 14 Feb 2024 09:42:50 +0000 Subject: [PATCH 18/25] Fix netty/netty#8007 leak --- .../http/netty4/Netty4HttpPipeliningHandler.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 79fd756d10d1f..5b29f18f2d9fa 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -276,6 +276,12 @@ public void onResponse(ChunkedRestResponseBody continuation) { new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()), finishingWrite.onDone() // pass the terminal listener/promise along the line ); + if (channel.eventLoop().isShuttingDown()) { + channel.eventLoop().terminationFuture().addListener(ignored -> finishingWrite.onDone().trySuccess()); + // The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know if the + // preceding writeAndFlush made it onto its queue before shutdown or whether it will just vanish without a trace, so + // to avoid a leak we must double-check that the final listener is completed once the event loop is terminated. + } } @Override From e4a34ac620772139185da8bdbaab2d611db5f015 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 21 Feb 2024 12:17:29 +0000 Subject: [PATCH 19/25] Fix up tests --- .../netty4/Netty4ChunkedContinuationsIT.java | 52 +++++++++++++++---- .../http/netty4/Netty4HttpClient.java | 2 +- 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index 467ebf9a7f712..d6dfa55621f86 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -92,7 +92,9 @@ import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestResponse.TEXT_CONTENT_TYPE; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; public class Netty4ChunkedContinuationsIT extends ESNetty4IntegTestCase { @@ -227,20 +229,39 @@ public void testPipelining() throws Exception { } public void testContinuationFailure() throws Exception { - // TODO when https://github.com/netty/netty/issues/13816 addressed, verify that we see the failure properly and no later responses try (var ignored = withResourceTracker(); var nettyClient = new Netty4HttpClient()) { + final var failIndex = between(0, 2); final var responses = nettyClient.get( randomFrom(internalCluster().getInstance(HttpServerTransport.class).boundAddress().boundAddresses()).address(), YieldsContinuationsPlugin.ROUTE, - YieldsContinuationsPlugin.ROUTE + "?" + YieldsContinuationsPlugin.FAIL_INDEX_PARAM + "=1" + YieldsContinuationsPlugin.ROUTE + "?" + YieldsContinuationsPlugin.FAIL_INDEX_PARAM + "=" + failIndex ); - assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(0).content()).utf8ToString()); - assertEquals(""" - batch-0-chunk-0 - batch-0-chunk-1 - batch-0-chunk-2 - """, Netty4Utils.toBytesReference(responses.get(1).content()).utf8ToString()); + if (failIndex == 0) { + assertThat( + responses, + anyOf( + // might get a 500 response if the failure is early enough + hasSize(2), + // might get no response before channel closed + hasSize(1), + // might even close the channel before flushing the previous response + hasSize(0) + ) + ); + + if (responses.size() == 2) { + assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(0).content()).utf8ToString()); + assertEquals(500, responses.get(1).status().code()); + } + } else { + assertThat(responses, hasSize(1)); + } + + if (responses.size() > 0) { + assertEquals(expectedBody, Netty4Utils.toBytesReference(responses.get(0).content()).utf8ToString()); + assertEquals(200, responses.get(0).status().code()); + } } } @@ -329,8 +350,8 @@ public ChunkedRestResponseBody getChunkedBody() { } private ChunkedRestResponseBody getChunkBatch(int batchIndex) { - if (batchIndex == failIndex) { - throw new ElasticsearchException("simulated failure"); + if (batchIndex == failIndex && randomBoolean()) { + throw new ElasticsearchException("simulated failure creating next batch"); } return new ChunkedRestResponseBody() { @@ -362,6 +383,9 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler rec writer.write(lines.next()); } final var result = new ReleasableBytesReference(output.bytes(), Releasables.wrap(output, refs::decRef)); + if (batchIndex == failIndex) { + throw new ElasticsearchException("simulated failure encoding chunk"); + } success = true; return result; } finally { @@ -436,7 +460,13 @@ public void accept(RestChannel channel) { client.execute(TYPE, new Request(failIndex), new RestActionListener<>(channel) { @Override protected void processResponse(Response response) { - channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), refs::decRef)); + try { + final var responseBody = response.getChunkedBody(); // might fail, so do this before acquiring ref + refs.mustIncRef(); + channel.sendResponse(RestResponse.chunked(RestStatus.OK, responseBody, refs::decRef)); + } finally { + refs.decRef(); + } } }); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java index 71748457d2956..5ab54e48b34a4 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpClient.java @@ -168,7 +168,7 @@ private synchronized List sendRequests(final SocketAddress rem @Override public void close() { - clientBootstrap.config().group().shutdownGracefully().awaitUninterruptibly(); + clientBootstrap.config().group().shutdownGracefully(0L, 0L, TimeUnit.SECONDS).awaitUninterruptibly(); } /** From b3b7399605ea2725d28aa4ed8e03041a142bba18 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 21 Feb 2024 12:36:51 +0000 Subject: [PATCH 20/25] Better shutdown protection --- .../netty4/Netty4HttpPipeliningHandler.java | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index ed8ec65757e44..2335f7dc12f4d 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -261,10 +261,11 @@ private void finishChunkedWrite() { // failure during chunked response serialization, we're closing the channel return; } - assert currentChunkedWrite.responseBody().isDone(); final var finishingWrite = currentChunkedWrite; - final var endOfResponse = finishingWrite.responseBody().isEndOfResponse(); currentChunkedWrite = null; + final var finishingWriteBody = finishingWrite.responseBody(); + assert finishingWriteBody.isDone(); + final var endOfResponse = finishingWriteBody.isEndOfResponse(); if (endOfResponse) { writeSequence++; finishingWrite.combiner().finish(finishingWrite.onDone()); @@ -277,12 +278,7 @@ public void onResponse(ChunkedRestResponseBody continuation) { new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()), finishingWrite.onDone() // pass the terminal listener/promise along the line ); - if (channel.eventLoop().isShuttingDown()) { - channel.eventLoop().terminationFuture().addListener(ignored -> finishingWrite.onDone().trySuccess()); - // The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know if the - // preceding writeAndFlush made it onto its queue before shutdown or whether it will just vanish without a trace, so - // to avoid a leak we must double-check that the final listener is completed once the event loop is terminated. - } + checkShutdown(); } @Override @@ -295,8 +291,24 @@ public void onFailure(Exception e) { finishingWrite.combiner().add(channel.newFailedFuture(e)); finishingWrite.combiner().finish(finishingWrite.onDone()); }); + checkShutdown(); } - }), finishingWrite.responseBody()::getContinuation); + + private void checkShutdown() { + if (channel.eventLoop().isShuttingDown()) { + // The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know if the + // preceding activity made it onto its queue before shutdown or whether it will just vanish without a trace, so + // to avoid a leak we must double-check that the final listener is completed once the event loop is terminated. + // Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an ImmediateEventExecutor + // which means this completion is not subject to the same issue, it still works even if the event loop has already + // terminated. + channel.eventLoop() + .terminationFuture() + .addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException())); + } + } + + }), finishingWriteBody::getContinuation); } } From 23c1a1fb415dab903870c1f1e1d4691472ddffb2 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 20 May 2024 15:28:50 +0100 Subject: [PATCH 21/25] Fixup --- .../netty4/Netty4ChunkedContinuationsIT.java | 81 ++++++++++++------- .../elasticsearch/http/HttpBodyTracer.java | 2 +- 2 files changed, 54 insertions(+), 29 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index d6dfa55621f86..25195a1176fb8 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -46,11 +46,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.http.HttpBodyTracer; import org.elasticsearch.http.HttpRouteStats; import org.elasticsearch.http.HttpRouteStatsTracker; import org.elasticsearch.http.HttpServerTransport; @@ -68,7 +71,7 @@ import org.elasticsearch.rest.action.RestToXContentListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; -import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.MockLog; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -79,6 +82,7 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -86,6 +90,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -147,37 +152,57 @@ public void testTraceLogging() { // slightly awkward test, we can't use ChunkedLoggingStreamTestUtils.getDecodedLoggedBody directly because it asserts that we _only_ // log one thing and we can't easily separate the request body from the response body logging, so instead we capture the body log // message and then log it again with a different logger. + final var resources = new ArrayList(); + try (var ignored = Releasables.wrap(resources)) { + resources.add(withResourceTracker()); + final var executor = EsExecutors.newFixed( + "test", + 1, + -1, + EsExecutors.daemonThreadFactory(Settings.EMPTY, "test"), + new ThreadContext(Settings.EMPTY), + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ); + resources.add(() -> assertTrue(ThreadPool.terminate(executor, 10, TimeUnit.SECONDS))); + var loggingFinishedLatch = new CountDownLatch(1); + MockLog.assertThatLogger( + () -> assertEquals( + expectedBody, + ChunkedLoggingStreamTestUtils.getDecodedLoggedBody( + logger, + Level.INFO, + "response body", + ReferenceDocs.HTTP_TRACER, + () -> { + final var request = new Request("GET", YieldsContinuationsPlugin.ROUTE); + request.addParameter("error_trace", "true"); + getRestClient().performRequest(request); + safeAwait(loggingFinishedLatch); + } + ).utf8ToString() + ), + HttpBodyTracer.class, + new MockLog.LoggingExpectation() { + final Pattern messagePattern = Pattern.compile("^\\[[1-9][0-9]*] (response body.*)"); - var loggingFinishedLatch = new CountDownLatch(1); - var mockLogAppender = new MockLogAppender(); - mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() { - final Pattern messagePattern = Pattern.compile("^\\[[1-9][0-9]*] (response body.*)"); - - @Override - public void match(LogEvent event) { - final var formattedMessage = event.getMessage().getFormattedMessage(); - final var matcher = messagePattern.matcher(formattedMessage); - if (matcher.matches()) { - logger.info("{}", matcher.group(1)); - if (formattedMessage.contains(ReferenceDocs.HTTP_TRACER.toString())) { - loggingFinishedLatch.countDown(); + @Override + public void match(LogEvent event) { + final var formattedMessage = event.getMessage().getFormattedMessage(); + final var matcher = messagePattern.matcher(formattedMessage); + if (matcher.matches()) { + executor.execute(() -> { + logger.info("{}", matcher.group(1)); + if (formattedMessage.contains(ReferenceDocs.HTTP_TRACER.toString())) { + loggingFinishedLatch.countDown(); + } + }); + } } - } - } - @Override - public void assertMatched() {} - }); - - try (var ignored = withResourceTracker(); var ignored2 = mockLogAppender.capturing("org.elasticsearch.http.HttpBodyTracer")) { - assertEquals( - expectedBody, - ChunkedLoggingStreamTestUtils.getDecodedLoggedBody(logger, Level.INFO, "response body", ReferenceDocs.HTTP_TRACER, () -> { - getRestClient().performRequest(new Request("GET", YieldsContinuationsPlugin.ROUTE)); - safeAwait(loggingFinishedLatch); - }).utf8ToString() + @Override + public void assertMatched() {} + } ); - mockLogAppender.assertAllExpectationsMatched(); } } diff --git a/server/src/main/java/org/elasticsearch/http/HttpBodyTracer.java b/server/src/main/java/org/elasticsearch/http/HttpBodyTracer.java index 1773a4803f62a..1dd2868f7bfa6 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpBodyTracer.java +++ b/server/src/main/java/org/elasticsearch/http/HttpBodyTracer.java @@ -17,7 +17,7 @@ import java.io.OutputStream; -class HttpBodyTracer { +public class HttpBodyTracer { private static final Logger logger = LogManager.getLogger(HttpBodyTracer.class); public static boolean isEnabled() { From 4570c922b1ad020af82db265c2c63fb2b6d78588 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 25 May 2024 10:03:16 +0100 Subject: [PATCH 22/25] Javadocs --- .../elasticsearch/rest/ChunkedRestResponseBody.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java index 6921e1a1bfe0c..2f7fc458ca020 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBody.java @@ -35,8 +35,15 @@ import java.util.Iterator; /** - * The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and - * instead serialize only as much of the response as can be flushed to the network right away. + *

A body (or a part thereof) of an HTTP response that uses the {@code chunked} transfer-encoding. This allows Elasticsearch to avoid + * materializing the full response into on-heap buffers up front, instead serializing only as much of the response as can be flushed to the + * network right away.

+ * + *

Each {@link ChunkedRestResponseBody} represents a sequence of chunks that are ready for immediate transmission: if {@link #isDone} + * returns {@code false} then {@link #encodeChunk} can be called at any time and must synchronously return the next chunk to be sent. + * Many HTTP responses will be a single such sequence. However, if an implementation's {@link #isEndOfResponse} returns {@code false} at the + * end of the sequence then the transmission is paused and {@link #getContinuation} is called to compute the next sequence of chunks + * asynchronously.

*/ public interface ChunkedRestResponseBody { From 0d923f42bbe7ee3e84b702927785094f25d28f3d Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 25 May 2024 10:03:22 +0100 Subject: [PATCH 23/25] Assert body done in test --- .../org/elasticsearch/http/DefaultRestChannelTests.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index 6c128560fc830..f8562033b9725 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -781,6 +781,7 @@ public String getResponseContentTypeString() { } final var isClosed = new AtomicBoolean(); + final var testBody = new TestBody(responseBody, between(0, 3)); assertEquals( responseBody, ChunkedLoggingStreamTestUtils.getDecodedLoggedBody( @@ -791,8 +792,12 @@ public String getResponseContentTypeString() { () -> channel.sendResponse( RestResponse.chunked( RestStatus.OK, - new TestBody(responseBody, between(0, 3)), - () -> assertTrue(isClosed.compareAndSet(false, true)) + testBody, + () -> { + assertTrue(isClosed.compareAndSet(false, true)); + assertTrue(testBody.isDone()); + assertTrue(testBody.isEndOfResponse()); + } ) ) ) From ee1a4a46b5a7b514b38722d310d90b861f601193 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 25 May 2024 10:18:45 +0100 Subject: [PATCH 24/25] Revert "Assert body done in test" This reverts commit 0d923f42bbe7ee3e84b702927785094f25d28f3d. --- .../org/elasticsearch/http/DefaultRestChannelTests.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index f8562033b9725..6c128560fc830 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -781,7 +781,6 @@ public String getResponseContentTypeString() { } final var isClosed = new AtomicBoolean(); - final var testBody = new TestBody(responseBody, between(0, 3)); assertEquals( responseBody, ChunkedLoggingStreamTestUtils.getDecodedLoggedBody( @@ -792,12 +791,8 @@ public String getResponseContentTypeString() { () -> channel.sendResponse( RestResponse.chunked( RestStatus.OK, - testBody, - () -> { - assertTrue(isClosed.compareAndSet(false, true)); - assertTrue(testBody.isDone()); - assertTrue(testBody.isEndOfResponse()); - } + new TestBody(responseBody, between(0, 3)), + () -> assertTrue(isClosed.compareAndSet(false, true)) ) ) ) From a2c385ea17fc00728cd8002a6224ca245263e0d6 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sat, 25 May 2024 10:30:27 +0100 Subject: [PATCH 25/25] Capture parts & assert complete --- .../http/DefaultRestChannelTests.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index 6c128560fc830..f12d8ea5c631a 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -56,6 +56,7 @@ import java.net.InetSocketAddress; import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -734,6 +735,7 @@ private static void writeContent(OutputStream bso, ChunkedRestResponseBody conte ) ); + final var parts = new ArrayList(); class TestBody implements ChunkedRestResponseBody { boolean isDone; final BytesReference thisChunk; @@ -764,7 +766,9 @@ public boolean isEndOfResponse() { @Override public void getContinuation(ActionListener listener) { - listener.onResponse(new TestBody(remainingChunks, remainingContinuations - 1)); + final var continuation = new TestBody(remainingChunks, remainingContinuations - 1); + parts.add(continuation); + listener.onResponse(continuation); } @Override @@ -781,6 +785,8 @@ public String getResponseContentTypeString() { } final var isClosed = new AtomicBoolean(); + final var firstPart = new TestBody(responseBody, between(0, 3)); + parts.add(firstPart); assertEquals( responseBody, ChunkedLoggingStreamTestUtils.getDecodedLoggedBody( @@ -788,13 +794,13 @@ public String getResponseContentTypeString() { Level.TRACE, "[" + request.getRequestId() + "] response body", ReferenceDocs.HTTP_TRACER, - () -> channel.sendResponse( - RestResponse.chunked( - RestStatus.OK, - new TestBody(responseBody, between(0, 3)), - () -> assertTrue(isClosed.compareAndSet(false, true)) - ) - ) + () -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, firstPart, () -> { + assertTrue(isClosed.compareAndSet(false, true)); + for (int i = 0; i < parts.size(); i++) { + assertTrue("isDone " + i, parts.get(i).isDone()); + assertEquals("isEndOfResponse " + i, i == parts.size() - 1, parts.get(i).isEndOfResponse()); + } + })) ) );