diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java new file mode 100644 index 0000000000000..4de7ca97ed51b --- /dev/null +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -0,0 +1,436 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.LastHttpContent; + +import org.elasticsearch.ESNetty4IntegTestCase; +import 
org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.IndexScopedSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.transport.netty4.Netty4Utils; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON; +import static 
io.netty.handler.codec.http.HttpMethod.POST; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +@ESIntegTestCase.ClusterScope(numDataNodes = 1) +public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { + + // ensure empty http content has single 0 size chunk + public void testEmptyContent() throws Exception { + try (var ctx = setupClientCtx()) { + var totalRequests = randomIntBetween(1, 10); + for (int reqNo = 0; reqNo < totalRequests; reqNo++) { + var opaqueId = opaqueId(reqNo); + + // send request with empty content + ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, Unpooled.EMPTY_BUFFER)); + var handler = ctx.awaitRestChannelAccepted(opaqueId); + handler.stream.next(); + + // should receive a single empty chunk + var recvChunk = safePoll(handler.recvChunks); + assertTrue(recvChunk.isLast); + assertEquals(0, recvChunk.chunk.length()); + recvChunk.chunk.close(); + + // send response to process following request + handler.sendResponse(new RestResponse(RestStatus.OK, "")); + } + assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size())); + } + } + + // ensures content integrity, no loses and re-order + public void testReceiveAllChunks() throws Exception { + try (var ctx = setupClientCtx()) { + var totalRequests = randomIntBetween(1, 10); + for (int reqNo = 0; reqNo < totalRequests; reqNo++) { + var opaqueId = opaqueId(reqNo); + + // this dataset will be compared with one on server side + var dataSize = randomIntBetween(1024, 10 * 1024 * 1024); + var sendData = Unpooled.wrappedBuffer(randomByteArrayOfLength(dataSize)); + sendData.retain(); + ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, sendData)); + + var handler = ctx.awaitRestChannelAccepted(opaqueId); + + var recvData = Unpooled.buffer(dataSize); + while (true) { + handler.stream.next(); + var recvChunk = safePoll(handler.recvChunks); + try (recvChunk.chunk) { + 
recvData.writeBytes(Netty4Utils.toByteBuf(recvChunk.chunk)); + if (recvChunk.isLast) { + break; + } + } + } + + assertEquals("sent and received payloads are not the same", sendData, recvData); + handler.sendResponse(new RestResponse(RestStatus.OK, "")); + } + assertBusy(() -> assertEquals("should receive all server responses", totalRequests, ctx.clientRespQueue.size())); + } + } + + // ensures that all queued chunks are released when connection closed + public void testClientConnectionCloseMidStream() throws Exception { + try (var ctx = setupClientCtx()) { + var opaqueId = opaqueId(0); + + // write half of http request + ctx.clientChannel.write(httpRequest(opaqueId, 2 * 1024)); + ctx.clientChannel.writeAndFlush(randomContent(1024, false)); + + // await stream handler is ready and request full content + var handler = ctx.awaitRestChannelAccepted(opaqueId); + assertBusy(() -> assertEquals(1, handler.stream.chunkQueue().size())); + + // enable auto-read to receive channel close event + handler.stream.channel().config().setAutoRead(true); + + // terminate connection and wait resources are released + ctx.clientChannel.close(); + assertBusy(() -> assertEquals(0, handler.stream.chunkQueue().size())); + } + } + + // ensures that all queued chunks are released when server decides to close connection + public void testServerCloseConnectionMidStream() throws Exception { + try (var ctx = setupClientCtx()) { + var opaqueId = opaqueId(0); + + // write half of http request + ctx.clientChannel.write(httpRequest(opaqueId, 2 * 1024)); + ctx.clientChannel.writeAndFlush(randomContent(1024, false)); + + // await stream handler is ready and request full content + var handler = ctx.awaitRestChannelAccepted(opaqueId); + assertBusy(() -> assertEquals(1, handler.stream.chunkQueue().size())); + + // terminate connection on server and wait resources are released + handler.channel.request().getHttpChannel().close(); + assertBusy(() -> assertEquals(0, handler.stream.chunkQueue().size())); + } + 
} + + // ensure that client's socket buffers data when server is not consuming data + public void testClientBackpressure() throws Exception { + try (var ctx = setupClientCtx()) { + var opaqueId = opaqueId(0); + var payloadSize = MBytes(50); + ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize)); + for (int i = 0; i < 5; i++) { + ctx.clientChannel.writeAndFlush(randomContent(MBytes(10), false)); + } + assertFalse( + "should not flush last content immediately", + ctx.clientChannel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).isDone() + ); + + var handler = ctx.awaitRestChannelAccepted(opaqueId); + + // Read buffers for socket and channel usually within few MBytes range all together. + // This test assumes that buffers will not exceed 10 MBytes, in other words there should + // be less than 10 MBytes in fly between http client's socket and rest handler. This + // loop ensures that reading 10 MBytes of content on server side should free almost + // same size in client's channel write buffer. 
+ for (int mb = 0; mb <= 50; mb += 10) { + var minBufSize = payloadSize - MBytes(10 + mb); + var maxBufSize = payloadSize - MBytes(mb); + // it is hard to tell that client's channel is no logger flushing data + // it might take a few busy-iterations before channel buffer flush to kernel + // and bytesBeforeWritable will stop changing + assertBusy(() -> { + var bufSize = ctx.clientChannel.bytesBeforeWritable(); + assertTrue( + "client's channel buffer should be in range [" + minBufSize + "," + maxBufSize + "], got " + bufSize, + bufSize >= minBufSize && bufSize <= maxBufSize + ); + }); + handler.consumeBytes(MBytes(10)); + } + assertTrue(handler.stream.hasLast()); + } + } + + private String opaqueId(int reqNo) { + return getTestName() + "-" + reqNo; + } + + static int MBytes(int m) { + return m * 1024 * 1024; + } + + static T safePoll(BlockingDeque queue) { + try { + var t = queue.poll(SAFE_AWAIT_TIMEOUT.seconds(), TimeUnit.SECONDS); + assertNotNull("queue is empty", t); + return t; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError(e); + } + } + + static FullHttpRequest fullHttpRequest(String opaqueId, ByteBuf content) { + var req = new DefaultFullHttpRequest(HTTP_1_1, POST, ControlServerRequestPlugin.ROUTE, Unpooled.wrappedBuffer(content)); + req.headers().add(CONTENT_LENGTH, content.readableBytes()); + req.headers().add(CONTENT_TYPE, APPLICATION_JSON); + req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId); + return req; + } + + static HttpRequest httpRequest(String opaqueId, int contentLength) { + return httpRequest(ControlServerRequestPlugin.ROUTE, opaqueId, contentLength); + } + + static HttpRequest httpRequest(String uri, String opaqueId, int contentLength) { + var req = new DefaultHttpRequest(HTTP_1_1, POST, uri); + req.headers().add(CONTENT_LENGTH, contentLength); + req.headers().add(CONTENT_TYPE, APPLICATION_JSON); + req.headers().add(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId); + return req; + } + + 
static HttpContent randomContent(int size, boolean isLast) { + var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size)); + if (isLast) { + return new DefaultLastHttpContent(buf); + } else { + return new DefaultHttpContent(buf); + } + } + + Ctx setupClientCtx() throws Exception { + var nodeName = internalCluster().getRandomNodeName(); + var clientRespQueue = new LinkedBlockingDeque<>(16); + var bootstrap = bootstrapClient(nodeName, clientRespQueue); + var channel = bootstrap.connect().sync().channel(); + return new Ctx(getTestName(), nodeName, bootstrap, channel, clientRespQueue); + } + + Bootstrap bootstrapClient(String node, BlockingQueue queue) { + var httpServer = internalCluster().getInstance(HttpServerTransport.class, node); + var remoteAddr = randomFrom(httpServer.boundAddress().boundAddresses()); + return new Bootstrap().group(new NioEventLoopGroup(1)) + .channel(NioSocketChannel.class) + .remoteAddress(remoteAddr.getAddress(), remoteAddr.getPort()) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + var p = ch.pipeline(); + p.addLast(new HttpClientCodec()); + p.addLast(new HttpObjectAggregator(4096)); + p.addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) { + msg.retain(); + queue.add(msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + queue.add(cause); + } + }); + } + }); + } + + @Override + protected Collection> nodePlugins() { + return CollectionUtils.concatLists(List.of(ControlServerRequestPlugin.class), super.nodePlugins()); + } + + @Override + protected boolean addMockHttpTransport() { + return false; // enable http + } + + record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel clientChannel, BlockingDeque clientRespQueue) + implements + AutoCloseable { + + @Override + public void close() throws Exception { + safeGet(clientChannel.close()); 
+ safeGet(clientBootstrap.config().group().shutdownGracefully()); + clientRespQueue.forEach(o -> { if (o instanceof FullHttpResponse resp) resp.release(); }); + for (var opaqueId : ControlServerRequestPlugin.handlers.keySet()) { + if (opaqueId.startsWith(testName)) { + var handler = ControlServerRequestPlugin.handlers.get(opaqueId); + handler.recvChunks.forEach(c -> c.chunk.close()); + handler.channel.request().getHttpChannel().close(); + ControlServerRequestPlugin.handlers.remove(opaqueId); + } + } + } + + ServerRequestHandler awaitRestChannelAccepted(String opaqueId) throws Exception { + assertBusy(() -> assertTrue(ControlServerRequestPlugin.handlers.containsKey(opaqueId))); + var handler = ControlServerRequestPlugin.handlers.get(opaqueId); + safeAwait(handler.channelAccepted); + return handler; + } + } + + static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer { + final SubscribableListener channelAccepted = new SubscribableListener<>(); + final String opaqueId; + final BlockingDeque recvChunks = new LinkedBlockingDeque<>(); + final Netty4HttpRequestBodyStream stream; + RestChannel channel; + + boolean recvLast = false; + + ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) { + this.opaqueId = opaqueId; + this.stream = stream; + } + + @Override + public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boolean isLast) { + recvChunks.add(new Chunk(chunk, isLast)); + } + + @Override + public void accept(RestChannel channel) throws Exception { + this.channel = channel; + channelAccepted.onResponse(null); + } + + void sendResponse(RestResponse response) { + channel.sendResponse(response); + } + + void consumeBytes(int bytes) { + if (recvLast) { + return; + } + while (bytes > 0) { + stream.next(); + var recvChunk = safePoll(recvChunks); + bytes -= recvChunk.chunk.length(); + recvChunk.chunk.close(); + if (recvChunk.isLast) { + recvLast = true; + break; + } + } + } + + Future 
onChannelThread(Callable task) { + return this.stream.channel().eventLoop().submit(task); + } + + record Chunk(ReleasableBytesReference chunk, boolean isLast) {} + } + + // takes full control of rest handler from the outside + public static class ControlServerRequestPlugin extends Plugin implements ActionPlugin { + + static final String ROUTE = "/_test/request-stream"; + + static final ConcurrentHashMap handlers = new ConcurrentHashMap<>(); + + @Override + public Collection getRestHandlers( + Settings settings, + NamedWriteableRegistry namedWriteableRegistry, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster, + Predicate clusterSupportsFeature + ) { + return List.of(new BaseRestHandler() { + @Override + public String getName() { + return ROUTE; + } + + @Override + public List routes() { + return List.of(new Route(RestRequest.Method.POST, ROUTE)); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { + var stream = (Netty4HttpRequestBodyStream) request.contentStream(); + var opaqueId = request.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER).get(0); + var handler = new ServerRequestHandler(opaqueId, stream); + handlers.put(opaqueId, handler); + return handler; + } + }); + } + } + +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java new file mode 100644 index 0000000000000..16f1c2bbd2e37 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequest; + +import org.elasticsearch.http.HttpPreRequest; +import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils; + +import java.util.function.Predicate; + +public class Netty4HttpAggregator extends HttpObjectAggregator { + private static final Predicate IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false; + + private final Predicate decider; + private boolean shouldAggregate; + + public Netty4HttpAggregator(int maxContentLength) { + this(maxContentLength, IGNORE_TEST); + } + + public Netty4HttpAggregator(int maxContentLength, Predicate decider) { + super(maxContentLength); + this.decider = decider; + } + + @Override + public boolean acceptInboundMessage(Object msg) throws Exception { + if (msg instanceof HttpRequest request) { + var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request); + shouldAggregate = decider.test(preReq); + } + if (shouldAggregate) { + return super.acceptInboundMessage(msg); + } else { + return false; + } + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index b915011514d9a..18e76a51efa17 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -19,8 +19,11 @@ import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.DefaultLastHttpContent; 
import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.ssl.SslCloseCompletionEvent; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; @@ -70,6 +73,12 @@ private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Chu @Nullable private ChunkedWrite currentChunkedWrite; + /** + * HTTP request content stream for current request, it's null if there is no current request or request is fully-aggregated + */ + @Nullable + private Netty4HttpRequestBodyStream currentRequestStream; + /* * The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the * channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the @@ -109,23 +118,38 @@ public Netty4HttpPipeliningHandler( public void channelRead(final ChannelHandlerContext ctx, final Object msg) { activityTracker.startActivity(); try { - assert msg instanceof FullHttpRequest : "Should have fully aggregated message already but saw [" + msg + "]"; - final FullHttpRequest fullHttpRequest = (FullHttpRequest) msg; - final Netty4HttpRequest netty4HttpRequest; - if (fullHttpRequest.decoderResult().isFailure()) { - final Throwable cause = fullHttpRequest.decoderResult().cause(); - final Exception nonError; - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - nonError = new Exception(cause); + if (msg instanceof HttpRequest request) { + final Netty4HttpRequest netty4HttpRequest; + if (request.decoderResult().isFailure()) { + final Throwable cause = request.decoderResult().cause(); + final Exception nonError; + if (cause instanceof Error) { + 
ExceptionsHelper.maybeDieOnAnotherThread(cause); + nonError = new Exception(cause); + } else { + nonError = (Exception) cause; + } + netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError); } else { - nonError = (Exception) cause; + assert currentRequestStream == null : "current stream must be null for new request"; + if (request instanceof FullHttpRequest fullHttpRequest) { + netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); + currentRequestStream = null; + } else { + var contentStream = new Netty4HttpRequestBodyStream(ctx.channel()); + currentRequestStream = contentStream; + netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream); + } } - netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest, nonError); + handlePipelinedRequest(ctx, netty4HttpRequest); } else { - netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); + assert msg instanceof HttpContent : "expect HttpContent got " + msg; + assert currentRequestStream != null : "current stream must exists before handling http content"; + currentRequestStream.handleNettyContent((HttpContent) msg); + if (msg instanceof LastHttpContent) { + currentRequestStream = null; + } } - handlePipelinedRequest(ctx, netty4HttpRequest); } finally { activityTracker.stopActivity(); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 1e35f084c87ec..2d1caba3c477e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -11,6 +11,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.DefaultFullHttpRequest; +import 
io.netty.handler.codec.http.EmptyHttpHeaders; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; @@ -20,6 +21,7 @@ import io.netty.handler.codec.http.cookie.ServerCookieEncoder; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.http.HttpBody; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.http.HttpResponse; import org.elasticsearch.rest.ChunkedRestResponseBodyPart; @@ -39,22 +41,40 @@ public class Netty4HttpRequest implements HttpRequest { private final FullHttpRequest request; - private final BytesReference content; + private final HttpBody content; private final Map> headers; private final AtomicBoolean released; private final Exception inboundException; private final boolean pooled; private final int sequence; + Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) { + this( + sequence, + new DefaultFullHttpRequest( + request.protocolVersion(), + request.method(), + request.uri(), + Unpooled.EMPTY_BUFFER, + request.headers(), + EmptyHttpHeaders.INSTANCE + ), + new AtomicBoolean(false), + false, + contentStream, + null + ); + } + Netty4HttpRequest(int sequence, FullHttpRequest request) { - this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content())); + this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content())); } Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) { - this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content()), inboundException); + this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException); } - private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, 
BytesReference content) { + private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) { this(sequence, request, released, pooled, content, null); } @@ -63,7 +83,7 @@ private Netty4HttpRequest( FullHttpRequest request, AtomicBoolean released, boolean pooled, - BytesReference content, + HttpBody content, Exception inboundException ) { this.sequence = sequence; @@ -86,7 +106,7 @@ public String uri() { } @Override - public BytesReference content() { + public HttpBody body() { assert released.get() == false; return content; } @@ -118,7 +138,7 @@ public HttpRequest releaseAndCopy() { ), new AtomicBoolean(false), false, - Netty4Utils.toBytesReference(copiedContent) + content ); } finally { release(); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java new file mode 100644 index 0000000000000..8497e3ee8a40d --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -0,0 +1,114 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.channel.Channel; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.LastHttpContent; + +import org.elasticsearch.http.HttpBody; +import org.elasticsearch.transport.netty4.Netty4Utils; + +import java.util.ArrayDeque; +import java.util.Queue; + +/** + * Netty based implementation of {@link HttpBody.Stream}. 
+ * This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} + * to prevent entire payload buffering. But sometimes upstream can send few chunks of data despite + * autoRead=off. In this case chunks will be queued until downstream calls {@link Stream#next()} + */ +public class Netty4HttpRequestBodyStream implements HttpBody.Stream { + + private final Channel channel; + private final Queue chunkQueue = new ArrayDeque<>(); + private boolean requested = false; + private boolean hasLast = false; + private HttpBody.ChunkHandler handler; + + public Netty4HttpRequestBodyStream(Channel channel) { + this.channel = channel; + channel.closeFuture().addListener((f) -> releaseQueuedChunks()); + channel.config().setAutoRead(false); + } + + @Override + public ChunkHandler handler() { + return handler; + } + + @Override + public void setHandler(ChunkHandler chunkHandler) { + this.handler = chunkHandler; + } + + private void sendQueuedOrRead() { + assert channel.eventLoop().inEventLoop(); + requested = true; + var chunk = chunkQueue.poll(); + if (chunk == null) { + channel.read(); + } else { + sendChunk(chunk); + } + } + + @Override + public void next() { + assert handler != null : "handler must be set before requesting next chunk"; + if (channel.eventLoop().inEventLoop()) { + sendQueuedOrRead(); + } else { + channel.eventLoop().submit(this::sendQueuedOrRead); + } + } + + public void handleNettyContent(HttpContent httpContent) { + assert handler != null : "handler must be set before processing http content"; + if (requested && chunkQueue.isEmpty()) { + sendChunk(httpContent); + } else { + chunkQueue.add(httpContent); + } + if (httpContent instanceof LastHttpContent) { + hasLast = true; + channel.config().setAutoRead(true); + } + } + + // visible for test + Channel channel() { + return channel; + } + + // visible for test + Queue chunkQueue() { + return chunkQueue; + } + + // visible for test + boolean hasLast() { + return hasLast; + } + + private void 
sendChunk(HttpContent httpContent) { + assert requested; + requested = false; + var bytesRef = Netty4Utils.toReleasableBytesReference(httpContent.content()); + var isLast = httpContent instanceof LastHttpContent; + handler.onNext(bytesRef, isLast); + } + + private void releaseQueuedChunks() { + while (chunkQueue.isEmpty() == false) { + chunkQueue.poll().release(); + } + } + +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index a309877e9aa83..024391af46b62 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -365,7 +365,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { ); } // combines the HTTP message pieces into a single full HTTP request (with headers and body) - final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.maxContentLength()); + final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength()); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() .addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index 1025ad11ba05e..7404226f069c5 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -23,10 +23,12 @@ import org.elasticsearch.action.ActionListener; import 
org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Booleans; +import org.elasticsearch.http.HttpBody; import org.elasticsearch.transport.TransportException; import java.io.IOException; @@ -123,6 +125,14 @@ public static BytesReference toBytesReference(final ByteBuf buffer) { } } + public static ReleasableBytesReference toReleasableBytesReference(final ByteBuf buffer) { + return new ReleasableBytesReference(toBytesReference(buffer), buffer::release); + } + + public static HttpBody.Full fullHttpBodyFrom(final ByteBuf buf) { + return new HttpBody.ByteRefHttpBody(toBytesReference(buf)); + } + public static Recycler createRecycler(Settings settings) { // If this method is called by super ctor the processors will not be set. Accessing NettyAllocator initializes netty's internals // setting the processors. We must do it ourselves first just in case. diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java new file mode 100644 index 0000000000000..00066ffaf0201 --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.http.netty4; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.flow.FlowControlHandler; + +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.http.HttpBody; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class Netty4HttpRequestBodyStreamTests extends ESTestCase { + + EmbeddedChannel channel; + Netty4HttpRequestBodyStream stream; + static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close(); + + @Before + public void createStream() { + channel = new EmbeddedChannel(); + stream = new Netty4HttpRequestBodyStream(channel); + stream.setHandler(discardHandler); // set default handler, each test might override one + channel.pipeline().addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { + msg.retain(); + stream.handleNettyContent(msg); + } + }); + } + + // ensures that no chunks are sent downstream without request + public void testEnqueueChunksBeforeRequest() { + var totalChunks = randomIntBetween(1, 100); + for (int i = 0; i < totalChunks; i++) { + channel.writeInbound(randomContent(1024)); + } + assertEquals(totalChunks, stream.chunkQueue().size()); + } + + // ensures all queued chunks can be flushed downstream + public void testFlushQueued() { + var chunks = new ArrayList(); + var totalBytes = new AtomicInteger(); + stream.setHandler((chunk, isLast) -> { + chunks.add(chunk); + totalBytes.addAndGet(chunk.length()); + 
}); + // enqueue chunks + var chunkSize = 1024; + var totalChunks = randomIntBetween(1, 100); + for (int i = 0; i < totalChunks; i++) { + channel.writeInbound(randomContent(chunkSize)); + } + // consume all chunks + for (var i = 0; i < totalChunks; i++) { + stream.next(); + } + assertEquals(totalChunks, chunks.size()); + assertEquals(chunkSize * totalChunks, totalBytes.get()); + } + + // ensures that we read from channel when chunks queue is empty + // and pass next chunk downstream without queuing + public void testReadFromChannel() { + var gotChunks = new ArrayList(); + var gotLast = new AtomicBoolean(false); + stream.setHandler((chunk, isLast) -> { + gotChunks.add(chunk); + gotLast.set(isLast); + }); + channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read() + var chunkSize = 1024; + var totalChunks = randomIntBetween(1, 32); + for (int i = 0; i < totalChunks - 1; i++) { + channel.writeInbound(randomContent(chunkSize)); + } + channel.writeInbound(randomLastContent(chunkSize)); + + for (int i = 0; i < totalChunks; i++) { + assertEquals("should not enqueue chunks", 0, stream.chunkQueue().size()); + stream.next(); + assertEquals("each next() should produce single chunk", i + 1, gotChunks.size()); + } + assertTrue("should receive last content", gotLast.get()); + } + + HttpContent randomContent(int size, boolean isLast) { + var buf = Unpooled.wrappedBuffer(randomByteArrayOfLength(size)); + if (isLast) { + return new DefaultLastHttpContent(buf); + } else { + return new DefaultHttpContent(buf); + } + } + + HttpContent randomContent(int size) { + return randomContent(size, false); + } + + HttpContent randomLastContent(int size) { + return randomContent(size, true); + } + +} diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 
bc6e5fef834e8..9e213e6468356 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -904,7 +904,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th assertThat(channel.request().getHttpRequest().header(headerReference.get()), is(headerValueReference.get())); assertThat(channel.request().getHttpRequest().method(), is(translateRequestMethod(httpMethodReference.get()))); // assert content is dropped - assertThat(channel.request().getHttpRequest().content().utf8ToString(), is("")); + assertThat(channel.request().getHttpRequest().body().asFull().bytes().utf8ToString(), is("")); try { channel.sendResponse(new RestResponse(channel, (Exception) ((ElasticsearchWrapperException) cause).getCause())); } catch (IOException e) { diff --git a/server/src/main/java/org/elasticsearch/http/HttpBody.java b/server/src/main/java/org/elasticsearch/http/HttpBody.java new file mode 100644 index 0000000000000..b4d88b837b117 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/http/HttpBody.java @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.http; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.core.Nullable; + +/** + * A super-interface for different HTTP content implementations + */ +public sealed interface HttpBody permits HttpBody.Full, HttpBody.Stream { + + static Full fromBytesReference(BytesReference bytesRef) { + return new ByteRefHttpBody(bytesRef); + } + + static Full empty() { + return new ByteRefHttpBody(BytesArray.EMPTY); + } + + default boolean isFull() { + return this instanceof Full; + } + + default boolean isStream() { + return this instanceof Stream; + } + + /** + * Assumes that HTTP body is a full content. If not sure, use {@link HttpBody#isFull()}. + */ + default Full asFull() { + assert this instanceof Full; + return (Full) this; + } + + /** + * Assumes that HTTP body is a lazy-stream. If not sure, use {@link HttpBody#isStream()}. + */ + default Stream asStream() { + assert this instanceof Stream; + return (Stream) this; + } + + /** + * Full content represents a complete http body content that can be accessed immediately. + */ + non-sealed interface Full extends HttpBody { + BytesReference bytes(); + } + + /** + * Stream is a lazy-loaded content. Stream supports only single handler, this handler must be + * set before requesting next chunk. + */ + non-sealed interface Stream extends HttpBody { + /** + * Returns current handler + */ + @Nullable + ChunkHandler handler(); + + /** + * Sets handler that can handle next chunk + */ + void setHandler(ChunkHandler chunkHandler); + + /** + * Request next chunk of data from the network. The size of the chunk depends on following + * factors. If request is not compressed then chunk size will be up to + * {@link HttpTransportSettings#SETTING_HTTP_MAX_CHUNK_SIZE}. If request is compressed then + * chunk size will be up to max_chunk_size * compression_ratio. 
Multiple calls can be + * deduplicated when the next chunk is not yet available. It's recommended to call "next" once + * for every chunk. + *
+         * <pre>{@code
+         *     stream.setHandler((chunk, isLast) -> {
+         *         processChunk(chunk);
+         *         if (isLast == false) {
+         *             stream.next();
+         *         }
+         *     });
+         * }</pre>
+         * 
+ */ + void next(); + } + + @FunctionalInterface + interface ChunkHandler { + void onNext(ReleasableBytesReference chunk, boolean isLast); + } + + record ByteRefHttpBody(BytesReference bytes) implements Full {} +} diff --git a/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java b/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java index e8c92c8540df2..ef05af8bb9ade 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java +++ b/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java @@ -226,7 +226,9 @@ synchronized void update(HttpRequest httpRequest, HttpChannel httpChannel, long lastRequestTimeMillis = currentTimeMillis; lastUri = httpRequest.uri(); requestCount += 1; - requestSizeBytes += httpRequest.content().length(); + if (httpRequest.body().isFull()) { + requestSizeBytes += httpRequest.body().asFull().bytes().length(); + } } private static String getFirstValueForHeader(final HttpRequest request, final String header) { diff --git a/server/src/main/java/org/elasticsearch/http/HttpRequest.java b/server/src/main/java/org/elasticsearch/http/HttpRequest.java index 2757fa15ce477..cce57d20be23f 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpRequest.java +++ b/server/src/main/java/org/elasticsearch/http/HttpRequest.java @@ -27,7 +27,7 @@ enum HttpVersion { HTTP_1_1 } - BytesReference content(); + HttpBody body(); List strictCookies(); @@ -46,7 +46,7 @@ enum HttpVersion { Exception getInboundException(); /** - * Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()} + * Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #body()} * after this method has been invoked is undefined and implementation specific. 
*/ void release(); diff --git a/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java b/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java index a17bc885f6b65..d73b679917978 100644 --- a/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java +++ b/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java @@ -10,6 +10,7 @@ import org.apache.lucene.search.spell.LevenshteinDistance; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.util.set.Sets; @@ -109,12 +110,18 @@ public final void handleRequest(RestRequest request, RestChannel channel, NodeCl throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter")); } - if (request.hasContent() && request.isContentConsumed() == false) { + if (request.hasContent() && (request.isContentConsumed() == false && request.isFullContent())) { throw new IllegalArgumentException( "request [" + request.method() + " " + request.path() + "] does not support having a body" ); } + if (request.isStreamedContent()) { + assert action instanceof RequestBodyChunkConsumer; + var chunkConsumer = (RequestBodyChunkConsumer) action; + request.contentStream().setHandler((chunk, isLast) -> chunkConsumer.handleChunk(channel, chunk, isLast)); + } + usageCount.increment(); // execute the action action.accept(channel); @@ -172,6 +179,10 @@ protected interface RestChannelConsumer extends CheckedConsumer 0) { + if (request.hasContent()) { if (isContentTypeDisallowed(request) || handler.mediaTypesValid(request) == false) { sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel); return; @@ -453,6 +452,9 @@ private void dispatchRequest( return; } } + // TODO: estimate streamed content size for circuit breaker, + // something like 
http_max_chunk_size * avg_compression_ratio(for compressed content) + final int contentLength = request.isFullContent() ? request.contentLength() : 0; try { if (handler.canTripCircuitBreaker()) { inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, ""); diff --git a/server/src/main/java/org/elasticsearch/rest/RestRequest.java b/server/src/main/java/org/elasticsearch/rest/RestRequest.java index 66ba0c743813e..7d3544e156bdc 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/server/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -22,6 +22,7 @@ import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; +import org.elasticsearch.http.HttpBody; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.telemetry.tracing.Traceable; @@ -278,16 +279,28 @@ public final String path() { } public boolean hasContent() { - return contentLength() > 0; + return isStreamedContent() || contentLength() > 0; } public int contentLength() { - return httpRequest.content().length(); + return httpRequest.body().asFull().bytes().length(); + } + + public boolean isFullContent() { + return httpRequest.body().isFull(); } public BytesReference content() { this.contentConsumed = true; - return httpRequest.content(); + return httpRequest.body().asFull().bytes(); + } + + public boolean isStreamedContent() { + return httpRequest.body().isStream(); + } + + public HttpBody.Stream contentStream() { + return httpRequest.body().asStream(); } /** diff --git a/server/src/test/java/org/elasticsearch/http/HttpClientStatsTrackerTests.java b/server/src/test/java/org/elasticsearch/http/HttpClientStatsTrackerTests.java index 2dfaaf34bb1f1..b0843985bb0b7 100644 --- a/server/src/test/java/org/elasticsearch/http/HttpClientStatsTrackerTests.java +++ 
b/server/src/test/java/org/elasticsearch/http/HttpClientStatsTrackerTests.java @@ -118,7 +118,7 @@ public void testStatsCollection() { assertThat(clientStats.remoteAddress(), equalTo(NetworkAddress.format(httpChannel.getRemoteAddress()))); assertThat(clientStats.lastUri(), equalTo(httpRequest1.uri())); assertThat(clientStats.requestCount(), equalTo(1L)); - requestLength += httpRequest1.content().length(); + requestLength += httpRequest1.body().asFull().bytes().length(); assertThat(clientStats.requestSizeBytes(), equalTo(requestLength)); assertThat(clientStats.closedTimeMillis(), equalTo(-1L)); assertThat(clientStats.openedTimeMillis(), equalTo(openTimeMillis)); @@ -148,7 +148,7 @@ public void testStatsCollection() { assertThat(clientStats.remoteAddress(), equalTo(NetworkAddress.format(httpChannel.getRemoteAddress()))); assertThat(clientStats.lastUri(), equalTo(httpRequest2.uri())); assertThat(clientStats.requestCount(), equalTo(2L)); - requestLength += httpRequest2.content().length(); + requestLength += httpRequest2.body().asFull().bytes().length(); assertThat(clientStats.requestSizeBytes(), equalTo(requestLength)); assertThat(clientStats.closedTimeMillis(), equalTo(-1L)); assertThat(clientStats.openedTimeMillis(), equalTo(openTimeMillis)); diff --git a/server/src/test/java/org/elasticsearch/http/TestHttpRequest.java b/server/src/test/java/org/elasticsearch/http/TestHttpRequest.java index e7b0232afa245..808f4313e7237 100644 --- a/server/src/test/java/org/elasticsearch/http/TestHttpRequest.java +++ b/server/src/test/java/org/elasticsearch/http/TestHttpRequest.java @@ -8,7 +8,6 @@ package org.elasticsearch.http; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestRequest; @@ -48,8 +47,8 @@ public String uri() { } @Override - public BytesReference content() { - return BytesArray.EMPTY; + public HttpBody body() { + return 
HttpBody.empty(); } @Override diff --git a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java index 391b2a08021fd..4e90bbd85d4f8 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.RestApiVersion; +import org.elasticsearch.http.HttpBody; import org.elasticsearch.http.HttpHeadersValidationException; import org.elasticsearch.http.HttpInfo; import org.elasticsearch.http.HttpRequest; @@ -830,11 +831,11 @@ public String uri() { } @Override - public BytesReference content() { + public HttpBody body() { if (hasContent) { - return new BytesArray("test"); + return HttpBody.fromBytesReference(new BytesArray("test")); } - return BytesArray.EMPTY; + return HttpBody.empty(); } @Override diff --git a/server/src/test/java/org/elasticsearch/rest/RestRequestTests.java b/server/src/test/java/org/elasticsearch/rest/RestRequestTests.java index bb06dbe5d09aa..52ab209b00955 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestRequestTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestRequestTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.http.HttpBody; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.test.ESTestCase; @@ -83,7 +84,7 @@ public void testContentLengthDoesNotConsumesContent() { private void runConsumesContentTest(final CheckedConsumer consumer, final boolean expected) { final HttpRequest httpRequest = mock(HttpRequest.class); when(httpRequest.uri()).thenReturn(""); - 
when(httpRequest.content()).thenReturn(new BytesArray(new byte[1])); + when(httpRequest.body()).thenReturn(HttpBody.fromBytesReference(new BytesArray(new byte[1]))); when(httpRequest.getHeaders()).thenReturn( Collections.singletonMap("Content-Type", Collections.singletonList(randomFrom("application/json", "application/x-ndjson"))) ); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index 3a9c4b371c9da..bcea7f61fe158 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.http.HttpBody; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.http.HttpResponse; @@ -53,24 +54,24 @@ public static class FakeHttpRequest implements HttpRequest { private final Method method; private final String uri; - private final BytesReference content; + private final HttpBody content; private final Map> headers; private final Exception inboundException; public FakeHttpRequest(Method method, String uri, BytesReference content, Map> headers) { - this(method, uri, content, headers, null); + this(method, uri, content == null ? HttpBody.empty() : HttpBody.fromBytesReference(content), headers, null); } private FakeHttpRequest( Method method, String uri, - BytesReference content, + HttpBody content, Map> headers, Exception inboundException ) { this.method = method; this.uri = uri; - this.content = content == null ? 
BytesArray.EMPTY : content; + this.content = content; this.headers = headers; this.inboundException = inboundException; } @@ -86,7 +87,7 @@ public String uri() { } @Override - public BytesReference content() { + public HttpBody body() { return content; } @@ -194,7 +195,7 @@ public static class Builder { private Map params = new HashMap<>(); - private BytesReference content = BytesArray.EMPTY; + private HttpBody content = HttpBody.empty(); private String path = "/"; @@ -220,7 +221,7 @@ public Builder withParams(Map params) { } public Builder withContent(BytesReference contentBytes, XContentType xContentType) { - this.content = contentBytes; + this.content = HttpBody.fromBytesReference(contentBytes); if (xContentType != null) { headers.put("Content-Type", Collections.singletonList(xContentType.mediaType())); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrailTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrailTests.java index 17bad90415e7c..f56749330579f 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrailTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrailTests.java @@ -2636,7 +2636,7 @@ public void testAuthenticationSuccessRest() throws Exception { checkedFields.put(LoggingAuditTrail.REQUEST_ID_FIELD_NAME, requestId); checkedFields.put(LoggingAuditTrail.URL_PATH_FIELD_NAME, "_uri"); if (includeRequestBody && Strings.hasLength(request.content())) { - checkedFields.put(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, request.getHttpRequest().content().utf8ToString()); + checkedFields.put(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, request.getHttpRequest().body().asFull().bytes().utf8ToString()); } if (params.isEmpty() == false) { checkedFields.put(LoggingAuditTrail.URL_QUERY_FIELD_NAME, "foo=bar&evac=true");