diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 4de7ca97ed51b..a29959503830d 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -148,14 +148,14 @@ public void testClientConnectionCloseMidStream() throws Exception { // await stream handler is ready and request full content var handler = ctx.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertEquals(1, handler.stream.chunkQueue().size())); + assertBusy(() -> assertEquals(1, handler.stream.queueSize())); // enable auto-read to receive channel close event handler.stream.channel().config().setAutoRead(true); // terminate connection and wait resources are released ctx.clientChannel.close(); - assertBusy(() -> assertEquals(0, handler.stream.chunkQueue().size())); + assertBusy(() -> assertEquals(0, handler.stream.queueSize())); } } @@ -170,11 +170,11 @@ public void testServerCloseConnectionMidStream() throws Exception { // await stream handler is ready and request full content var handler = ctx.awaitRestChannelAccepted(opaqueId); - assertBusy(() -> assertEquals(1, handler.stream.chunkQueue().size())); + assertBusy(() -> assertEquals(1, handler.stream.queueSize())); // terminate connection on server and wait resources are released handler.channel.request().getHttpChannel().close(); - assertBusy(() -> assertEquals(0, handler.stream.chunkQueue().size())); + assertBusy(() -> assertEquals(0, handler.stream.queueSize())); } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4AbstractHttpRequest.java 
b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4AbstractHttpRequest.java new file mode 100644 index 0000000000000..c703df5f0c341 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4AbstractHttpRequest.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.cookie.Cookie; +import io.netty.handler.codec.http.cookie.ServerCookieDecoder; +import io.netty.handler.codec.http.cookie.ServerCookieEncoder; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.http.HttpRequest; +import org.elasticsearch.http.HttpResponse; +import org.elasticsearch.rest.ChunkedRestResponseBodyPart; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public abstract non-sealed class Netty4AbstractHttpRequest implements Netty4HttpRequest { + + private final io.netty.handler.codec.http.HttpRequest request; + private final int sequence; + + Netty4AbstractHttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request) { + this.request = request; + this.sequence = sequence; + } + + @Override + public RestRequest.Method method() { + return translateRequestMethod(request.method()); + } + + @Override + public String uri() { + return request.uri(); + } + + @Override + public final Map> getHeaders() { + 
return new Netty4HttpHeadersMap(request.headers()); + } + + @Override + public HttpRequest removeHeader(String header) { + request.headers().remove(header); + return this; + } + + @Override + public List strictCookies() { + String cookieString = request.headers().get(HttpHeaderNames.COOKIE); + if (cookieString != null) { + Set cookies = ServerCookieDecoder.STRICT.decode(cookieString); + if (cookies.isEmpty() == false) { + return ServerCookieEncoder.STRICT.encode(cookies); + } + } + return Collections.emptyList(); + } + + @Override + public HttpVersion protocolVersion() { + if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) { + return HttpRequest.HttpVersion.HTTP_1_0; + } else if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) { + return HttpRequest.HttpVersion.HTTP_1_1; + } else { + throw new IllegalArgumentException("Unexpected http protocol version: " + request.protocolVersion()); + } + } + + @Override + public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference contentRef) { + return new Netty4FullHttpResponse(sequence, request.protocolVersion(), status, contentRef); + } + + @Override + public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) { + return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, firstBodyPart); + } + + @Override + public io.netty.handler.codec.http.HttpRequest nettyRequest() { + return request; + } + + @Override + public int sequence() { + return sequence; + } + + public static RestRequest.Method translateRequestMethod(HttpMethod httpMethod) { + if (httpMethod == HttpMethod.GET) return RestRequest.Method.GET; + + if (httpMethod == HttpMethod.POST) return RestRequest.Method.POST; + + if (httpMethod == HttpMethod.PUT) return RestRequest.Method.PUT; + + if (httpMethod == HttpMethod.DELETE) return RestRequest.Method.DELETE; + + if (httpMethod == HttpMethod.HEAD) { + return 
RestRequest.Method.HEAD; + } + + if (httpMethod == HttpMethod.OPTIONS) { + return RestRequest.Method.OPTIONS; + } + + if (httpMethod == HttpMethod.PATCH) { + return RestRequest.Method.PATCH; + } + + if (httpMethod == HttpMethod.TRACE) { + return RestRequest.Method.TRACE; + } + + if (httpMethod == HttpMethod.CONNECT) { + return RestRequest.Method.CONNECT; + } + + throw new IllegalArgumentException("Unexpected http method: " + httpMethod); + } + + public static Map> getHttpHeadersAsMap(HttpHeaders httpHeaders) { + return new Netty4HttpHeadersMap(httpHeaders); + } + +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4FullHttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4FullHttpRequest.java new file mode 100644 index 0000000000000..861d9715c025b --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4FullHttpRequest.java @@ -0,0 +1,62 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.http.netty4; + +import io.netty.handler.codec.http.FullHttpRequest; + +import org.elasticsearch.http.HttpBody; +import org.elasticsearch.http.HttpRequest; +import org.elasticsearch.transport.netty4.Netty4Utils; + +public non-sealed class Netty4FullHttpRequest extends Netty4AbstractHttpRequest implements Netty4HttpRequest { + + private final FullHttpRequest request; + private final HttpBody.Full body; + + public Netty4FullHttpRequest(int sequence, FullHttpRequest request) { + super(sequence, request); + this.request = request; + this.body = HttpBody.fromReleasableBytesReference(Netty4Utils.toReleasableBytesReference(request.content())); + } + + public Netty4FullHttpRequest(Netty4FullHttpRequest other) { + super(other.sequence(), other.request); + this.request = other.request; + this.body = other.body; + } + + @Override + public HttpBody body() { + return body; + } + + @Override + public HttpRequest removeHeader(String header) { + super.removeHeader(header); + return this; + } + + @Override + public Exception getInboundException() { + return null; + } + + @Override + public void release() { + body.close(); + request.release(); + } + + @Override + public Netty4FullHttpRequest releaseAndCopy() { + var copy = request.copy(); + release(); + return new Netty4FullHttpRequest(sequence(), copy); + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeadersMap.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeadersMap.java new file mode 100644 index 0000000000000..94add12478825 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpHeadersMap.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.http.netty4; + +import io.netty.handler.codec.http.HttpHeaders; + +import java.util.AbstractMap; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A wrapper of {@link HttpHeaders} that implements a map to prevent copying unnecessarily. This class does not support modifications and + * due to the underlying implementation, it performs case insensitive lookups of key to values. + *

+ * It is important to note that this implementation does have some downsides in that each invocation of the {@link #values()} and + * {@link #entrySet()} methods will perform a copy of the values in the HttpHeaders rather than returning a view of the underlying values. + */ +class Netty4HttpHeadersMap implements Map> { + + private final HttpHeaders httpHeaders; + + Netty4HttpHeadersMap(HttpHeaders httpHeaders) { + this.httpHeaders = httpHeaders; + } + + @Override + public int size() { + return httpHeaders.size(); + } + + @Override + public boolean isEmpty() { + return httpHeaders.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return key instanceof String && httpHeaders.contains((String) key); + } + + @Override + public boolean containsValue(Object value) { + return value instanceof List && httpHeaders.names().stream().map(httpHeaders::getAll).anyMatch(value::equals); + } + + @Override + public List get(Object key) { + return key instanceof String ? httpHeaders.getAll((String) key) : null; + } + + @Override + public List put(String key, List value) { + throw new UnsupportedOperationException("modifications are not supported"); + } + + @Override + public List remove(Object key) { + throw new UnsupportedOperationException("modifications are not supported"); + } + + @Override + public void putAll(Map> m) { + throw new UnsupportedOperationException("modifications are not supported"); + } + + @Override + public void clear() { + throw new UnsupportedOperationException("modifications are not supported"); + } + + @Override + public Set keySet() { + return httpHeaders.names(); + } + + @Override + public Collection> values() { + return httpHeaders.names().stream().map(k -> Collections.unmodifiableList(httpHeaders.getAll(k))).toList(); + } + + @Override + public Set>> entrySet() { + return httpHeaders.names() + .stream() + .map(k -> new AbstractMap.SimpleImmutableEntry<>(k, httpHeaders.getAll(k))) + .collect(Collectors.toSet()); + } +} diff --git 
a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 18e76a51efa17..72a9138848f1e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -20,6 +20,7 @@ import io.netty.handler.codec.http.DefaultLastHttpContent; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; @@ -40,6 +41,7 @@ import org.elasticsearch.core.Booleans; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; +import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.netty4.Netty4Utils; @@ -64,6 +66,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler { private final int maxEventsHeld; private final ThreadWatchdog.ActivityTracker activityTracker; private final PriorityQueue> outboundHoldingQueue; + private final HttpHandlingSettings handlingSettings; private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBodyPart responseBodyPart) {} @@ -77,7 +80,7 @@ private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Chu * HTTP request content stream for current request, it's null if there is no current request or request is fully-aggregated */ @Nullable - private Netty4HttpRequestBodyStream currentRequestStream; + private Netty4HttpStreamRequest currentStreamRequest; /* * The current read and write sequence 
numbers. Read sequence numbers are attached to requests in the order they are read from the @@ -106,12 +109,14 @@ private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Chu public Netty4HttpPipeliningHandler( final int maxEventsHeld, final Netty4HttpServerTransport serverTransport, - final ThreadWatchdog.ActivityTracker activityTracker + final ThreadWatchdog.ActivityTracker activityTracker, + final HttpHandlingSettings handlingSettings ) { this.maxEventsHeld = maxEventsHeld; this.activityTracker = activityTracker; this.outboundHoldingQueue = new PriorityQueue<>(1, Comparator.comparingInt(t -> t.v1().getSequence())); this.serverTransport = serverTransport; + this.handlingSettings = handlingSettings; } @Override @@ -129,25 +134,24 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { } else { nonError = (Exception) cause; } - netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError); + netty4HttpRequest = new Netty4HttpRequestException(readSequence++, (FullHttpRequest) request, nonError); + currentStreamRequest = null; + } else if (request instanceof FullHttpRequest fullHttpRequest) { + netty4HttpRequest = new Netty4FullHttpRequest(readSequence++, fullHttpRequest); + currentStreamRequest = null; } else { - assert currentRequestStream == null : "current stream must be null for new request"; - if (request instanceof FullHttpRequest fullHttpRequest) { - netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest); - currentRequestStream = null; - } else { - var contentStream = new Netty4HttpRequestBodyStream(ctx.channel()); - currentRequestStream = contentStream; - netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream); - } + assert currentStreamRequest == null : "current stream must be null for new request"; + var contentStream = new Netty4HttpRequestBodyStream(ctx.channel()); + currentStreamRequest = new Netty4HttpStreamRequest(readSequence++, 
request, contentStream); + netty4HttpRequest = currentStreamRequest; } handlePipelinedRequest(ctx, netty4HttpRequest); } else { assert msg instanceof HttpContent : "expect HttpContent got " + msg; - assert currentRequestStream != null : "current stream must exists before handling http content"; - currentRequestStream.handleNettyContent((HttpContent) msg); + assert currentStreamRequest != null : "current stream must exists before handling http content"; + currentStreamRequest.body().onHttpContent((HttpContent) msg); if (msg instanceof LastHttpContent) { - currentRequestStream = null; + currentStreamRequest = null; } } } finally { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index 2d1caba3c477e..a9d253768ebdd 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -8,320 +8,12 @@ package org.elasticsearch.http.netty4; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.EmptyHttpHeaders; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.cookie.Cookie; -import io.netty.handler.codec.http.cookie.ServerCookieDecoder; -import io.netty.handler.codec.http.cookie.ServerCookieEncoder; - -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.http.HttpBody; import org.elasticsearch.http.HttpRequest; -import org.elasticsearch.http.HttpResponse; -import org.elasticsearch.rest.ChunkedRestResponseBodyPart; -import org.elasticsearch.rest.RestRequest; -import 
org.elasticsearch.rest.RestStatus; -import org.elasticsearch.transport.netty4.Netty4Utils; - -import java.util.AbstractMap; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -public class Netty4HttpRequest implements HttpRequest { - - private final FullHttpRequest request; - private final HttpBody content; - private final Map> headers; - private final AtomicBoolean released; - private final Exception inboundException; - private final boolean pooled; - private final int sequence; - - Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) { - this( - sequence, - new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - Unpooled.EMPTY_BUFFER, - request.headers(), - EmptyHttpHeaders.INSTANCE - ), - new AtomicBoolean(false), - false, - contentStream, - null - ); - } - - Netty4HttpRequest(int sequence, FullHttpRequest request) { - this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content())); - } - - Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) { - this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException); - } - - private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) { - this(sequence, request, released, pooled, content, null); - } - - private Netty4HttpRequest( - int sequence, - FullHttpRequest request, - AtomicBoolean released, - boolean pooled, - HttpBody content, - Exception inboundException - ) { - this.sequence = sequence; - this.request = request; - this.headers = getHttpHeadersAsMap(request.headers()); - this.content = content; - this.pooled = pooled; - this.released = 
released; - this.inboundException = inboundException; - } - - @Override - public RestRequest.Method method() { - return translateRequestMethod(request.method()); - } - - @Override - public String uri() { - return request.uri(); - } - - @Override - public HttpBody body() { - assert released.get() == false; - return content; - } - - @Override - public void release() { - if (pooled && released.compareAndSet(false, true)) { - request.release(); - } - } - - @Override - public HttpRequest releaseAndCopy() { - assert released.get() == false; - if (pooled == false) { - return this; - } - try { - final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content()); - return new Netty4HttpRequest( - sequence, - new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - copiedContent, - request.headers(), - request.trailingHeaders() - ), - new AtomicBoolean(false), - false, - content - ); - } finally { - release(); - } - } - - @Override - public final Map> getHeaders() { - return headers; - } - - @Override - public List strictCookies() { - String cookieString = request.headers().get(HttpHeaderNames.COOKIE); - if (cookieString != null) { - Set cookies = ServerCookieDecoder.STRICT.decode(cookieString); - if (cookies.isEmpty() == false) { - return ServerCookieEncoder.STRICT.encode(cookies); - } - } - return Collections.emptyList(); - } - - @Override - public HttpVersion protocolVersion() { - if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_0)) { - return HttpRequest.HttpVersion.HTTP_1_0; - } else if (request.protocolVersion().equals(io.netty.handler.codec.http.HttpVersion.HTTP_1_1)) { - return HttpRequest.HttpVersion.HTTP_1_1; - } else { - throw new IllegalArgumentException("Unexpected http protocol version: " + request.protocolVersion()); - } - } - - @Override - public HttpRequest removeHeader(String header) { - HttpHeaders copiedHeadersWithout = request.headers().copy(); - 
copiedHeadersWithout.remove(header); - HttpHeaders copiedTrailingHeadersWithout = request.trailingHeaders().copy(); - copiedTrailingHeadersWithout.remove(header); - FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - request.content(), - copiedHeadersWithout, - copiedTrailingHeadersWithout - ); - return new Netty4HttpRequest(sequence, requestWithoutHeader, released, pooled, content); - } - - @Override - public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference contentRef) { - return new Netty4FullHttpResponse(sequence, request.protocolVersion(), status, contentRef); - } - - @Override - public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) { - return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, firstBodyPart); - } - - @Override - public Exception getInboundException() { - return inboundException; - } - - public io.netty.handler.codec.http.HttpRequest getNettyRequest() { - return request; - } - - public static RestRequest.Method translateRequestMethod(HttpMethod httpMethod) { - if (httpMethod == HttpMethod.GET) return RestRequest.Method.GET; - - if (httpMethod == HttpMethod.POST) return RestRequest.Method.POST; - - if (httpMethod == HttpMethod.PUT) return RestRequest.Method.PUT; - - if (httpMethod == HttpMethod.DELETE) return RestRequest.Method.DELETE; - - if (httpMethod == HttpMethod.HEAD) { - return RestRequest.Method.HEAD; - } - - if (httpMethod == HttpMethod.OPTIONS) { - return RestRequest.Method.OPTIONS; - } - - if (httpMethod == HttpMethod.PATCH) { - return RestRequest.Method.PATCH; - } - - if (httpMethod == HttpMethod.TRACE) { - return RestRequest.Method.TRACE; - } - - if (httpMethod == HttpMethod.CONNECT) { - return RestRequest.Method.CONNECT; - } - - throw new IllegalArgumentException("Unexpected http method: " + httpMethod); - } - - public static Map> getHttpHeadersAsMap(HttpHeaders 
httpHeaders) { - return new HttpHeadersMap(httpHeaders); - } - - /** - * A wrapper of {@link HttpHeaders} that implements a map to prevent copying unnecessarily. This class does not support modifications - * and due to the underlying implementation, it performs case insensitive lookups of key to values. - * - * It is important to note that this implementation does have some downsides in that each invocation of the - * {@link #values()} and {@link #entrySet()} methods will perform a copy of the values in the HttpHeaders rather than returning a - * view of the underlying values. - */ - private static class HttpHeadersMap implements Map> { - - private final HttpHeaders httpHeaders; - - private HttpHeadersMap(HttpHeaders httpHeaders) { - this.httpHeaders = httpHeaders; - } - - @Override - public int size() { - return httpHeaders.size(); - } - - @Override - public boolean isEmpty() { - return httpHeaders.isEmpty(); - } - - @Override - public boolean containsKey(Object key) { - return key instanceof String && httpHeaders.contains((String) key); - } - - @Override - public boolean containsValue(Object value) { - return value instanceof List && httpHeaders.names().stream().map(httpHeaders::getAll).anyMatch(value::equals); - } - - @Override - public List get(Object key) { - return key instanceof String ? 
httpHeaders.getAll((String) key) : null; - } - - @Override - public List put(String key, List value) { - throw new UnsupportedOperationException("modifications are not supported"); - } - - @Override - public List remove(Object key) { - throw new UnsupportedOperationException("modifications are not supported"); - } - - @Override - public void putAll(Map> m) { - throw new UnsupportedOperationException("modifications are not supported"); - } - - @Override - public void clear() { - throw new UnsupportedOperationException("modifications are not supported"); - } - @Override - public Set keySet() { - return httpHeaders.names(); - } +public sealed interface Netty4HttpRequest extends HttpRequest permits Netty4AbstractHttpRequest, Netty4FullHttpRequest, + Netty4HttpRequestException, Netty4HttpStreamRequest { - @Override - public Collection> values() { - return httpHeaders.names().stream().map(k -> Collections.unmodifiableList(httpHeaders.getAll(k))).toList(); - } + int sequence(); - @Override - public Set>> entrySet() { - return httpHeaders.names() - .stream() - .map(k -> new AbstractMap.SimpleImmutableEntry<>(k, httpHeaders.getAll(k))) - .collect(Collectors.toSet()); - } - } + io.netty.handler.codec.http.HttpRequest nettyRequest(); } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 8497e3ee8a40d..57ec00cf74d83 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -16,6 +16,8 @@ import org.elasticsearch.transport.netty4.Netty4Utils; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import java.util.Queue; /** @@ -26,88 +28,212 @@ */ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { 
- private final Channel channel; - private final Queue chunkQueue = new ArrayDeque<>(); - private boolean requested = false; - private boolean hasLast = false; - private HttpBody.ChunkHandler handler; + private State state; public Netty4HttpRequestBodyStream(Channel channel) { - this.channel = channel; - channel.closeFuture().addListener((f) -> releaseQueuedChunks()); - channel.config().setAutoRead(false); + this.state = new State.Queueing(channel); + channel.closeFuture().addListener(f -> onChannelClose()); + } + + private static void releaseQueuedChunks(Queue chunkQueue) { + while (chunkQueue.isEmpty() == false) { + chunkQueue.poll().release(); + } } @Override - public ChunkHandler handler() { - return handler; + public void addTracingHandler(ChunkHandler chunkHandler) { + if (state instanceof State.Queueing q) { + q.tracingHandlers.add(chunkHandler); + } else if (state instanceof State.Streaming s) { + s.tracingHandlers.add(chunkHandler); + } } @Override - public void setHandler(ChunkHandler chunkHandler) { - this.handler = chunkHandler; + public void setConsumingHandler(ChunkHandler chunkHandler) { + if (isQueueing() == false) { + throw new IllegalStateException("only queueing state can set consuming handler, got " + stateName()); + } + state = new State.Streaming((State.Queueing) state, chunkHandler); } - private void sendQueuedOrRead() { - assert channel.eventLoop().inEventLoop(); - requested = true; - var chunk = chunkQueue.poll(); - if (chunk == null) { - channel.read(); - } else { - sendChunk(chunk); + @Override + public void discard() { + if (state instanceof State.Queueing q) { + state = new State.Draining(q); + } else if (state instanceof State.Streaming s) { + state = new State.Draining(s); } } @Override public void next() { - assert handler != null : "handler must be set before requesting next chunk"; - if (channel.eventLoop().inEventLoop()) { - sendQueuedOrRead(); - } else { - channel.eventLoop().submit(this::sendQueuedOrRead); + if (isStreaming()) { 
+ var streamingState = state.asStreaming(); + if (streamingState.channel.eventLoop().inEventLoop()) { + streamingState.sendQueuedOrRead(); + } else { + streamingState.channel.eventLoop().submit(streamingState::sendQueuedOrRead); + } } } - public void handleNettyContent(HttpContent httpContent) { - assert handler != null : "handler must be set before processing http content"; - if (requested && chunkQueue.isEmpty()) { - sendChunk(httpContent); + public void onHttpContent(HttpContent httpContent) { + var isLast = httpContent instanceof LastHttpContent; + if (isCompleted()) { + throw new IllegalStateException("received netty chunk on completed stream"); + } else if (isDraining()) { + httpContent.release(); + if (isLast) { + state = new State.Completed(state.asDraining()); + } + } else if (isQueueing()) { + state.asQueueing().chunkQueue.add(httpContent); + } else if (isStreaming()) { + var streamingState = state.asStreaming(); + if (streamingState.requested && streamingState.chunkQueue.isEmpty()) { + streamingState.sendChunk(httpContent); + if (isLast) { + state = new State.Completed(streamingState); + } + } else { + streamingState.chunkQueue.add(httpContent); + } } else { - chunkQueue.add(httpContent); - } - if (httpContent instanceof LastHttpContent) { - hasLast = true; - channel.config().setAutoRead(true); + assert false : "must handle all states, got " + stateName(); } } // visible for test Channel channel() { - return channel; + return null; } // visible for test - Queue chunkQueue() { - return chunkQueue; + int queueSize() { + if (isQueueing()) { + return state.asQueueing().chunkQueue.size(); + } else if (isStreaming()) { + return state.asStreaming().chunkQueue.size(); + } else { + return 0; + } } // visible for test boolean hasLast() { - return hasLast; + return false; } - private void sendChunk(HttpContent httpContent) { - assert requested; - requested = false; - var bytesRef = Netty4Utils.toReleasableBytesReference(httpContent.content()); - var isLast = 
httpContent instanceof LastHttpContent; - handler.onNext(bytesRef, isLast); + private String stateName() { + return state.getClass().getSimpleName(); } - private void releaseQueuedChunks() { - while (chunkQueue.isEmpty() == false) { - chunkQueue.poll().release(); + boolean isQueueing() { + return state instanceof State.Queueing; + } + + boolean isStreaming() { + return state instanceof State.Streaming; + } + + boolean isDraining() { + return state instanceof State.Draining; + } + + boolean isCompleted() { + return state instanceof State.Completed; + } + + private void onChannelClose() { + discard(); + } + + private sealed interface State permits State.Completed, State.Draining, State.Queueing, State.Streaming { + + default Queueing asQueueing() { + assert this instanceof Queueing; + return (Queueing) this; + } + + default Streaming asStreaming() { + assert this instanceof Streaming; + return (Streaming) this; + } + + default Draining asDraining() { + assert this instanceof Draining; + return (Draining) this; + } + + final class Queueing implements State { + final Channel channel; + final Queue chunkQueue = new ArrayDeque<>(); + final List tracingHandlers = new ArrayList<>(); + boolean hasLast = false; + + Queueing(Channel channel) { + this.channel = channel; + channel.config().setAutoRead(false); + } + } + + final class Streaming implements State { + final Channel channel; + final Queue chunkQueue; + final List tracingHandlers; + final ChunkHandler consumingHandler; + boolean hasLast; + boolean requested; + + Streaming(Queueing queueing, ChunkHandler consumingHandler) { + this.channel = queueing.channel; + this.chunkQueue = queueing.chunkQueue; + this.tracingHandlers = queueing.tracingHandlers; + this.consumingHandler = consumingHandler; + this.hasLast = queueing.hasLast; + } + + void sendChunk(HttpContent httpContent) { + assert requested; + requested = false; + var bytesRef = Netty4Utils.toReleasableBytesReference(httpContent.content()); + var isLast = 
httpContent instanceof LastHttpContent; + tracingHandlers.forEach(h -> h.onNext(bytesRef, isLast)); + consumingHandler.onNext(bytesRef, isLast); + } + + void sendQueuedOrRead() { + assert channel.eventLoop().inEventLoop(); + requested = true; + var chunk = chunkQueue.poll(); + if (chunk == null) { + channel.read(); + } else { + sendChunk(chunk); + } + } + } + + final class Completed implements State { + Completed(Draining draining) {} + + Completed(Streaming streaming) { + streaming.channel.config().setAutoRead(true); + } + } + + final class Draining implements State { + Draining(Queueing queueing) { + queueing.channel.config().setAutoRead(true); + releaseQueuedChunks(queueing.chunkQueue); + } + + Draining(Streaming streaming) { + streaming.channel.config().setAutoRead(true); + releaseQueuedChunks(streaming.chunkQueue); + } } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestException.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestException.java new file mode 100644 index 0000000000000..54f4e8405ddcd --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestException.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.http.netty4; + +import io.netty.handler.codec.http.FullHttpRequest; + +import org.elasticsearch.http.HttpRequest; + +public final class Netty4HttpRequestException extends Netty4FullHttpRequest implements Netty4HttpRequest { + + private final Exception exception; + + public Netty4HttpRequestException(int sequence, FullHttpRequest request, Exception exception) { + super(sequence, request); + this.exception = exception; + } + + Netty4HttpRequestException(Netty4FullHttpRequest request, Exception exception) { + super(request); + this.exception = exception; + } + + @Override + public HttpRequest removeHeader(String header) { + super.removeHeader(header); + return this; + } + + @Override + public Exception getInboundException() { + return exception; + } + + @Override + public Netty4HttpRequestException releaseAndCopy() { + var copy = super.releaseAndCopy(); + return new Netty4HttpRequestException(copy, exception); + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 024391af46b62..d5f40e00a8005 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -23,7 +23,6 @@ import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpMessage; -import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseEncoder; @@ -364,9 +363,6 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception { ) ); } - // combines the HTTP message pieces into a single full HTTP 
request (with headers and body) - final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength()); - aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); ch.pipeline() .addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression .addLast("encoder", new HttpResponseEncoder() { @@ -380,8 +376,7 @@ protected boolean isContentAlwaysEmpty(HttpResponse msg) { } return super.isContentAlwaysEmpty(msg); } - }) - .addLast("aggregator", aggregator); + }); if (handlingSettings.compression()) { ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.compressionLevel()) { @Override @@ -400,7 +395,8 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) t new Netty4HttpPipeliningHandler( transport.pipeliningMaxEvents, transport, - transport.threadWatchdog.getActivityTrackerForCurrentThread() + transport.threadWatchdog.getActivityTrackerForCurrentThread(), + handlingSettings ) ); transport.serverAcceptedChannel(nettyHttpChannel); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpStreamRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpStreamRequest.java new file mode 100644 index 0000000000000..b343b4efb6f06 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpStreamRequest.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. 
+ */ + +package org.elasticsearch.http.netty4; + +import org.elasticsearch.http.HttpBody; +import org.elasticsearch.http.HttpRequest; + +public final class Netty4HttpStreamRequest extends Netty4AbstractHttpRequest implements Netty4HttpRequest { + + private final Netty4HttpRequestBodyStream body; + + public Netty4HttpStreamRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream body) { + super(sequence, request); + this.body = body; + } + + @Override + public Netty4HttpRequestBodyStream body() { + return body; + } + + @Override + public Exception getInboundException() { + return null; + } + + @Override + public void release() { + body.discard(); + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/internal/HttpHeadersAuthenticatorUtils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/internal/HttpHeadersAuthenticatorUtils.java index a289d4b42e62d..517de089092c1 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/internal/HttpHeadersAuthenticatorUtils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/internal/HttpHeadersAuthenticatorUtils.java @@ -23,8 +23,8 @@ import java.util.List; import java.util.Map; -import static org.elasticsearch.http.netty4.Netty4HttpRequest.getHttpHeadersAsMap; -import static org.elasticsearch.http.netty4.Netty4HttpRequest.translateRequestMethod; +import static org.elasticsearch.http.netty4.Netty4AbstractHttpRequest.getHttpHeadersAsMap; +import static org.elasticsearch.http.netty4.Netty4AbstractHttpRequest.translateRequestMethod; /** * Provides utilities for hooking into the netty pipeline and authenticate each HTTP request's headers. 
@@ -111,9 +111,9 @@ private static HttpHeadersWithAuthenticationContext unwrapAuthenticatedHeaders(o if (request instanceof Netty4HttpRequest == false) { return null; } - if (((Netty4HttpRequest) request).getNettyRequest().headers() instanceof HttpHeadersWithAuthenticationContext == false) { + if (((Netty4HttpRequest) request).nettyRequest().headers() instanceof HttpHeadersWithAuthenticationContext == false) { return null; } - return (HttpHeadersWithAuthenticationContext) (((Netty4HttpRequest) request).getNettyRequest().headers()); + return (HttpHeadersWithAuthenticationContext) (((Netty4HttpRequest) request).nettyRequest().headers()); } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java index b2158384fa1cf..54761ab64c922 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.network.ThreadWatchdogHelper; import org.elasticsearch.common.recycler.Recycler; +import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.HttpResponse; import org.elasticsearch.rest.ChunkedRestResponseBodyPart; import org.elasticsearch.rest.RestStatus; @@ -125,12 +126,20 @@ public void testThatPipeliningWorksWithFastSerializedRequests() throws Interrupt } private EmbeddedChannel makeEmbeddedChannelWithSimulatedWork(int numberOfRequests) { - return new EmbeddedChannel(new Netty4HttpPipeliningHandler(numberOfRequests, null, new ThreadWatchdog.ActivityTracker()) { - @Override - protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) { - 
ctx.fireChannelRead(pipelinedRequest); - } - }, new WorkEmulatorHandler()); + return new EmbeddedChannel( + new Netty4HttpPipeliningHandler( + numberOfRequests, + null, + new ThreadWatchdog.ActivityTracker(), + HttpHandlingSettings.empty() + ) { + @Override + protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) { + ctx.fireChannelRead(pipelinedRequest); + } + }, + new WorkEmulatorHandler() + ); } public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws InterruptedException { @@ -192,7 +201,12 @@ public void testThatPipeliningClosesConnectionWithTooManyEvents() throws Interru public void testPipeliningRequestsAreReleased() { final int numberOfRequests = 10; final EmbeddedChannel embeddedChannel = new EmbeddedChannel( - new Netty4HttpPipeliningHandler(numberOfRequests + 1, null, new ThreadWatchdog.ActivityTracker()) + new Netty4HttpPipeliningHandler( + numberOfRequests + 1, + null, + new ThreadWatchdog.ActivityTracker(), + HttpHandlingSettings.empty() + ) ); for (int i = 0; i < numberOfRequests; i++) { @@ -210,7 +224,7 @@ public void testPipeliningRequestsAreReleased() { ChannelPromise promise = embeddedChannel.newPromise(); promises.add(promise); Netty4HttpRequest pipelinedRequest = requests.get(i); - Netty4FullHttpResponse resp = pipelinedRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); + Netty4FullHttpResponse resp = (Netty4FullHttpResponse) pipelinedRequest.createResponse(RestStatus.OK, BytesArray.EMPTY); embeddedChannel.writeAndFlush(resp, promise); } @@ -462,7 +476,7 @@ public void testPipeliningRequestsAreReleasedAfterFailureOnChunked() { for (Netty4HttpRequest request : requests) { ChannelPromise promise = embeddedChannel.newPromise(); promises.add(promise); - Netty4FullHttpResponse resp = request.createResponse(RestStatus.OK, BytesArray.EMPTY); + Netty4FullHttpResponse resp = (Netty4FullHttpResponse) request.createResponse(RestStatus.OK, BytesArray.EMPTY); embeddedChannel.write(resp, 
promise); } assertFalse(chunkedWritePromise.isDone()); @@ -484,7 +498,12 @@ public void testActivityTracking() { final var watchdog = new ThreadWatchdog(); final var activityTracker = watchdog.getActivityTrackerForCurrentThread(); final var requestHandled = new AtomicBoolean(); - final var handler = new Netty4HttpPipeliningHandler(Integer.MAX_VALUE, mock(Netty4HttpServerTransport.class), activityTracker) { + final var handler = new Netty4HttpPipeliningHandler( + Integer.MAX_VALUE, + mock(Netty4HttpServerTransport.class), + activityTracker, + HttpHandlingSettings.empty() + ) { @Override protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) { // thread is not idle while handling the request @@ -528,7 +547,8 @@ private Netty4HttpPipeliningHandler getTestHttpHandler() { return new Netty4HttpPipeliningHandler( Integer.MAX_VALUE, mock(Netty4HttpServerTransport.class), - new ThreadWatchdog.ActivityTracker() + new ThreadWatchdog.ActivityTracker(), + HttpHandlingSettings.empty() ) { @Override protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpRequest pipelinedRequest) { diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java index 00066ffaf0201..f10498530e00d 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -18,7 +18,6 @@ import io.netty.handler.flow.FlowControlHandler; import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.http.HttpBody; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -30,18 +29,16 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase { EmbeddedChannel channel; 
Netty4HttpRequestBodyStream stream; - static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close(); @Before public void createStream() { channel = new EmbeddedChannel(); stream = new Netty4HttpRequestBodyStream(channel); - stream.setHandler(discardHandler); // set default handler, each test might override one channel.pipeline().addLast(new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { msg.retain(); - stream.handleNettyContent(msg); + stream.onHttpContent(msg); } }); } @@ -52,14 +49,14 @@ public void testEnqueueChunksBeforeRequest() { for (int i = 0; i < totalChunks; i++) { channel.writeInbound(randomContent(1024)); } - assertEquals(totalChunks, stream.chunkQueue().size()); + assertEquals(totalChunks, stream.queueSize()); } // ensures all queued chunks can be flushed downstream public void testFlushQueued() { var chunks = new ArrayList(); var totalBytes = new AtomicInteger(); - stream.setHandler((chunk, isLast) -> { + stream.setConsumingHandler((chunk, isLast) -> { chunks.add(chunk); totalBytes.addAndGet(chunk.length()); }); @@ -82,11 +79,11 @@ public void testFlushQueued() { public void testReadFromChannel() { var gotChunks = new ArrayList(); var gotLast = new AtomicBoolean(false); - stream.setHandler((chunk, isLast) -> { + stream.setConsumingHandler((chunk, isLast) -> { gotChunks.add(chunk); gotLast.set(isLast); }); - channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read() + channel.pipeline().addFirst(new FlowControlHandler()); // blocks all incoming messages, need explicit channel.read() var chunkSize = 1024; var totalChunks = randomIntBetween(1, 32); for (int i = 0; i < totalChunks - 1; i++) { @@ -95,7 +92,7 @@ public void testReadFromChannel() { channel.writeInbound(randomLastContent(chunkSize)); for (int i = 0; i < totalChunks; i++) { - assertEquals("should not enqueue chunks", 0, stream.chunkQueue().size()); + 
assertEquals("should not enqueue chunks", 0, stream.queueSize()); stream.next(); assertEquals("each next() should produce single chunk", i + 1, gotChunks.size()); } diff --git a/server/src/main/java/org/elasticsearch/http/HttpBody.java b/server/src/main/java/org/elasticsearch/http/HttpBody.java index b4d88b837b117..6659704eef3e8 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpBody.java +++ b/server/src/main/java/org/elasticsearch/http/HttpBody.java @@ -11,7 +11,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; /** * A super-interface for different HTTP content implementations @@ -22,6 +22,10 @@ static Full fromBytesReference(BytesReference bytesRef) { return new ByteRefHttpBody(bytesRef); } + static Full fromReleasableBytesReference(ReleasableBytesReference bytesRef) { + return new RelByteRefHttpBody(bytesRef); + } + static Full empty() { return new ByteRefHttpBody(BytesArray.EMPTY); } @@ -38,7 +42,7 @@ default boolean isStream() { * Assumes that HTTP body is a full content. If not sure, use {@link HttpBody#isFull()}. */ default Full asFull() { - assert this instanceof Full; + assert this instanceof Full : "must be full body, got " + this; return (Full) this; } @@ -46,32 +50,44 @@ default Full asFull() { * Assumes that HTTP body is a lazy-stream. If not sure, use {@link HttpBody#isStream()}. */ default Stream asStream() { - assert this instanceof Stream; + assert this instanceof Stream : "must be stream body, got " + this; return (Stream) this; } /** * Full content represents a complete http body content that can be accessed immediately. */ - non-sealed interface Full extends HttpBody { + non-sealed interface Full extends HttpBody, Releasable { BytesReference bytes(); + + @Override + default void close() {} } /** - * Stream is a lazy-loaded content. 
Stream supports only single handler, this handler must be - * set before requesting next chunk. + * Stream is a lazy-loaded content. */ non-sealed interface Stream extends HttpBody { + /** - * Returns current handler + * Adds tracing handler to stream. Tracing handlers can be used for metering and monitoring. + * Stream will broadcast chunk to all tracing handlers before sending chunk to consuming + * handler. Tracing handler should not invoke {@link Stream#next()}. */ - @Nullable - ChunkHandler handler(); + void addTracingHandler(ChunkHandler chunkHandler); /** - * Sets handler that can handle next chunk + * Sets chunk consuming handler. This handler must be set only once and responsible for + * chunk processing, releasing memory, and calling {@link Stream#next()}. */ - void setHandler(ChunkHandler chunkHandler); + void setConsumingHandler(ChunkHandler chunkHandler); + + /** + * Discards all queued and following chunks. In some cases, for example bad-request, content + * processing is not required. In this case there might be no consuming handler attached to + * stream. This method prevents buffers leaking. + */ + void discard(); /** * Request next chunk of data from the network. The size of the chunk depends on following @@ -82,7 +98,7 @@ non-sealed interface Stream extends HttpBody { * for every chunk. *

          * {@code
-         *     stream.setHandler((chunk, isLast) -> {
+         *     stream.setConsumingHandler((chunk, isLast) -> {
          *         processChunk(chunk);
          *         if (isLast == false) {
          *             stream.next();
@@ -100,4 +116,11 @@ interface ChunkHandler {
     }
 
     record ByteRefHttpBody(BytesReference bytes) implements Full {}
+
+    record RelByteRefHttpBody(ReleasableBytesReference bytes) implements Full {
+        @Override
+        public void close() {
+            bytes.close();
+        }
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java b/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java
index 32edae5cca9cb..cbdfa7caae31e 100644
--- a/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java
+++ b/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java
@@ -30,6 +30,10 @@ public record HttpHandlingSettings(
     boolean detailedErrorsEnabled
 ) {
 
+    public static HttpHandlingSettings empty() {
+        return fromSettings(Settings.EMPTY);
+    }
+
     public static HttpHandlingSettings fromSettings(Settings settings) {
         return new HttpHandlingSettings(
             Math.toIntExact(SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings).getBytes()),
diff --git a/server/src/main/java/org/elasticsearch/http/HttpRequest.java b/server/src/main/java/org/elasticsearch/http/HttpRequest.java
index cce57d20be23f..5a2156b4b10f9 100644
--- a/server/src/main/java/org/elasticsearch/http/HttpRequest.java
+++ b/server/src/main/java/org/elasticsearch/http/HttpRequest.java
@@ -29,6 +29,10 @@ enum HttpVersion {
 
     HttpBody body();
 
+    default void setBody(HttpBody body) {
+        throw new UnsupportedOperationException("setBody is not supported");
+    }
+
     List strictCookies();
 
     HttpVersion protocolVersion();
diff --git a/server/src/main/java/org/elasticsearch/http/HttpTracer.java b/server/src/main/java/org/elasticsearch/http/HttpTracer.java
index 2f3d376e39086..b0a7c1b37ca16 100644
--- a/server/src/main/java/org/elasticsearch/http/HttpTracer.java
+++ b/server/src/main/java/org/elasticsearch/http/HttpTracer.java
@@ -20,6 +20,7 @@
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 
@@ -77,10 +78,24 @@ HttpTracer maybeLogRequest(RestRequest restRequest, @Nullable Exception e) {
                 e
             );
             if (isBodyTracerEnabled()) {
-                try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
-                    restRequest.content().writeTo(stream);
-                } catch (Exception e2) {
-                    assert false : e2; // no real IO here
+                var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST);
+                if (restRequest.isFullContent()) {
+                    try (stream) {
+                        restRequest.content().writeTo(stream);
+                    } catch (Exception e2) {
+                        assert false : e2; // no real IO here
+                    }
+                } else {
+                    restRequest.contentStream().addTracingHandler((chunk, isLast) -> {
+                        try {
+                            chunk.writeTo(stream);
+                            if (isLast) {
+                                stream.close();
+                            }
+                        } catch (IOException e2) {
+                            assert false : e2;
+                        }
+                    });
                 }
             }
 
diff --git a/server/src/main/java/org/elasticsearch/http/HttpUtils.java b/server/src/main/java/org/elasticsearch/http/HttpUtils.java
index 12a0a87941357..d93e40e04a589 100644
--- a/server/src/main/java/org/elasticsearch/http/HttpUtils.java
+++ b/server/src/main/java/org/elasticsearch/http/HttpUtils.java
@@ -25,4 +25,22 @@ public static boolean shouldCloseConnection(HttpRequest httpRequest) {
             return true;
         }
     }
+
+    public static int contentLengthHeader(HttpRequest httpRequest) {
+        var cl = httpRequest.getHeaders().get("content-length");
+        if (cl == null || cl.isEmpty()) {
+            return 0;
+        } else {
+            return Integer.parseInt(cl.get(0));
+        }
+    }
+
+    public static boolean isChunkedTransferEncoding(HttpRequest httpRequest) {
+        var te = httpRequest.getHeaders().get("transfer-encoding");
+        if (te == null || te.isEmpty()) {
+            return false;
+        } else {
+            return te.get(0).equals("chunked");
+        }
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java b/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java
index d73b679917978..1b0a8b93a3098 100644
--- a/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java
+++ b/server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java
@@ -10,6 +10,7 @@
 
 import org.apache.lucene.search.spell.LevenshteinDistance;
 import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.bytes.CompositeBytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -17,12 +18,15 @@
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.http.HttpBody;
 import org.elasticsearch.plugins.ActionPlugin;
 import org.elasticsearch.rest.action.admin.cluster.RestNodesUsageAction;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -32,6 +36,7 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -90,10 +95,30 @@ public final void handleRequest(RestRequest request, RestChannel channel, NodeCl
                 throw new IllegalArgumentException(unrecognized(request, unsupported, allSupported, "parameter"));
             }
         }
+        if (request.isFullContent() || supportsIncrementalContent()) {
+            prepareAndAccept(request, channel, client);
+        } else {
+            aggregateContent(request.contentStream(), (fullContent) -> {
+                request.getHttpRequest().setBody(fullContent);
+                if (allowsUnsafeBuffers() == false) {
+                    request.ensureSafeBuffers();
+                }
+                tryPrepareAndAccept(request, channel, client);
+            });
+        }
+    }
 
-        // prepare the request for execution; has the side effect of touching the request parameters
-        try (var action = prepareRequest(request, client)) {
+    private void tryPrepareAndAccept(RestRequest request, RestChannel channel, NodeClient client) {
+        try {
+            prepareAndAccept(request, channel, client);
+        } catch (Exception e) {
+            throw (e instanceof RuntimeException runtimeException) ? runtimeException : new RuntimeException(e);
+        }
+    }
 
+    // prepare the request for execution; has the side effect of touching the request parameters
+    private void prepareAndAccept(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
+        try (var action = prepareRequest(request, client)) {
             // validate unconsumed params, but we must exclude params used to format the response
             // use a sorted set so the unconsumed parameters appear in a reliable sorted order
             final SortedSet unconsumedParams = request.unconsumedParams()
@@ -101,7 +126,6 @@ public final void handleRequest(RestRequest request, RestChannel channel, NodeCl
                 .filter(p -> RestResponse.RESPONSE_PARAMS.contains(p) == false)
                 .filter(p -> responseParams(request.getRestApiVersion()).contains(p) == false)
                 .collect(Collectors.toCollection(TreeSet::new));
-
             // validate the non-response params
             if (unconsumedParams.isEmpty() == false) {
                 final Set candidateParams = new HashSet<>();
@@ -109,25 +133,36 @@ public final void handleRequest(RestRequest request, RestChannel channel, NodeCl
                 candidateParams.addAll(responseParams(request.getRestApiVersion()));
                 throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
             }
-
             if (request.hasContent() && (request.isContentConsumed() == false && request.isFullContent())) {
                 throw new IllegalArgumentException(
                     "request [" + request.method() + " " + request.path() + "] does not support having a body"
                 );
             }
-
-            if (request.isStreamedContent()) {
-                assert action instanceof RequestBodyChunkConsumer;
-                var chunkConsumer = (RequestBodyChunkConsumer) action;
-                request.contentStream().setHandler((chunk, isLast) -> chunkConsumer.handleChunk(channel, chunk, isLast));
+            if (action instanceof RequestBodyChunkConsumer chunkConsumer) {
+                assert request.isStreamedContent();
+                request.contentStream().setConsumingHandler((chunk, isLast) -> chunkConsumer.handleChunk(channel, chunk, isLast));
             }
-
             usageCount.increment();
-            // execute the action
             action.accept(channel);
         }
     }
 
+    static void aggregateContent(final HttpBody.Stream stream, final Consumer aggregateConsumer) {
+        var chunks = new ArrayList();
+        stream.setConsumingHandler((chunk, isLast) -> {
+            chunks.add(chunk);
+            if (isLast) {
+                var composite = CompositeBytesReference.of(chunks.toArray(new ReleasableBytesReference[0]));
+                var relComposite = new ReleasableBytesReference(composite, () -> Releasables.close(chunks));
+                var body = HttpBody.fromReleasableBytesReference(relComposite);
+                aggregateConsumer.accept(body);
+            } else {
+                stream.next(); // todo: add backpressure
+            }
+        });
+        stream.next();
+    }
+
     protected static String unrecognized(RestRequest request, Set invalids, Set candidates, String detail) {
         StringBuilder message = new StringBuilder().append("request [")
             .append(request.path())
diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java
index bba1d87f8a38e..0a28c119803cd 100644
--- a/server/src/main/java/org/elasticsearch/rest/RestController.java
+++ b/server/src/main/java/org/elasticsearch/rest/RestController.java
@@ -454,7 +454,7 @@ private void dispatchRequest(
         }
         // TODO: estimate streamed content size for circuit breaker,
         // something like http_max_chunk_size * avg_compression_ratio(for compressed content)
-        final int contentLength = request.isFullContent() ? request.contentLength() : 0;
+        final int contentLength = request.contentLength();
         try {
             if (handler.canTripCircuitBreaker()) {
                 inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "");
@@ -463,10 +463,6 @@ private void dispatchRequest(
             }
             // iff we could reserve bytes for the request we need to send the response also over this channel
             responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength, methodHandlers);
-            // TODO: Count requests double in the circuit breaker if they need copying?
-            if (handler.allowsUnsafeBuffers() == false) {
-                request.ensureSafeBuffers();
-            }
 
             if (handler.allowSystemIndexAccessByDefault() == false) {
                 // The ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER indicates that the request is coming from an Elastic product and
diff --git a/server/src/main/java/org/elasticsearch/rest/RestHandler.java b/server/src/main/java/org/elasticsearch/rest/RestHandler.java
index 474bf02b4b92b..01501e47f5fa3 100644
--- a/server/src/main/java/org/elasticsearch/rest/RestHandler.java
+++ b/server/src/main/java/org/elasticsearch/rest/RestHandler.java
@@ -47,6 +47,14 @@ default boolean supportsBulkContent() {
         return false;
     }
 
+    /**
+     * Indicates that handler can process content incrementally. Incremental content is a lazy stream
+     * of byte chunks (not related to chunked encoding). Handler should request chunk from stream
+     * when ready to process one. Chunk is a raw bytes slice from network after decompression, size
+     * can vary, and there is no framing.
+     */
+    default boolean supportsIncrementalContent() { return false; }
+
     /**
      * Returns the concrete RestHandler for this RestHandler. That is, if this is a delegating RestHandler it returns the delegate.
      * Otherwise it returns itself.
diff --git a/server/src/main/java/org/elasticsearch/rest/RestRequest.java b/server/src/main/java/org/elasticsearch/rest/RestRequest.java
index 7d3544e156bdc..361919b98e342 100644
--- a/server/src/main/java/org/elasticsearch/rest/RestRequest.java
+++ b/server/src/main/java/org/elasticsearch/rest/RestRequest.java
@@ -25,6 +25,7 @@
 import org.elasticsearch.http.HttpBody;
 import org.elasticsearch.http.HttpChannel;
 import org.elasticsearch.http.HttpRequest;
+import org.elasticsearch.http.HttpUtils;
 import org.elasticsearch.telemetry.tracing.Traceable;
 import org.elasticsearch.xcontent.ParsedMediaType;
 import org.elasticsearch.xcontent.ToXContent;
@@ -279,11 +280,21 @@ public final String path() {
     }
 
     public boolean hasContent() {
-        return isStreamedContent() || contentLength() > 0;
+        return (HttpUtils.isChunkedTransferEncoding(httpRequest) && isStreamedContent()) || contentLength() > 0;
     }
 
+    /**
+     * Returns content length. If content is a stream with transfer-encoding=chunked it's not possible
+     * to tell exact length of content before processing all chunks, in this case returns 0.
+     */
     public int contentLength() {
-        return httpRequest.body().asFull().bytes().length();
+        if (isFullContent()) {
+            return httpRequest.body().asFull().bytes().length();
+        } else if (HttpUtils.isChunkedTransferEncoding(httpRequest)) {
+            return 0;
+        } else {
+            return HttpUtils.contentLengthHeader(httpRequest);
+        }
     }
 
     public boolean isFullContent() {