/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.http.netty4;

import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.http.HttpContent;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
 * Integration test for streaming HTTP request content via {@code Netty4RequestContentPublisher}.
 * Registers a test-only REST handler that subscribes to the request's content publisher,
 * counts the bytes received chunk by chunk, and echoes the total back in the response body.
 */
public class Netty4RequestContentPublisherIT extends ESNetty4IntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return CollectionUtils.concatLists(List.of(RequestContentStreamPlugin.class), super.nodePlugins());
    }

    @Override
    protected boolean addMockHttpTransport() {
        return false; // enable the real http transport so the netty pipeline is exercised
    }

    /**
     * Sends a 1 MiB body to the streaming route and asserts the handler saw exactly that many bytes.
     */
    public void testBasicStream() throws IOException {
        var totalBytes = 1024 * 1024;
        var request = new Request("POST", RequestContentStreamPlugin.ROUTE);
        request.setEntity(new ByteArrayEntity(randomByteArrayOfLength(totalBytes), ContentType.APPLICATION_JSON));

        var response = getRestClient().performRequest(request);
        assertEquals(200, response.getStatusLine().getStatusCode());
        var gotTotalBytes = new BytesArray(response.getEntity().getContent().readAllBytes()).utf8ToString();
        assertEquals("" + totalBytes, gotTotalBytes);
    }

    /**
     * Test plugin exposing a single POST route whose handler consumes the request body
     * through the {@link Flow.Publisher} API instead of an aggregated request.
     */
    public static class RequestContentStreamPlugin extends Plugin implements ActionPlugin {

        static final String ROUTE = "/_test/request-stream/basic";
        private static final Logger LOGGER = LogManager.getLogger("StreamRestHandler");

        @Override
        public Collection<RestHandler> getRestHandlers(
            Settings settings,
            NamedWriteableRegistry namedWriteableRegistry,
            RestController restController,
            ClusterSettings clusterSettings,
            IndexScopedSettings indexScopedSettings,
            SettingsFilter settingsFilter,
            IndexNameExpressionResolver indexNameExpressionResolver,
            Supplier<DiscoveryNodes> nodesInCluster,
            Predicate<NodeFeature> clusterSupportsFeature
        ) {
            return List.of(new BaseRestHandler() {
                @Override
                public String getName() {
                    return ROUTE;
                }

                @Override
                public List<Route> routes() {
                    return List.of(new Route(RestRequest.Method.POST, ROUTE));
                }

                @Override
                protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
                    return new RestChannelConsumer() {
                        @Override
                        public void accept(RestChannel channel) {

                            // netty channel will hold all chunks until we subscribe
                            request.contentPublisher().subscribe(new Flow.Subscriber<>() {
                                Flow.Subscription subscription;
                                int totalReceivedBytes;

                                @Override
                                public void onSubscribe(Flow.Subscription subscription) {
                                    this.subscription = subscription;
                                    // after subscription, we can request N next messages, for example 5 chunks
                                    subscription.request(5);
                                }

                                @Override
                                public void onNext(HttpContent item) {
                                    // chunk handler: count and immediately release each chunk
                                    var contentSize = item.content().length();
                                    LOGGER.info("got next item of a size {}", contentSize);
                                    totalReceivedBytes += contentSize;
                                    item.release();
                                    // we need to explicitly ask for next N chunks
                                    subscription.request(1);
                                }

                                @Override
                                public void onError(Throwable throwable) {
                                    // not implemented yet
                                    assert false : throwable.getMessage();
                                }

                                @Override
                                public void onComplete() {
                                    // completion event after LastHttpContent
                                    LOGGER.info("complete");
                                    channel.sendResponse(new RestResponse(RestStatus.OK, Integer.toString(totalReceivedBytes)));
                                }
                            });

                        }
                    };
                }
            });
        }
    }

}
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;

import java.util.List;

/**
 * A wrapper around {@link HttpContentDecompressor} that consumes and produces {@link PipelinedHttpRequest} and
 * {@link PipelinedHttpContent}, preserving the pipeline sequence number across decompression.
 */
public class Netty4ContentDecompressor extends HttpContentDecompressor {

    @Override
    public boolean acceptInboundMessage(Object msg) throws Exception {
        // only handle messages that already carry a pipeline sequence number
        return msg instanceof PipelinedHttpObject;
    }

    /**
     * Delegates decompression to the superclass and then re-wraps any plain netty HTTP objects
     * it produced so that the sequence number of the inbound message is carried downstream.
     * Note: {@code out} may be empty (the decompressor can buffer), in which case this is a no-op.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
        super.decode(ctx, msg, out);
        final var sequence = ((PipelinedHttpObject) msg).sequence();
        out.replaceAll(obj -> {
            if (obj instanceof PipelinedHttpObject) {
                return obj; // already wrapped, keep as-is
            } else if (obj instanceof FullHttpRequest request) {
                return new PipelinedFullHttpRequest(request, sequence);
            } else if (obj instanceof HttpRequest request) {
                return new PipelinedHttpRequest(request, sequence);
            } else if (obj instanceof LastHttpContent lastContent) {
                // checked before HttpContent: LastHttpContent is a subtype of HttpContent
                return new PipelinedLastHttpContent(lastContent, sequence);
            } else if (obj instanceof HttpContent content) {
                return new PipelinedHttpContent(content, sequence);
            } else {
                throw new IllegalArgumentException("unexpected decompressor output: " + obj);
            }
        });
    }
}
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;

import java.util.List;
import java.util.function.Predicate;

/**
 * A wrapper around {@link HttpObjectAggregator}, provides optional aggregation for {@link PipelinedHttpObject}'s.
 * A {@code decider} predicate selects HTTP requests that will be aggregated into {@link PipelinedFullHttpRequest};
 * requests rejected by the decider (and their content chunks) pass through unaggregated for streaming consumption.
 */
public class Netty4HttpAggregator extends HttpObjectAggregator {

