Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;

import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;

import java.util.function.Predicate;

public class Netty4HttpAggregator extends HttpObjectAggregator {
Comment thread
mhl-b marked this conversation as resolved.
private static final Predicate<HttpPreRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;

private final Predicate<HttpPreRequest> decider;
private boolean shouldAggregate;

public Netty4HttpAggregator(int maxContentLength) {
this(maxContentLength, IGNORE_TEST);
}

public Netty4HttpAggregator(int maxContentLength, Predicate<HttpPreRequest> decider) {
super(maxContentLength);
this.decider = decider;
}

@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
if (msg instanceof HttpRequest request) {
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
shouldAggregate = decider.test(preReq);
}
if (shouldAggregate) {
return super.acceptInboundMessage(msg);
} else {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
Expand Down Expand Up @@ -70,6 +73,12 @@ private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Chu
@Nullable
private ChunkedWrite currentChunkedWrite;

/**
* HTTP request content stream for the current request; it is null if there is no current request or the request is fully aggregated
*/
@Nullable
private Netty4HttpRequestBodyStream currentRequestStream;

/*
* The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
* channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the
Expand Down Expand Up @@ -109,23 +118,38 @@ public Netty4HttpPipeliningHandler(
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
activityTracker.startActivity();
try {
assert msg instanceof FullHttpRequest : "Should have fully aggregated message already but saw [" + msg + "]";
final FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
final Netty4HttpRequest netty4HttpRequest;
if (fullHttpRequest.decoderResult().isFailure()) {
final Throwable cause = fullHttpRequest.decoderResult().cause();
final Exception nonError;
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
nonError = new Exception(cause);
if (msg instanceof HttpRequest request) {
final Netty4HttpRequest netty4HttpRequest;
if (request.decoderResult().isFailure()) {
final Throwable cause = request.decoderResult().cause();
final Exception nonError;
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
nonError = new Exception(cause);
} else {
nonError = (Exception) cause;
}
netty4HttpRequest = new Netty4HttpRequest(readSequence++, (FullHttpRequest) request, nonError);
} else {
nonError = (Exception) cause;
assert currentRequestStream == null : "current stream must be null for new request";
if (request instanceof FullHttpRequest fullHttpRequest) {
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
currentRequestStream = null;
} else {
var contentStream = new Netty4HttpRequestBodyStream(ctx.channel());
currentRequestStream = contentStream;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, can we assert currentRequestStream is null here or it can be non-null?

Copy link
Copy Markdown
Contributor Author

@mhl-b mhl-b Aug 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be non-null if the previous request was a stream too. When we see an HttpRequest, it means we have received all parts of the previous request — every HttpContent and a single LastHttpContent. At that point the previous parts should either be in the previous stream's queue or have already been processed by the REST handler.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be non-null if previous request was stream too

We set currentRequestStream back to null on receiving LastHttpContent. Or do you mean that may not necessary happen for every streaming request?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, sorry, I lost track of my own changes :) Under normal circumstances it should be null. If a request is not properly terminated (no last content) and a new request arrives, then currentRequestStream might not be null, but that will end up as a decoding failure and a connection shutdown. I will add a test.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ to a test, but could we also assert currentRequestStream == null here? It would be useful documentation if nothing else.

netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
}
}
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest, nonError);
handlePipelinedRequest(ctx, netty4HttpRequest);
} else {
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
assert msg instanceof HttpContent : "expect HttpContent got " + msg;
assert currentRequestStream != null : "current stream must exists before handling http content";
currentRequestStream.handleNettyContent((HttpContent) msg);
if (msg instanceof LastHttpContent) {
currentRequestStream = null;
}
}
handlePipelinedRequest(ctx, netty4HttpRequest);
} finally {
activityTracker.stopActivity();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
Expand All @@ -20,6 +21,7 @@
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
Expand All @@ -39,22 +41,40 @@
public class Netty4HttpRequest implements HttpRequest {

private final FullHttpRequest request;
private final BytesReference content;
private final HttpBody content;
private final Map<String, List<String>> headers;
private final AtomicBoolean released;
private final Exception inboundException;
private final boolean pooled;
private final int sequence;

/**
 * Wraps a streamed (not fully aggregated) request: only the request line and headers
 * have arrived as a bare {@link io.netty.handler.codec.http.HttpRequest}; the body is
 * delivered later through {@code contentStream}. A synthetic {@code FullHttpRequest}
 * with empty content is built so the rest of this class can treat both the aggregated
 * and the streamed case uniformly.
 */
Netty4HttpRequest(int sequence, io.netty.handler.codec.http.HttpRequest request, Netty4HttpRequestBodyStream contentStream) {
    this(
        sequence,
        new DefaultFullHttpRequest(
            request.protocolVersion(),
            request.method(),
            request.uri(),
            Unpooled.EMPTY_BUFFER, // no aggregated content: the body streams via contentStream
            request.headers(),
            EmptyHttpHeaders.INSTANCE
        ),
        new AtomicBoolean(false), // released flag
        false, // pooled == false: the synthetic request holds no pooled buffer
        contentStream,
        null // no inbound (decoder) exception
    );
}

Netty4HttpRequest(int sequence, FullHttpRequest request) {
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content()));
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()));
}

