Add support for partial http requests handling with pipelining #111258
mhl-b wants to merge 2 commits into elastic:partial-rest-requests from
Conversation
Pinging @elastic/es-distributed (Team:Distributed)
DaveCTurner
left a comment
Thanks Mikhail. I'll take a more detailed look in due course but two things straight away:
- Could we have a test case which shows how a REST handler would use this API? Probably an `ESNetty4IntegTestCase` derivative would be best; see e.g. `Netty4ChunkedContinuationsIT` for an example of the sort of thing I mean.
- Could we merge this into a feature branch for now rather than `main`? That way we've somewhere to coordinate the work to adopt this in the bulk indexing path, but we're not committing to it until we have the whole solution in place and can run some benchmarks?
ywangd
left a comment
This seems to be an exciting piece of work! Thanks for working on it. I had a first read-through and left some minor comments. I plan to come back to it and have a closer look. In the meantime, I second David's comment about having a test case that demonstrates how this new feature is leveraged.
```java
out.replaceAll(obj -> {
    if (obj instanceof PipelinedHttpObject) {
        return obj;
    } else if (obj instanceof FullHttpRequest request) {
        return new PipelinedFullHttpRequest(request, sequence);
    } else if (obj instanceof HttpRequest request) {
        return new PipelinedHttpRequest(request, sequence);
    } else if (obj instanceof LastHttpContent lastContent) {
        return new PipelinedLastHttpContent(lastContent, sequence);
    } else if (obj instanceof HttpContent content) {
        return new PipelinedHttpContent(content, sequence);
    } else {
        throw new IllegalArgumentException();
    }
});
```
Might be more efficient to have a delegating List that does this wrapping in its add method and pass it to super.decode(...). Maybe a premature optimisation for this early stage.
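The delegating-list suggestion above could look roughly like this minimal, Netty-free sketch. `WrappingList` and the `wrap` operator are hypothetical names, not part of the PR; the idea is simply that elements get wrapped as the decoder adds them, instead of in a second `replaceAll` pass afterwards.

```java
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch: a List that applies a wrapping function in add(),
// so each decoded object is wrapped the moment the decoder emits it.
class WrappingList<E> extends AbstractList<E> {
    private final List<E> delegate = new ArrayList<>();
    private final UnaryOperator<E> wrap;

    WrappingList(UnaryOperator<E> wrap) {
        this.wrap = wrap;
    }

    @Override
    public boolean add(E e) {
        return delegate.add(wrap.apply(e)); // wrap eagerly on insertion
    }

    @Override
    public E get(int index) {
        return delegate.get(index);
    }

    @Override
    public int size() {
        return delegate.size();
    }
}
```

In the handler, an instance of such a list (with the `Pipelined*` wrapping as the operator) would be passed to `super.decode(...)` in place of the plain output list, avoiding the extra traversal.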
```java
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
    return msg instanceof PipelinedHttpObject;
}
```
For my education: don't we need to call super.acceptInboundMessage here?
The wrapped Decompressor should call this method on every read(ctx, msg); I added this check to avoid surprises in the decode method, which assumes the message has a sequence number.
@DaveCTurner @ywangd
```java
 */
public class Netty4HttpAggregator extends HttpObjectAggregator {

    private static final Predicate<PipelinedHttpRequest> IGNORE_TEST =
        (req) -> req.uri().startsWith("/_test/request-stream") == false;
```
Added to the source for the sake of PR size.
```java
} else {
    netty4HttpRequest = new Netty4HttpRequest(readSequence++, fullHttpRequest);
    assert currentRequest != null;
    currentRequest.contentPublisher().sendChunk((HttpContent) msg);
```
I attach currentRequest to the Netty4PipeliningHandler so subsequent chunks can be published right away to the RestHandler from here.
```java
Netty4HttpRequest(int sequence, HttpRequest request, Netty4RequestContentPublisher contentPublisher) {
    this.sequence = sequence;
    this.nettyRequest = request;
    this.nettyContent = new DefaultHttpContent(Unpooled.EMPTY_BUFFER);
    this.contentPublisher = contentPublisher;
    this.content = BytesArray.EMPTY;
    this.released = new AtomicBoolean(false);
    this.pooled = false;
    this.headers = getHttpHeadersAsMap(request.headers());
    this.inboundException = null;
}
```
Added support for Netty4HttpRequest to be an HttpRequest with optional HttpContent, rather than always a FullHttpRequest. I do drop trailing headers for non-full requests; not sure whether we use them or not. Haven't found traces of them yet.
```java
import java.util.concurrent.Flow;

public class Netty4RequestContentPublisher implements Flow.Publisher<HttpContent> {
```
Bare bones, nothing fancy: no error handling, no state management, no multiple subscriptions, just the happy path.
DaveCTurner
left a comment
A reactive stream is the right conceptual model for sure, but I'm not convinced we want to stick to these exact APIs. There's always exactly one subscriber, known ahead of time, so there's no need for a separate subscribe() API. We really want request() to ask for a certain number of bytes, typically delivered as a single chunk, which differs from the reactive streams API where the argument is a number of chunks. Finally, the Throwable in the onError path is very concerning. Remember this code is all on an incredibly hot path; we need to avoid unnecessary abstractions wherever possible.
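For illustration, the slimmer contract described above (one consumer known up front, demand counted in bytes, no Throwable path) might look like this single-threaded sketch. `ChunkHandler` and `ByteDemandSource` are made-up names, not anything from the PR or from Elasticsearch, and real code would deal with ref-counted Netty buffers and concurrency.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: exactly one consumer, wired in at construction time,
// so there is no subscribe() step; demand is expressed in bytes.
interface ChunkHandler {
    void onChunk(byte[] chunk, boolean isLast);
}

class ByteDemandSource {
    private final Queue<byte[]> buffered = new ArrayDeque<>();
    private final ChunkHandler handler; // the single, known consumer
    private long demandBytes;
    private boolean finished;

    ByteDemandSource(ChunkHandler handler) {
        this.handler = handler;
    }

    // Network side: a decoded content chunk arrived from the channel.
    void enqueue(byte[] chunk, boolean isLast) {
        buffered.add(chunk);
        finished = isLast;
        deliver();
    }

    // Consumer side: ask for up to `bytes` more bytes of content.
    void request(long bytes) {
        demandBytes += bytes;
        deliver();
    }

    private void deliver() {
        while (!buffered.isEmpty() && demandBytes > 0) {
            byte[] chunk = buffered.poll();
            demandBytes -= chunk.length; // chunks are indivisible here, so this may overshoot
            handler.onChunk(chunk, finished && buffered.isEmpty());
        }
    }
}
```

The point of the sketch is the shape of the API, not the implementation: no Flow.Subscriber indirection, and backpressure expressed in the unit the consumer actually cares about.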
@DaveCTurner @ywangd In the new PR there are no changes to pipelining sequencing, no more PipelinedObjects, a simplified wrapper for HttpObjectAggregator that does not intercept the decode method, and a simplified interface for streamed content. I still use reactive streams as a model, but only with the minimal required methods.
Would it be OK to close this PR if this is not intended to be continued? It would help to tell which one to follow more easily.
UPDATE:
Created a leaner and cleaner version here: #111438.
Most of the changes here are irrelevant in the new PR.
This PR adds support to `Netty4HttpPipeliningHandler` for handling parts of an HTTP request. Right now we always aggregate HTTP request content before passing it to `RestController`. With this change `Netty4HttpPipeliningHandler` can receive `FullHttpRequest`s as well as `HttpRequest`s/`HttpContent`s with the correct sequence number. To make this possible I made several changes in the Netty channel pipeline.
1. Add the HTTP pipeline sequence to parts of the HTTP request. Before, `Netty4HttpPipeliningHandler` would keep track of all incoming `FullHttpRequest`s and increase the sequence number. With this change the sequence number is added sooner, after `HttpRequestDecoder` but before decompression and aggregation. It happens in `Netty4InboundHttpPipeliningHandler`.
2. Propagate the sequence number through `HttpObjectAggregator` and `HttpContentDecompressor`. Since Netty does not know about our pipelining implementation, I have to wrap both classes so they can consume and produce "pipelined" versions of `HttpObject`s. Besides the wrapper handler classes, I introduced strongly-typed versions of `HttpRequest`, `HttpContent`, `LastHttpContent` and `FullHttpRequest` that carry a pipeline sequence number, for example `public record PipelinedHttpContent(HttpContent httpContent, int sequence)`.
3. Make HTTP aggregation optional. In this PR we still do full aggregation for all requests, but I added a predicate that can control which requests skip aggregation, plus unit tests.
After the change the pipeline looks like this:
The most interesting changes are in `Netty4ContentDecompressor`, `Netty4HttpAggregator`, `Netty4InboundHttpPipeliningHandler`, `Netty4HttpPipeliningHandler` and the related unit tests at the end. The rest is boilerplate.

Update:
Added an example of a RestHandler with chunked content. I use the reactive streams model. Rather than exposing `BytesReference` as content, I use `Flow.Publisher<HttpContent>`. The RestHandler subscribes to the publisher and consumes parts at its own rate, via `Subscription.request(long n)`. The publisher reads from the Netty channel on demand and invokes `onNext(HttpContent)` when the number of requested parts is greater than 0.

Content publisher: `Netty4RequestContentPublisher`
Rest handler example: `Netty4RequestContentPublisherIT`

I omitted error handling and edge cases for brevity. There are many things to consider with the publisher state machine: for example avoiding multiple subscriptions, handling terminal states, handling errors, and handling subscriber cancellation. I did some drafts, and I can say it's pretty easy to add them.