    // NOTE(review): production code referencing a test-only route; consider injecting this
    // predicate from the test instead of hard-coding the "/_test/" prefix here.
    private static final Predicate<HttpRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;

    private final Predicate<HttpRequest> decider;
    // sticky decision for the current request: applies to all following content chunks
    // until the next request head arrives (safe while the channel is read single-threaded)
    private boolean shouldAggregate;

    public Netty4HttpAggregator(int maxContentLength) {
        this(maxContentLength, IGNORE_TEST);
    }

    public Netty4HttpAggregator(int maxContentLength, Predicate<HttpRequest> decider) {
        super(maxContentLength);
        this.decider = decider;
    }

    @Override
    public boolean acceptInboundMessage(Object msg) {
        if (msg instanceof PipelinedHttpRequest request) {
            shouldAggregate = decider.test(request);
            return shouldAggregate;
        } else if (msg instanceof PipelinedHttpContent || msg instanceof PipelinedLastHttpContent) {
            return shouldAggregate;
        } else {
            return false;
        }
    }

    /**
     * Aggregates as usual and re-attaches the inbound sequence number to the aggregated request.
     * {@code out} is empty until the last chunk arrives, so the replaceAll is a no-op mid-request.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
        super.decode(ctx, msg, out);
        out.replaceAll(o -> new PipelinedFullHttpRequest((FullHttpRequest) o, ((PipelinedHttpObject) msg).sequence()));
    }
}
+ */ + +package org.elasticsearch.http.netty4; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.http.HttpContent; +import org.elasticsearch.transport.netty4.Netty4Utils; + +public class Netty4HttpContent implements HttpContent { + + private final io.netty.handler.codec.http.HttpContent nettyContent; + private final BytesReference ref; + + Netty4HttpContent(io.netty.handler.codec.http.HttpContent httpContent) { + nettyContent = httpContent; + ref = Netty4Utils.toBytesReference(httpContent.content()); + } + + @Override + public BytesReference content() { + return ref; + } + + @Override + public void release() { + nettyContent.release(); + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index b915011514d9a..e33819b6a6aa8 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -19,7 +19,9 @@ import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.ssl.SslCloseCompletionEvent; import io.netty.util.ReferenceCountUtil; @@ -70,12 +72,13 @@ private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Chu @Nullable private ChunkedWrite currentChunkedWrite; - /* - * The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the - * channel, and then transferred to responses. 
A response is not written to the channel context until its sequence number matches the - * current write sequence, implying that all preceding messages have been written. + private Netty4HttpRequest currentRequest; + + /** + * Read sequence numbers are attached to requests by {@link Netty4InboundHttpPipeliningHandler} and then transferred to responses. + * A response is not written to the channel context until its sequence number matches the current write sequence, implying that all + * preceding messages have been written. */ - private int readSequence; private int writeSequence; /** @@ -109,23 +112,34 @@ public Netty4HttpPipeliningHandler( public void channelRead(final ChannelHandlerContext ctx, final Object msg) { activityTracker.startActivity(); try { - assert msg instanceof FullHttpRequest : "Should have fully aggregated message already but saw [" + msg + "]"; - final FullHttpRequest fullHttpRequest = (FullHttpRequest) msg; - final Netty4HttpRequest netty4HttpRequest; - if (fullHttpRequest.decoderResult().isFailure()) { - final Throwable cause = fullHttpRequest.decoderResult().cause(); - final Exception nonError; - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - nonError = new Exception(cause); + assert msg instanceof PipelinedHttpObject : "Should have pipelined message already but saw [" + msg + "]"; + final var sequence = ((PipelinedHttpObject) msg).sequence(); + if (msg instanceof HttpRequest request) { + final Netty4HttpRequest netty4HttpRequest; + if (request.decoderResult().isFailure()) { + final Throwable cause = request.decoderResult().cause(); + final Exception nonError; + if (cause instanceof Error) { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + nonError = new Exception(cause); + } else { + nonError = (Exception) cause; + } + netty4HttpRequest = new Netty4HttpRequest(sequence, (FullHttpRequest) request, nonError); } else { - nonError = (Exception) cause; + if (request instanceof FullHttpRequest fullRequest) 
{ + netty4HttpRequest = new Netty4HttpRequest(sequence, fullRequest); + } else { + var contentPublisher = new Netty4RequestContentPublisher(ctx.channel()); + netty4HttpRequest = new Netty4HttpRequest(sequence, request, contentPublisher); + currentRequest = netty4HttpRequest; + } } - netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest, nonError); + handlePipelinedRequest(ctx, netty4HttpRequest); } else { - netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); + assert currentRequest != null; + currentRequest.contentPublisher().sendChunk((HttpContent) msg); } - handlePipelinedRequest(ctx, netty4HttpRequest); } finally { activityTracker.stopActivity(); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 1e35f084c87ec..d9755f53e9a50 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -11,16 +11,22 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.DefaultFullHttpRequest; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; import io.netty.handler.codec.http.cookie.ServerCookieEncoder; +import 
org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.http.HttpRequest; import org.elasticsearch.http.HttpResponse; import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestRequest; @@ -36,9 +42,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -public class Netty4HttpRequest implements HttpRequest { +public class Netty4HttpRequest implements org.elasticsearch.http.HttpRequest { - private final FullHttpRequest request; + private final HttpRequest nettyRequest; + private final HttpContent nettyContent; + private final Netty4RequestContentPublisher contentPublisher; private final BytesReference content; private final Map> headers; private final AtomicBoolean released; @@ -46,29 +54,52 @@ public class Netty4HttpRequest implements HttpRequest { private final boolean pooled; private final int sequence; + Netty4HttpRequest(int sequence, HttpRequest request, Netty4RequestContentPublisher contentPublisher) { + this.sequence = sequence; + this.nettyRequest = request; + this.nettyContent = new DefaultHttpContent(Unpooled.EMPTY_BUFFER); + this.contentPublisher = contentPublisher; + this.content = BytesArray.EMPTY; + this.released = new AtomicBoolean(false); + this.pooled = false; + this.headers = getHttpHeadersAsMap(request.headers()); + this.inboundException = null; + } + Netty4HttpRequest(int sequence, FullHttpRequest request) { this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content())); } - Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) { - this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content()), inboundException); + Netty4HttpRequest(int sequence, FullHttpRequest fullRequest, Exception inboundException) { + this( + sequence, + fullRequest, + fullRequest, + new AtomicBoolean(false), + true, + 
Netty4Utils.toBytesReference(fullRequest.content()), + inboundException + ); } - private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, BytesReference content) { - this(sequence, request, released, pooled, content, null); + private Netty4HttpRequest(int sequence, FullHttpRequest fullRequest, AtomicBoolean released, boolean pooled, BytesReference content) { + this(sequence, fullRequest, fullRequest, released, pooled, content, null); } private Netty4HttpRequest( int sequence, - FullHttpRequest request, + HttpRequest httpRequest, + HttpContent httpContent, AtomicBoolean released, boolean pooled, BytesReference content, Exception inboundException ) { this.sequence = sequence; - this.request = request; - this.headers = getHttpHeadersAsMap(request.headers()); + this.nettyRequest = httpRequest; + this.nettyContent = httpContent; + this.contentPublisher = null; + this.headers = getHttpHeadersAsMap(httpRequest.headers()); this.content = content; this.pooled = pooled; this.released = released; @@ -77,12 +108,12 @@ private Netty4HttpRequest( @Override public RestRequest.Method method() { - return translateRequestMethod(request.method()); + return translateRequestMethod(nettyRequest.method()); } @Override public String uri() { - return request.uri(); + return nettyRequest.uri(); } @Override @@ -94,32 +125,45 @@ public BytesReference content() { @Override public void release() { if (pooled && released.compareAndSet(false, true)) { - request.release(); + nettyContent.release(); } } @Override - public HttpRequest releaseAndCopy() { + public org.elasticsearch.http.HttpRequest releaseAndCopy() { assert released.get() == false; if (pooled == false) { return this; } try { - final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content()); - return new Netty4HttpRequest( - sequence, - new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - copiedContent, - request.headers(), - 
request.trailingHeaders() - ), - new AtomicBoolean(false), - false, - Netty4Utils.toBytesReference(copiedContent) - ); + final ByteBuf copiedContent = Unpooled.copiedBuffer(nettyContent.content()); + if (nettyRequest instanceof FullHttpRequest fullRequest) { + return new Netty4HttpRequest( + sequence, + new DefaultFullHttpRequest( + fullRequest.protocolVersion(), + fullRequest.method(), + fullRequest.uri(), + copiedContent, + fullRequest.headers().copy(), + fullRequest.trailingHeaders().copy() + ), + new AtomicBoolean(false), + false, + Netty4Utils.toBytesReference(copiedContent) + ); + } else { + return new Netty4HttpRequest( + sequence, + new DefaultHttpRequest( + nettyRequest.protocolVersion(), + nettyRequest.method(), + nettyRequest.uri(), + nettyRequest.headers().copy() + ), + contentPublisher + ); + } } finally { release(); } @@ -132,7 +176,7 @@ public final Map> getHeaders() { @Override public List strictCookies() { - String cookieString = request.headers().get(HttpHeaderNames.COOKIE); + String cookieString = nettyRequest.headers().get(HttpHeaderNames.COOKIE); if (cookieString != null) { Set cookies = ServerCookieDecoder.STRICT.decode(cookieString); if (cookies.isEmpty() == false) { @@ -144,40 +188,45 @@ public List strictCookies() { @Override public HttpVersion protocolVersion() { - if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) { - return HttpRequest.HttpVersion.HTTP_1_0; - } else if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) { - return HttpRequest.HttpVersion.HTTP_1_1; + if (nettyRequest.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) { + return HttpVersion.HTTP_1_0; + } else if (nettyRequest.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) { + return HttpVersion.HTTP_1_1; } else { - throw new IllegalArgumentException("Unexpected http protocol version: " + request.protocolVersion()); + throw new 
IllegalArgumentException("Unexpected http protocol version: " + nettyRequest.protocolVersion()); } } @Override - public HttpRequest removeHeader(String header) { - HttpHeaders copiedHeadersWithout = request.headers().copy(); + public org.elasticsearch.http.HttpRequest removeHeader(String header) { + HttpHeaders copiedHeadersWithout = nettyRequest.headers().copy(); copiedHeadersWithout.remove(header); - HttpHeaders copiedTrailingHeadersWithout = request.trailingHeaders().copy(); - copiedTrailingHeadersWithout.remove(header); - FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - request.content(), - copiedHeadersWithout, - copiedTrailingHeadersWithout + var requestWithoutHeader = new DefaultHttpRequest( + nettyRequest.protocolVersion(), + nettyRequest.method(), + nettyRequest.uri(), + copiedHeadersWithout ); - return new Netty4HttpRequest(sequence, requestWithoutHeader, released, pooled, content); + + HttpContent contentWithoutHeader; + if (nettyContent instanceof LastHttpContent lastHttpContent) { + var copiedTrailingHeadersWithout = lastHttpContent.trailingHeaders().copy(); + copiedTrailingHeadersWithout.remove(header); + contentWithoutHeader = new DefaultLastHttpContent(nettyContent.content(), copiedTrailingHeadersWithout); + } else { + contentWithoutHeader = nettyContent; + } + return new Netty4HttpRequest(sequence, requestWithoutHeader, contentWithoutHeader, released, pooled, content, null); } @Override public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference contentRef) { - return new Netty4FullHttpResponse(sequence, request.protocolVersion(), status, contentRef); + return new Netty4FullHttpResponse(sequence, nettyRequest.protocolVersion(), status, contentRef); } @Override public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) { - return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, 
firstBodyPart); + return new Netty4ChunkedHttpResponse(sequence, nettyRequest.protocolVersion(), status, firstBodyPart); } @Override @@ -185,8 +234,13 @@ public Exception getInboundException() { return inboundException; } + @Override + public Netty4RequestContentPublisher contentPublisher() { + return contentPublisher; + } + public io.netty.handler.codec.http.HttpRequest getNettyRequest() { - return request; + return nettyRequest; } public static RestRequest.Method translateRequestMethod(HttpMethod httpMethod) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index f48a3143fd016..d4938931b21c3 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -21,13 +21,13 @@ import io.netty.channel.socket.nio.NioChannelOption; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.http.HttpContentCompressor; -import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.flow.FlowControlHandler; import io.netty.handler.ssl.SslHandler; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutHandler; @@ -350,7 +350,9 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { ); } decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); + ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message 
pieces + ch.pipeline().addLast(new FlowControlHandler()); if (httpValidator != null) { // runs a validation function on the first HTTP message piece which contains all the headers // if validation passes, the pieces of that particular request are forwarded, otherwise they are discarded @@ -363,11 +365,13 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { ) ); } + + ch.pipeline().addLast("inbound_pipelining", new Netty4InboundHttpPipeliningHandler()); // combines the HTTP message pieces into a single full HTTP request (with headers and body) - final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.maxContentLength()); + final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength()); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() - .addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression + .addLast("decoder_compress", new Netty4ContentDecompressor()) // this handles request body decompression .addLast("encoder", new HttpResponseEncoder() { @Override protected boolean isContentAlwaysEmpty(HttpResponse msg) { @@ -384,6 +388,7 @@ protected boolean isContentAlwaysEmpty(HttpResponse msg) { if (handlingSettings.compression()) { ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.compressionLevel())); } + ch.pipeline() .addLast( "pipelining", diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4InboundHttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4InboundHttpPipeliningHandler.java new file mode 100644 index 0000000000000..1ef30e1b20bd8 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4InboundHttpPipeliningHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.LastHttpContent; + +/** + * Inbound HTTP pipelining handler that marks all incoming HTTP messages with a sequence number. + */ +public class Netty4InboundHttpPipeliningHandler extends ChannelInboundHandlerAdapter { + + private int sequence = -1; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + assert msg instanceof HttpRequest || msg instanceof HttpContent; + if (msg instanceof FullHttpRequest request) { + sequence++; + ctx.fireChannelRead(new PipelinedFullHttpRequest(request, sequence)); + } else if (msg instanceof LastHttpContent content) { + ctx.fireChannelRead(new PipelinedLastHttpContent(content, sequence)); + } else if (msg instanceof HttpContent content) { + ctx.fireChannelRead(new PipelinedHttpContent(content, sequence)); + } else { + var request = (HttpRequest) msg; + sequence++; + ctx.fireChannelRead(new PipelinedHttpRequest(request, sequence)); + } + } + +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RequestContentPublisher.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RequestContentPublisher.java new file mode 100644 index 0000000000000..e5d5d44ffd15c --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4RequestContentPublisher.java @@ -0,0 +1,60 @@ +/* + * Copyright 
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.http.netty4;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.LastHttpContent;

import org.elasticsearch.http.HttpContent;

import java.util.concurrent.Flow;

/**
 * A {@link Flow.Publisher} that exposes a streamed HTTP request body chunk by chunk.
 * Disables auto-read on construction so the channel only delivers chunks on demand;
 * {@code Netty4HttpPipeliningHandler} feeds chunks in via {@link #sendChunk}.
 * <p>
 * NOTE(review): supports a single subscriber and assumes all calls happen on the channel's
 * event loop — {@code requested}/{@code subscriber} are not synchronized; a second
 * {@code subscribe} silently replaces the first instead of signalling an error. Confirm
 * these invariants hold for all callers.
 */
public class Netty4RequestContentPublisher implements Flow.Publisher<HttpContent> {

    private final Channel channel;
    private long requested = 0;
    private Flow.Subscriber<? super HttpContent> subscriber;

    public Netty4RequestContentPublisher(Channel channel) {
        this.channel = channel;
        // hold back inbound chunks until the subscriber asks for them
        channel.config().setAutoRead(false);
    }

    @Override
    public void subscribe(Flow.Subscriber<? super HttpContent> subscriber) {
        this.subscriber = subscriber;
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                requested += n;
                if (requested < 0) {
                    // demand overflowed (e.g. repeated Long.MAX_VALUE requests, which the
                    // Reactive Streams spec allows); saturate to "effectively unbounded"
                    requested = Long.MAX_VALUE;
                }
                if (requested > 0) {
                    channel.read();
                }
            }

            @Override
            public void cancel() {}
        });
    }

