Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.http.HttpContent;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class Netty4RequestContentPublisherIT extends ESNetty4IntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.concatLists(List.of(RequestContentStreamPlugin.class), super.nodePlugins());
}

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

public void testBasicStream() throws IOException {
var totalBytes = 1024 * 1024;
var request = new Request("POST", RequestContentStreamPlugin.ROUTE);
request.setEntity(new ByteArrayEntity(randomByteArrayOfLength(totalBytes), ContentType.APPLICATION_JSON));

var respose = getRestClient().performRequest(request);
assertEquals(200, respose.getStatusLine().getStatusCode());
var gotTotalBytes = new BytesArray(respose.getEntity().getContent().readAllBytes()).utf8ToString();
assertEquals("" + totalBytes, gotTotalBytes);
}

public static class RequestContentStreamPlugin extends Plugin implements ActionPlugin {

static final String ROUTE = "/_test/request-stream/basic";
private static final Logger LOGGER = LogManager.getLogger("StreamRestHandler");

@Override
public Collection<RestHandler> getRestHandlers(
Settings settings,
NamedWriteableRegistry namedWriteableRegistry,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster,
Predicate<NodeFeature> clusterSupportsFeature
) {
return List.of(new BaseRestHandler() {
@Override
public String getName() {
return ROUTE;
}

@Override
public List<Route> routes() {
return List.of(new Route(RestRequest.Method.POST, ROUTE));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
return new RestChannelConsumer() {
@Override
public void accept(RestChannel channel) {

// netty channel will hold all chunks until we subscribe
request.contentPublisher().subscribe(new Flow.Subscriber<>() {
Flow.Subscription subscription;
int totalReceivedBytes;

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
// after subscription, we can request N next messages, for example 5 chunks
subscription.request(5);
}

@Override
public void onNext(HttpContent item) {
// chunk handler
var contentSize = item.content().length();
LOGGER.info("got next item of a size {}", contentSize);
totalReceivedBytes += contentSize;
item.release();
// we need explicitly ask for next N chunks
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
// not implemented yet
assert false : throwable.getMessage();
}

@Override
public void onComplete() {
// completion event after LastHttpContent
LOGGER.info("complete");
channel.sendResponse(new RestResponse(RestStatus.OK, Integer.toString(totalReceivedBytes)));
}
});

}
};
}
});
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;

import java.util.List;

/**
* A wrapper around {@link HttpContentDecompressor} that consumes and produces {@link PipelinedHttpRequest} and
* {@link PipelinedHttpContent}.
*/
public class Netty4ContentDecompressor extends HttpContentDecompressor {

@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
return msg instanceof PipelinedHttpObject;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my education: we don't need call super.acceptInboundMessage here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapped Decompressor should call this method on every "read(ctx, msg)", I added this check to avoid surprises in "decode" method that assumes message has sequence number.

}

@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
super.decode(ctx, msg, out);
final var sequence = ((PipelinedHttpObject) msg).sequence();
out.replaceAll(obj -> {
if (obj instanceof PipelinedHttpObject) {
return obj;
} else if (obj instanceof FullHttpRequest request) {
return new PipelinedFullHttpRequest(request, sequence);
} else if (obj instanceof HttpRequest request) {
return new PipelinedHttpRequest(request, sequence);
} else if (obj instanceof LastHttpContent lastContent) {
return new PipelinedLastHttpContent(lastContent, sequence);
} else if (obj instanceof HttpContent content) {
return new PipelinedHttpContent(content, sequence);
} else {
throw new IllegalArgumentException();
}
});
Comment on lines +36 to +50
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be more efficient to have a delegating List that does these wrapping in its add method and pass it to super.decode(...). Maybe a pre-mature optimisation for the early stage.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;

import java.util.List;
import java.util.function.Predicate;

/**
* A wrapper around {@link HttpObjectAggregator}, provides optional aggregation for {@link PipelinedHttpObject}'s.
* A {@code decider} predicate selects HTTP requests that will be aggregated into {@link PipelinedFullHttpRequest}.
*/
public class Netty4HttpAggregator extends HttpObjectAggregator {

private static final Predicate<PipelinedHttpRequest> IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added to the source for the sake of PR size


private final Predicate<PipelinedHttpRequest> decider;
private boolean shouldAggregate;

public Netty4HttpAggregator(int maxContentLength) {
this(maxContentLength, IGNORE_TEST);
}

public Netty4HttpAggregator(int maxContentLength, Predicate<PipelinedHttpRequest> decider) {
super(maxContentLength);
this.decider = decider;
}

@Override
public boolean acceptInboundMessage(Object msg) {
if (msg instanceof PipelinedHttpRequest request) {
shouldAggregate = decider.test(request);
return shouldAggregate;
} else if (msg instanceof PipelinedHttpContent || msg instanceof PipelinedLastHttpContent) {
return shouldAggregate;
} else {
return false;
}
}

@Override
protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
super.decode(ctx, msg, out);
out.replaceAll(o -> new PipelinedFullHttpRequest((FullHttpRequest) o, ((PipelinedHttpObject) msg).sequence()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpContent;
import org.elasticsearch.transport.netty4.Netty4Utils;

public class Netty4HttpContent implements HttpContent {

private final io.netty.handler.codec.http.HttpContent nettyContent;
private final BytesReference ref;

Netty4HttpContent(io.netty.handler.codec.http.HttpContent httpContent) {
nettyContent = httpContent;
ref = Netty4Utils.toBytesReference(httpContent.content());
}

@Override
public BytesReference content() {
return ref;
}

@Override
public void release() {
nettyContent.release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.util.ReferenceCountUtil;
Expand Down Expand Up @@ -70,12 +72,13 @@ private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Chu
@Nullable
private ChunkedWrite currentChunkedWrite;

/*
* The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
* channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the
* current write sequence, implying that all preceding messages have been written.
private Netty4HttpRequest currentRequest;

/**
* Read sequence numbers are attached to requests by {@link Netty4InboundHttpPipeliningHandler} and then transferred to responses.
* A response is not written to the channel context until its sequence number matches the current write sequence, implying that all
* preceding messages have been written.
*/
private int readSequence;
private int writeSequence;

/**
Expand Down Expand Up @@ -109,23 +112,34 @@ public Netty4HttpPipeliningHandler(
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
activityTracker.startActivity();
try {
assert msg instanceof FullHttpRequest : "Should have fully aggregated message already but saw [" + msg + "]";
final FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
final Netty4HttpRequest netty4HttpRequest;
if (fullHttpRequest.decoderResult().isFailure()) {
final Throwable cause = fullHttpRequest.decoderResult().cause();
final Exception nonError;
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
nonError = new Exception(cause);
assert msg instanceof PipelinedHttpObject : "Should have pipelined message already but saw [" + msg + "]";
final var sequence = ((PipelinedHttpObject) msg).sequence();
if (msg instanceof HttpRequest request) {
final Netty4HttpRequest netty4HttpRequest;
if (request.decoderResult().isFailure()) {
final Throwable cause = request.decoderResult().cause();
final Exception nonError;
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
nonError = new Exception(cause);
} else {
nonError = (Exception) cause;
}
netty4HttpRequest = new Netty4HttpRequest(sequence, (FullHttpRequest) request, nonError);
} else {
nonError = (Exception) cause;
if (request instanceof FullHttpRequest fullRequest) {
netty4HttpRequest = new Netty4HttpRequest(sequence, fullRequest);
} else {
var contentPublisher = new Netty4RequestContentPublisher(ctx.channel());
netty4HttpRequest = new Netty4HttpRequest(sequence, request, contentPublisher);
currentRequest = netty4HttpRequest;
}
}
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest, nonError);
handlePipelinedRequest(ctx, netty4HttpRequest);
} else {
netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
assert currentRequest != null;
currentRequest.contentPublisher().sendChunk((HttpContent) msg);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I attach currentRequest to the Netty4PipeliningHandler and consequent chunks can be published right a way to the RestHandler from here.

}
handlePipelinedRequest(ctx, netty4HttpRequest);
} finally {
activityTracker.stopActivity();
}
Expand Down
Loading