Netty4HttpRequest(int sequence, FullHttpRequest request, Exception inboundException) {
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.toBytesReference(request.content()), inboundException);
this(sequence, request, new AtomicBoolean(false), true, Netty4Utils.fullHttpBodyFrom(request.content()), inboundException);
}

private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, BytesReference content) {
private Netty4HttpRequest(int sequence, FullHttpRequest request, AtomicBoolean released, boolean pooled, HttpBody content) {
this(sequence, request, released, pooled, content, null);
}

Expand All @@ -63,7 +83,7 @@ private Netty4HttpRequest(
FullHttpRequest request,
AtomicBoolean released,
boolean pooled,
BytesReference content,
HttpBody content,
Exception inboundException
) {
this.sequence = sequence;
Expand All @@ -86,7 +106,7 @@ public String uri() {
}

@Override
public BytesReference content() {
public HttpBody body() {
assert released.get() == false;
return content;
}
Expand Down Expand Up @@ -118,7 +138,7 @@ public HttpRequest releaseAndCopy() {
),
new AtomicBoolean(false),
false,
Netty4Utils.toBytesReference(copiedContent)
content
);
} finally {
release();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;

import org.elasticsearch.http.HttpBody;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.util.ArrayDeque;
import java.util.Queue;

/**
* Netty based implementation of {@link HttpBody.Stream}.
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
* to prevent buffering the entire payload. However, the upstream can sometimes send a few chunks of data despite
* autoRead=off. In this case the chunks are queued until the downstream calls {@link Stream#next()}.
*/
public class Netty4HttpRequestBodyStream implements HttpBody.Stream {

    private final Channel channel;
    // Chunks that arrived while the downstream had no outstanding request; drained FIFO by next().
    private final Queue<HttpContent> chunkQueue = new ArrayDeque<>();
    // True when the downstream has asked for a chunk that has not been delivered yet.
    private boolean requested = false;
    // Set once a LastHttpContent has been received, i.e. the request body is complete.
    private boolean hasLast = false;
    private HttpBody.ChunkHandler handler;

    public Netty4HttpRequestBodyStream(Channel channel) {
        this.channel = channel;
        // Release any undelivered queued chunks if the connection closes before they are consumed.
        channel.closeFuture().addListener((f) -> releaseQueuedChunks());
        // Disable autoRead so further body bytes are only read from the socket on demand (see next()).
        channel.config().setAutoRead(false);
    }

    @Override
    public ChunkHandler handler() {
        return handler;
    }

    @Override
    public void setHandler(ChunkHandler chunkHandler) {
        this.handler = chunkHandler;
    }

    // Must run on the event loop: delivers a queued chunk if one is buffered,
    // otherwise triggers a single channel read to fetch more data.
    private void sendQueuedOrRead() {
        assert channel.eventLoop().inEventLoop();
        requested = true;
        var chunk = chunkQueue.poll();
        if (chunk == null) {
            channel.read();
        } else {
            sendChunk(chunk);
        }
    }

    /**
     * Requests the next body chunk. Safe to call from any thread: work is hopped
     * onto the channel's event loop when called from outside it.
     */
    @Override
    public void next() {
        assert handler != null : "handler must be set before requesting next chunk";
        if (channel.eventLoop().inEventLoop()) {
            sendQueuedOrRead();
        } else {
            channel.eventLoop().submit(this::sendQueuedOrRead);
        }
    }

    /**
     * Accepts an inbound {@link HttpContent}: delivered immediately if the downstream
     * is waiting and nothing is queued ahead of it, otherwise queued.
     * NOTE(review): assumed to be invoked on the channel's event loop (callers are
     * channelRead handlers) — confirm; the queue and flags are not synchronized.
     */
    public void handleNettyContent(HttpContent httpContent) {
        assert handler != null : "handler must be set before processing http content";
        if (requested && chunkQueue.isEmpty()) {
            sendChunk(httpContent);
        } else {
            chunkQueue.add(httpContent);
        }
        if (httpContent instanceof LastHttpContent) {
            hasLast = true;
            // Body complete: restore autoRead so the next request's headers can be read normally.
            channel.config().setAutoRead(true);
        }
    }

    // visible for test
    Channel channel() {
        return channel;
    }

    // visible for test
    Queue<HttpContent> chunkQueue() {
        return chunkQueue;
    }

    // visible for test
    boolean hasLast() {
        return hasLast;
    }

    // Hands one chunk to the handler, clearing the outstanding-request flag first;
    // the wrapped bytes release the underlying ByteBuf when the consumer is done.
    private void sendChunk(HttpContent httpContent) {
        assert requested;
        requested = false;
        var bytesRef = Netty4Utils.toReleasableBytesReference(httpContent.content());
        var isLast = httpContent instanceof LastHttpContent;
        handler.onNext(bytesRef, isLast);
    }

    // Drains the queue releasing each chunk's reference count, to avoid buffer leaks on close.
    private void releaseQueuedChunks() {
        while (chunkQueue.isEmpty() == false) {
            chunkQueue.poll().release();
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
);
}
// combines the HTTP message pieces into a single full HTTP request (with headers and body)
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.maxContentLength());
final HttpObjectAggregator aggregator = new Netty4HttpAggregator(handlingSettings.maxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
ch.pipeline()
.addLast("decoder_compress", new HttpContentDecompressor()) // this handles request body decompression
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.transport.TransportException;

import java.io.IOException;
Expand Down Expand Up @@ -123,6 +125,14 @@ public static BytesReference toBytesReference(final ByteBuf buffer) {
}
}

/**
 * Wraps the given {@link ByteBuf} as a {@link ReleasableBytesReference} whose
 * close/release delegates to the buffer's own reference-count release.
 */
public static ReleasableBytesReference toReleasableBytesReference(final ByteBuf buffer) {
    final var delegate = toBytesReference(buffer);
    return new ReleasableBytesReference(delegate, buffer::release);
}

/**
 * Builds a fully-materialized {@link HttpBody.Full} backed by the bytes of the given buffer.
 */
public static HttpBody.Full fullHttpBodyFrom(final ByteBuf buf) {
    final var bytes = toBytesReference(buf);
    return new HttpBody.ByteRefHttpBody(bytes);
}

public static Recycler<BytesRef> createRecycler(Settings settings) {
// If this method is called by super ctor the processors will not be set. Accessing NettyAllocator initializes netty's internals
// setting the processors. We must do it ourselves first just in case.
Expand Down
Loading