    /**
     * Delivers one netty chunk to the subscriber. Empty trailing content is not surfaced as an
     * item; a {@link LastHttpContent} (empty or not) completes the stream and restores auto-read.
     */
    public void sendChunk(io.netty.handler.codec.http.HttpContent chunk) {
        assert subscriber != null;
        if (chunk != LastHttpContent.EMPTY_LAST_CONTENT) {
            var content = new Netty4HttpContent(chunk);
            subscriber.onNext(content);
            requested -= 1;
            if (requested > 0) channel.read();
        }
        if (chunk instanceof LastHttpContent) {
            subscriber.onComplete();
            channel.config().setAutoRead(true);
        }
    }

}
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpVersion; + +/** + * A {@link FullHttpRequest} with pipeline sequence number + */ +public record PipelinedFullHttpRequest(FullHttpRequest request, int sequence) implements PipelinedHttpObject, FullHttpRequest { + + public PipelinedFullHttpRequest withRequest(FullHttpRequest request) { + return new PipelinedFullHttpRequest(request, sequence); + } + + @Override + public ByteBuf content() { + return request.content(); + } + + @Override + public HttpHeaders trailingHeaders() { + return request.headers(); + } + + @Override + public FullHttpRequest copy() { + return withRequest(request.copy()); + } + + @Override + public FullHttpRequest duplicate() { + return withRequest(request.duplicate()); + } + + @Override + public FullHttpRequest retainedDuplicate() { + return withRequest(request.retainedDuplicate()); + } + + @Override + public FullHttpRequest replace(ByteBuf content) { + return withRequest(request.replace(content)); + } + + @Override + public FullHttpRequest retain(int increment) { + request.retain(); + return this; + } + + @Override + public int refCnt() { + return request.refCnt(); + } + + @Override + public FullHttpRequest retain() { + request.retain(); + return this; + } + + @Override + public FullHttpRequest touch() { + request.touch(); + return this; + } + + @Override + public FullHttpRequest touch(Object 
hint) { + request.touch(hint); + return this; + } + + @Override + public boolean release() { + return request.release(); + } + + @Override + public boolean release(int decrement) { + return request.release(decrement); + } + + @Override + public HttpVersion getProtocolVersion() { + return request.protocolVersion(); + } + + @Override + public HttpVersion protocolVersion() { + return request.protocolVersion(); + } + + @Override + public FullHttpRequest setProtocolVersion(HttpVersion version) { + request.setProtocolVersion(version); + return this; + } + + @Override + public HttpHeaders headers() { + return request.headers(); + } + + @Override + public HttpMethod getMethod() { + return request.method(); + } + + @Override + public HttpMethod method() { + return request.method(); + } + + @Override + public FullHttpRequest setMethod(HttpMethod method) { + request.setMethod(method); + return this; + } + + @Override + public String getUri() { + return request.uri(); + } + + @Override + public String uri() { + return request.uri(); + } + + @Override + public FullHttpRequest setUri(String uri) { + request.setUri(uri); + return this; + } + + @Override + public DecoderResult getDecoderResult() { + return request.decoderResult(); + } + + @Override + public void setDecoderResult(DecoderResult result) { + request.setDecoderResult(result); + } + + @Override + public DecoderResult decoderResult() { + return request.decoderResult(); + } + +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedHttpContent.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedHttpContent.java new file mode 100644 index 0000000000000..4ec88e526ef41 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedHttpContent.java @@ -0,0 +1,107 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.http.DefaultHttpContent; +import io.netty.handler.codec.http.HttpContent; + +/** + * A {@link HttpContent} with pipeline sequence number + */ +public record PipelinedHttpContent(HttpContent httpContent, int sequence) implements PipelinedHttpObject, HttpContent { + + public PipelinedHttpContent(ByteBuf buf, int sequence) { + this(new DefaultHttpContent(buf), sequence); + } + + public PipelinedHttpContent withContent(HttpContent httpContent) { + return new PipelinedHttpContent(httpContent, sequence); + } + + @Override + public ByteBuf content() { + return httpContent.content(); + } + + @Override + public HttpContent copy() { + return withContent(httpContent.copy()); + } + + @Override + public HttpContent duplicate() { + return withContent(httpContent.duplicate()); + } + + @Override + public HttpContent retainedDuplicate() { + return withContent(httpContent.retainedDuplicate()); + } + + @Override + public HttpContent replace(ByteBuf content) { + return withContent(httpContent.replace(content)); + } + + @Override + public int refCnt() { + return httpContent.refCnt(); + } + + @Override + public HttpContent retain() { + httpContent.retain(); + return this; + } + + @Override + public HttpContent retain(int increment) { + httpContent.retain(increment); + return this; + } + + @Override + public HttpContent touch() { + httpContent.touch(); + return this; + } + + @Override + public HttpContent touch(Object hint) { + httpContent.touch(hint); + return this; + } + + @Override + public boolean release() { + return httpContent.release(); + } + + @Override + public boolean release(int decrement) { + 
return httpContent.release(decrement); + } + + @Override + public DecoderResult getDecoderResult() { + return httpContent.decoderResult(); + } + + @Override + public void setDecoderResult(DecoderResult result) { + httpContent.setDecoderResult(result); + } + + @Override + public DecoderResult decoderResult() { + return httpContent.decoderResult(); + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedHttpObject.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedHttpObject.java new file mode 100644 index 0000000000000..32a64cace8cd3 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedHttpObject.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +public sealed interface PipelinedHttpObject permits PipelinedFullHttpRequest, PipelinedHttpContent, PipelinedHttpRequest, + PipelinedLastHttpContent { + + /** + * HTTP request sequence number, indicates order of arrival within same channel (connection). + * All parts of a single HTTP request - {@link PipelinedHttpRequest} and {@link PipelinedHttpContent} - has same sequence number. + */ + int sequence(); +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedHttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedHttpRequest.java new file mode 100644 index 0000000000000..fdae9ce8bd2e2 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedHttpRequest.java @@ -0,0 +1,94 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpVersion; + +/** + * A {@link HttpRequest} with pipeline sequence number. + */ +public record PipelinedHttpRequest(HttpRequest request, int sequence) implements PipelinedHttpObject, HttpRequest { + + public PipelinedHttpRequest(HttpMethod method, String uri, int sequence) { + this(new DefaultHttpRequest(HttpVersion.HTTP_1_1, method, uri), sequence); + } + + @Override + public HttpMethod getMethod() { + return method(); + } + + @Override + public HttpMethod method() { + return request.method(); + } + + @Override + public HttpRequest setMethod(HttpMethod method) { + request.setMethod(method); + return this; + } + + @Override + public String getUri() { + return uri(); + } + + @Override + public String uri() { + return request.uri(); + } + + @Override + public HttpRequest setUri(String uri) { + request.setUri(uri); + return this; + } + + @Override + public HttpVersion getProtocolVersion() { + return protocolVersion(); + } + + @Override + public HttpVersion protocolVersion() { + return request.protocolVersion(); + } + + @Override + public HttpRequest setProtocolVersion(HttpVersion version) { + request.setProtocolVersion(version); + return this; + } + + @Override + public HttpHeaders headers() { + return request.headers(); + } + + @Override + public DecoderResult getDecoderResult() { + return decoderResult(); + } + + @Override + 
public void setDecoderResult(DecoderResult result) { + request.setDecoderResult(result); + } + + @Override + public DecoderResult decoderResult() { + return request.decoderResult(); + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedLastHttpContent.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedLastHttpContent.java new file mode 100644 index 0000000000000..e31eb2a85a598 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/PipelinedLastHttpContent.java @@ -0,0 +1,113 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.DecoderResult; +import io.netty.handler.codec.http.DefaultLastHttpContent; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.LastHttpContent; + +/** + * A {@link LastHttpContent} with pipeline sequence number + */ +public record PipelinedLastHttpContent(LastHttpContent httpContent, int sequence) implements PipelinedHttpObject, LastHttpContent { + + public PipelinedLastHttpContent(ByteBuf buf, int sequence) { + this(new DefaultLastHttpContent(buf), sequence); + } + + public PipelinedLastHttpContent withContent(LastHttpContent httpContent) { + return new PipelinedLastHttpContent(httpContent, sequence); + } + + @Override + public HttpHeaders trailingHeaders() { + return httpContent.trailingHeaders(); + } + + @Override + public ByteBuf content() { + return httpContent.content(); + } + + @Override + public LastHttpContent copy() { + return withContent(httpContent.copy()); + } + + @Override + 
public LastHttpContent duplicate() { + return withContent(httpContent.duplicate()); + } + + @Override + public LastHttpContent retainedDuplicate() { + return withContent(httpContent.retainedDuplicate()); + } + + @Override + public LastHttpContent replace(ByteBuf content) { + return withContent(httpContent.replace(content)); + } + + @Override + public LastHttpContent retain(int increment) { + httpContent.retain(increment); + return this; + } + + @Override + public int refCnt() { + return httpContent.refCnt(); + } + + @Override + public LastHttpContent retain() { + httpContent.retain(); + return this; + } + + @Override + public LastHttpContent touch() { + httpContent.touch(); + return this; + } + + @Override + public LastHttpContent touch(Object hint) { + httpContent.touch(hint); + return this; + } + + @Override + public boolean release() { + return httpContent.release(); + } + + @Override + public boolean release(int decrement) { + return httpContent.release(decrement); + } + + @Override + public DecoderResult getDecoderResult() { + return httpContent.decoderResult(); + } + + @Override + public void setDecoderResult(DecoderResult result) { + httpContent.setDecoderResult(result); + } + + @Override + public DecoderResult decoderResult() { + return httpContent.decoderResult(); + } +} diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4ContentDecompressorTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4ContentDecompressorTests.java new file mode 100644 index 0000000000000..b22fa2b78bab2 --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4ContentDecompressorTests.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObjectAggregator; + +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.zip.GZIPOutputStream; + +import static org.hamcrest.Matchers.sameInstance; + +public class Netty4ContentDecompressorTests extends ESTestCase { + + public void testDecompressRequest() throws IOException { + var chan = new EmbeddedChannel(new Netty4ContentDecompressor(), new HttpObjectAggregator(1024 * 1024)); + + for (int i = 0; i < randomIntBetween(1, 10); i++) { + var data = randomByteArrayOfLength(512 * 1024); + var zipped = Unpooled.buffer(data.length); // zipped version would about same size + try (var zos = new GZIPOutputStream(new ByteBufOutputStream(zipped))) { + zos.write(data); + zos.flush(); + } + + var sendReq = new PipelinedHttpRequest(HttpMethod.POST, "/uri", 0); + sendReq.headers().add(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP); + chan.writeInbound(sendReq); + chan.writeInbound(new PipelinedLastHttpContent(zipped, 0)); + + var msg = chan.readInbound(); + assertNotNull(msg); + var recvReq = (FullHttpRequest) msg; + assertTrue(ByteBufUtil.equals(Unpooled.wrappedBuffer(data), recvReq.content())); + recvReq.release(); + } + } + + public void testSkipDecompression() { + var chan = new EmbeddedChannel(new Netty4ContentDecompressor()); + + for (int i = 0; i 
< randomIntBetween(1, 10); i++) { + var data = randomByteArrayOfLength(512 * 1024); + var req = new PipelinedHttpRequest(HttpMethod.POST, "/uri", 0); + var content = new PipelinedLastHttpContent(Unpooled.wrappedBuffer(data), 0); + chan.writeInbound(req); + chan.writeInbound(content); + assertThat(chan.readInbound(), sameInstance(req)); + assertThat(chan.readInbound(), sameInstance(content)); + } + } +} diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpAggregatorTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpAggregatorTests.java new file mode 100644 index 0000000000000..c475a3c52fde8 --- /dev/null +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpAggregatorTests.java @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.http.netty4; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.LastHttpContent; + +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.function.Predicate; + +import static org.hamcrest.Matchers.sameInstance; + +public class Netty4HttpAggregatorTests extends ESTestCase { + + public void testAggregateParts() { + var chan = new EmbeddedChannel(new Netty4HttpAggregator(1024 * 1024)); + var reqNum = randomIntBetween(1, 100); + var wantData = new ArrayList(); + + for (var sequence = 0; sequence < reqNum; sequence++) { + chan.writeInbound(new PipelinedHttpRequest(HttpMethod.GET, "/uri", sequence)); + var data = Unpooled.buffer(128); + for (var part = 0; part < randomInt(10); part++) { + var chunk = randomByteArrayOfLength(128); + data.writeBytes(chunk); + chan.writeInbound(new PipelinedHttpContent(Unpooled.wrappedBuffer(chunk), sequence)); + } + if (randomBoolean()) { + chan.writeInbound(new PipelinedLastHttpContent(LastHttpContent.EMPTY_LAST_CONTENT, sequence)); + } else { + var last = randomByteArrayOfLength(128); + data.writeBytes(last); + chan.writeInbound(new PipelinedLastHttpContent(Unpooled.wrappedBuffer(last), sequence)); + } + wantData.add(data); + } + + assertEquals("should aggregate all requests", reqNum, chan.inboundMessages().size()); + for (int sequence = 0; sequence < reqNum; sequence++) { + var fullReq = (PipelinedFullHttpRequest) chan.inboundMessages().poll(); + assertEquals("sequence must match", fullReq.sequence(), sequence); + assertTrue("content must match", ByteBufUtil.equals(wantData.get(sequence), fullReq.content())); + } + } + + public void testSkipAggregationByPredicate() { + Predicate skipBulkApi = (req) -> req.uri().equals("_bulk") == false; + var chan = new EmbeddedChannel(new 
Netty4HttpAggregator(1024, skipBulkApi)); + + var req = new PipelinedHttpRequest(HttpMethod.POST, "_bulk", 0); + var content = new PipelinedLastHttpContent(LastHttpContent.EMPTY_LAST_CONTENT, 0); + chan.writeInbound(req, content); + + assertThat(chan.readInbound(), sameInstance(req)); + assertThat(chan.readInbound(), sameInstance(content)); + } + +} diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java index b2158384fa1cf..812b35a260922 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java @@ -125,12 +125,16 @@ public void testThatPipeliningWorksWithFastSerializedRequests() throws Interrupt } private EmbeddedChannel makeEmbeddedChannelWithSimulatedWork(int numberOfRequests) { - return new EmbeddedChannel(new Netty4HttpPipeliningHandler(numberOfRequests, null, new ThreadWatchdog.ActivityTracker()) { - @Override - protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) { - ctx.fireChannelRead(pipelinedRequest); - } - }, new WorkEmulatorHandler()); + return new EmbeddedChannel( + new Netty4InboundHttpPipeliningHandler(), + new Netty4HttpPipeliningHandler(numberOfRequests, null, new ThreadWatchdog.ActivityTracker()) { + @Override + protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) { + ctx.fireChannelRead(pipelinedRequest); + } + }, + new WorkEmulatorHandler() + ); } public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws InterruptedException { @@ -192,6 +196,7 @@ public void testThatPipeliningClosesConnectionWithTooManyEvents() throws Interru public void testPipeliningRequestsAreReleased() { final int 
numberOfRequests = 10; final EmbeddedChannel embeddedChannel = new EmbeddedChannel( + new Netty4InboundHttpPipeliningHandler(), new Netty4HttpPipeliningHandler(numberOfRequests + 1, null, new ThreadWatchdog.ActivityTracker()) ); @@ -224,9 +229,13 @@ public void testPipeliningRequestsAreReleased() { } } + private static EmbeddedChannel capturingEmbeddedChannel(List messagesSeen) { + return new EmbeddedChannel(capturingHandler(messagesSeen), new Netty4InboundHttpPipeliningHandler(), getTestHttpHandler()); + } + public void testSmallFullResponsesAreSentDirectly() { final List messagesSeen = new ArrayList<>(); - final var embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler()); + final var embeddedChannel = capturingEmbeddedChannel(messagesSeen); embeddedChannel.writeInbound(createHttpRequest("/test")); final Netty4HttpRequest request = embeddedChannel.readInbound(); final var maxSize = (int) NettyAllocator.suggestedMaxAllocationSize() / 2; @@ -242,7 +251,7 @@ public void testSmallFullResponsesAreSentDirectly() { public void testLargeFullResponsesAreSplit() { final List messagesSeen = new ArrayList<>(); - final var embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler()); + final var embeddedChannel = capturingEmbeddedChannel(messagesSeen); embeddedChannel.writeInbound(createHttpRequest("/test")); final Netty4HttpRequest request = embeddedChannel.readInbound(); final var minSize = (int) NettyAllocator.suggestedMaxAllocationSize(); @@ -264,7 +273,7 @@ public void testLargeFullResponsesAreSplit() { } public void testDecoderErrorSurfacedAsNettyInboundError() { - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(getTestHttpHandler()); + final EmbeddedChannel embeddedChannel = capturingEmbeddedChannel(new ArrayList<>()); // a request with a decoder error final DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/uri"); Exception cause = new 
ElasticsearchException("Boom"); @@ -276,7 +285,7 @@ public void testDecoderErrorSurfacedAsNettyInboundError() { public void testResumesChunkedMessage() { final List messagesSeen = new ArrayList<>(); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler()); + final EmbeddedChannel embeddedChannel = capturingEmbeddedChannel(messagesSeen); embeddedChannel.writeInbound(createHttpRequest("/chunked")); final Netty4HttpRequest request = embeddedChannel.readInbound(); final BytesReference chunk = new BytesArray(randomByteArrayOfLength(embeddedChannel.config().getWriteBufferHighWaterMark() + 1)); @@ -294,7 +303,7 @@ public void testResumesChunkedMessage() { public void testResumesAfterChunkedMessage() { final List messagesSeen = new ArrayList<>(); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler()); + final EmbeddedChannel embeddedChannel = capturingEmbeddedChannel(messagesSeen); embeddedChannel.writeInbound(createHttpRequest("/chunked")); embeddedChannel.writeInbound(createHttpRequest("/chunked2")); final Netty4HttpRequest request1 = embeddedChannel.readInbound(); @@ -327,7 +336,7 @@ public void testResumesAfterChunkedMessage() { public void testResumesSingleAfterChunkedMessage() { final List messagesSeen = new ArrayList<>(); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler()); + final EmbeddedChannel embeddedChannel = capturingEmbeddedChannel(messagesSeen); embeddedChannel.writeInbound(createHttpRequest("/chunked")); embeddedChannel.writeInbound(createHttpRequest("/single")); final Netty4HttpRequest request1 = embeddedChannel.readInbound(); @@ -363,7 +372,7 @@ public void testResumesSingleAfterChunkedMessage() { public void testChunkedResumesAfterSingleMessage() { final List messagesSeen = new ArrayList<>(); - final EmbeddedChannel embeddedChannel = new 
EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler()); + final EmbeddedChannel embeddedChannel = capturingEmbeddedChannel(messagesSeen); embeddedChannel.writeInbound(createHttpRequest("/chunked")); final Netty4HttpRequest request1 = embeddedChannel.readInbound(); embeddedChannel.writeInbound(createHttpRequest("/chunked2")); @@ -398,7 +407,7 @@ public void testChunkedResumesAfterSingleMessage() { public void testChunkedWithSmallChunksResumesAfterSingleMessage() { final List messagesSeen = new ArrayList<>(); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler()); + final EmbeddedChannel embeddedChannel = capturingEmbeddedChannel(messagesSeen); embeddedChannel.writeInbound(createHttpRequest("/chunked")); final Netty4HttpRequest request1 = embeddedChannel.readInbound(); embeddedChannel.writeInbound(createHttpRequest("/chunked2")); @@ -436,7 +445,7 @@ public void testChunkedWithSmallChunksResumesAfterSingleMessage() { public void testPipeliningRequestsAreReleasedAfterFailureOnChunked() { final List messagesSeen = new ArrayList<>(); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(capturingHandler(messagesSeen), getTestHttpHandler()); + final EmbeddedChannel embeddedChannel = capturingEmbeddedChannel(messagesSeen); embeddedChannel.writeInbound(createHttpRequest("/chunked")); final Netty4HttpRequest chunkedResponseRequest = embeddedChannel.readInbound(); @@ -495,7 +504,7 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque } }; - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new ChannelDuplexHandler(), handler); + final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4InboundHttpPipeliningHandler(), handler); embeddedChannel.writeInbound(createHttpRequest("/test")); assertTrue(requestHandled.get()); @@ -524,7 +533,7 @@ private static void assertDoneWithClosedChannel(ChannelPromise chunkedWritePromi 
/**
 * A single piece of an HTTP request body as delivered by a streaming-capable transport
 * (see {@code HttpRequest#contentPublisher()}).
 */
public interface HttpContent {

    /** The bytes of this body piece. */
    BytesReference content();

    /**
     * Releases any resources backing this piece once it has been consumed.
     * NOTE(review): presumably the underlying transport buffer is ref-counted and this
     * must be called exactly once — confirm against the transport implementation.
     */
    void release();

}
Http modules needs to implement this interface to integrate with the @@ -29,6 +30,10 @@ enum HttpVersion { BytesReference content(); + default Flow.Publisher contentPublisher() { + throw new IllegalArgumentException(); + } + List strictCookies(); HttpVersion protocolVersion(); diff --git a/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java b/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java index a17bc885f6b65..f4d8b956f7ad3 100644 --- a/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java +++ b/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java @@ -109,7 +109,7 @@ public final void handleRequest(RestRequest request, RestChannel channel, NodeCl throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter")); } - if (request.hasContent() && request.isContentConsumed() == false) { + if (request.hasContent() && request.isContentConsumed() == false && request.contentPublisher() == null) { throw new IllegalArgumentException( "request [" + request.method() + " " + request.path() + "] does not support having a body" ); diff --git a/server/src/main/java/org/elasticsearch/rest/RestRequest.java b/server/src/main/java/org/elasticsearch/rest/RestRequest.java index 66ba0c743813e..b8449af01e615 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/server/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -23,6 +23,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.http.HttpChannel; +import org.elasticsearch.http.HttpContent; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.telemetry.tracing.Traceable; import org.elasticsearch.xcontent.ParsedMediaType; @@ -40,6 +41,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; @@ -290,6 +292,10 @@ public 
    /**
     * The streaming publisher for the raw request body, delegated to the underlying
     * {@link HttpRequest}. Behavior when the transport does not support streaming is
     * defined by {@link HttpRequest#contentPublisher()} — may be absent/throwing there;
     * callers should check before relying on it.
     */
    public Flow.Publisher<HttpContent> contentPublisher() {
        return httpRequest.contentPublisher();
    }