Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand Down Expand Up @@ -370,31 +370,31 @@ public void writeTo(StreamOutput out) {
TransportAction.localOnly();
}

public ChunkedRestResponseBody getChunkedBody() {
return getChunkBatch(0);
public ChunkedRestResponseBodyPart getFirstResponseBodyPart() {
return getResponseBodyPart(0);
}

private ChunkedRestResponseBody getChunkBatch(int batchIndex) {
private ChunkedRestResponseBodyPart getResponseBodyPart(int batchIndex) {
if (batchIndex == failIndex && randomBoolean()) {
throw new ElasticsearchException("simulated failure creating next batch");
}
return new ChunkedRestResponseBody() {
return new ChunkedRestResponseBodyPart() {

private final Iterator<String> lines = Iterators.forRange(0, 3, i -> "batch-" + batchIndex + "-chunk-" + i + "\n");

@Override
public boolean isDone() {
public boolean isPartComplete() {
return lines.hasNext() == false;
}

@Override
public boolean isEndOfResponse() {
public boolean isLastPart() {
return batchIndex == 2;
}

@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
executor.execute(ActionRunnable.supply(listener, () -> getChunkBatch(batchIndex + 1)));
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
executor.execute(ActionRunnable.supply(listener, () -> getResponseBodyPart(batchIndex + 1)));
}

@Override
Expand Down Expand Up @@ -486,11 +486,12 @@ public void accept(RestChannel channel) {
@Override
protected void processResponse(Response response) {
try {
final var responseBody = response.getChunkedBody(); // might fail, so do this before acquiring ref
final var responseBody = response.getFirstResponseBodyPart();
// preceding line might fail, so needs to be done before acquiring the sendResponse ref
refs.mustIncRef();
channel.sendResponse(RestResponse.chunked(RestStatus.OK, responseBody, refs::decRef));
} finally {
refs.decRef();
refs.decRef(); // release the ref acquired at the top of accept()
}
}
});
Expand Down Expand Up @@ -534,26 +535,26 @@ public void writeTo(StreamOutput out) {
TransportAction.localOnly();
}

public ChunkedRestResponseBody getChunkedBody() {
return new ChunkedRestResponseBody() {
public ChunkedRestResponseBodyPart getResponseBodyPart() {
return new ChunkedRestResponseBodyPart() {
private final Iterator<String> lines = Iterators.single("infinite response\n");

@Override
public boolean isDone() {
public boolean isPartComplete() {
return lines.hasNext() == false;
}

@Override
public boolean isEndOfResponse() {
public boolean isLastPart() {
return false;
}

@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
computingContinuation = true;
executor.execute(ActionRunnable.supply(listener, () -> {
computingContinuation = false;
return getChunkedBody();
return getResponseBodyPart();
}));
}

Expand Down Expand Up @@ -628,7 +629,7 @@ public void accept(RestChannel channel) {
client.execute(TYPE, new Request(), new RestActionListener<>(channel) {
@Override
protected void processResponse(Response response) {
channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getChunkedBody(), () -> {
channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getResponseBodyPart(), () -> {
// cancellation notification only happens while processing a continuation, not while computing
// the next one; prompt cancellation requires use of something like RestCancellableNodeClient
assertFalse(response.computingContinuation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
Expand Down Expand Up @@ -245,19 +245,19 @@ public BytesReference next() {
private static void sendChunksResponse(RestChannel channel, Iterator<BytesReference> chunkIterator) {
final var localRefs = refs; // single volatile read
if (localRefs != null && localRefs.tryIncRef()) {
channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() {
channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBodyPart() {
@Override
public boolean isDone() {
public boolean isPartComplete() {
return chunkIterator.hasNext() == false;
}

@Override
public boolean isEndOfResponse() {
public boolean isLastPart() {
return true;
}

@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
assert false : "no continuations";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -243,21 +243,21 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
throw new IllegalArgumentException("[" + FAIL_AFTER_BYTES_PARAM + "] must be present and non-negative");
}
return channel -> randomExecutor(client.threadPool()).execute(
() -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() {
() -> channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBodyPart() {
int bytesRemaining = failAfterBytes;

@Override
public boolean isDone() {
public boolean isPartComplete() {
return false;
}

@Override
public boolean isEndOfResponse() {
public boolean isLastPart() {
return true;
}

@Override
public void getContinuation(ActionListener<ChunkedRestResponseBody> listener) {
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
fail("no continuations here");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@

import io.netty.util.concurrent.PromiseCombiner;

import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;

final class Netty4ChunkedHttpContinuation implements Netty4HttpResponse {
private final int sequence;
private final ChunkedRestResponseBody body;
private final ChunkedRestResponseBodyPart bodyPart;
private final PromiseCombiner combiner;

Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBody body, PromiseCombiner combiner) {
Netty4ChunkedHttpContinuation(int sequence, ChunkedRestResponseBodyPart bodyPart, PromiseCombiner combiner) {
this.sequence = sequence;
this.body = body;
this.bodyPart = bodyPart;
this.combiner = combiner;
}

Expand All @@ -28,8 +28,8 @@ public int getSequence() {
return sequence;
}

public ChunkedRestResponseBody body() {
return body;
public ChunkedRestResponseBodyPart bodyPart() {
return bodyPart;
}

public PromiseCombiner combiner() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import io.netty.handler.codec.http.HttpVersion;

import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestStatus;

/**
Expand All @@ -23,16 +23,16 @@ final class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Net

private final int sequence;

private final ChunkedRestResponseBody body;
private final ChunkedRestResponseBodyPart firstBodyPart;

Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBody body) {
Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) {
super(version, HttpResponseStatus.valueOf(status.getStatus()));
this.sequence = sequence;
this.body = body;
this.firstBodyPart = firstBodyPart;
}

public ChunkedRestResponseBody body() {
return body;
public ChunkedRestResponseBodyPart firstBodyPart() {
return firstBodyPart;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.transport.netty4.Netty4WriteThrottlingHandler;
Expand All @@ -58,7 +58,7 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
private final int maxEventsHeld;
private final PriorityQueue<Tuple<? extends Netty4HttpResponse, ChannelPromise>> outboundHoldingQueue;

private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBody responseBody) {}
private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, ChunkedRestResponseBodyPart responseBodyPart) {}

/**
* The current {@link ChunkedWrite} if a chunked write is executed at the moment.
Expand Down Expand Up @@ -214,9 +214,9 @@ private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttp
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
final ChannelPromise first = ctx.newPromise();
combiner.add((Future<Void>) first);
final var responseBody = readyResponse.body();
final var firstBodyPart = readyResponse.firstBodyPart();
assert currentChunkedWrite == null;
currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody);
currentChunkedWrite = new ChunkedWrite(combiner, promise, firstBodyPart);
if (enqueueWrite(ctx, readyResponse, first)) {
// We were able to write out the first chunk directly, try writing out subsequent chunks until the channel becomes unwritable.
// NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel.
Expand All @@ -232,9 +232,10 @@ private void doWriteChunkedResponse(ChannelHandlerContext ctx, Netty4ChunkedHttp
private void doWriteChunkedContinuation(ChannelHandlerContext ctx, Netty4ChunkedHttpContinuation continuation, ChannelPromise promise) {
final PromiseCombiner combiner = continuation.combiner();
assert currentChunkedWrite == null;
final var responseBody = continuation.body();
assert responseBody.isDone() == false : "response with continuations must have at least one (possibly-empty) chunk in each part";
currentChunkedWrite = new ChunkedWrite(combiner, promise, responseBody);
final var bodyPart = continuation.bodyPart();
assert bodyPart.isPartComplete() == false
: "response with continuations must have at least one (possibly-empty) chunk in each part";
currentChunkedWrite = new ChunkedWrite(combiner, promise, bodyPart);
// NB "writable" means there's space in the downstream ChannelOutboundBuffer, we aren't trying to saturate the physical channel.
while (ctx.channel().isWritable()) {
if (writeChunk(ctx, currentChunkedWrite)) {
Expand All @@ -251,17 +252,17 @@ private void finishChunkedWrite() {
}
final var finishingWrite = currentChunkedWrite;
currentChunkedWrite = null;
final var finishingWriteBody = finishingWrite.responseBody();
assert finishingWriteBody.isDone();
final var endOfResponse = finishingWriteBody.isEndOfResponse();
final var finishingWriteBodyPart = finishingWrite.responseBodyPart();
assert finishingWriteBodyPart.isPartComplete();
final var endOfResponse = finishingWriteBodyPart.isLastPart();
if (endOfResponse) {
writeSequence++;
finishingWrite.combiner().finish(finishingWrite.onDone());
} else {
final var channel = finishingWrite.onDone().channel();
ActionListener.run(ActionListener.assertOnce(new ActionListener<>() {
@Override
public void onResponse(ChunkedRestResponseBody continuation) {
public void onResponse(ChunkedRestResponseBodyPart continuation) {
channel.writeAndFlush(
new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
finishingWrite.onDone() // pass the terminal listener/promise along the line
Expand Down Expand Up @@ -296,7 +297,7 @@ private void checkShutdown() {
}
}

}), finishingWriteBody::getContinuation);
}), finishingWriteBodyPart::getNextPart);
}
}

Expand Down Expand Up @@ -374,22 +375,22 @@ private boolean doFlush(ChannelHandlerContext ctx) throws IOException {
}

private boolean writeChunk(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite) {
final var body = chunkedWrite.responseBody();
final var bodyPart = chunkedWrite.responseBodyPart();
final var combiner = chunkedWrite.combiner();
assert body.isDone() == false : "should not continue to try and serialize once done";
assert bodyPart.isPartComplete() == false : "should not continue to try and serialize once done";
final ReleasableBytesReference bytes;
try {
bytes = body.encodeChunk(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE, serverTransport.recycler());
bytes = bodyPart.encodeChunk(Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE, serverTransport.recycler());
} catch (Exception e) {
return handleChunkingFailure(ctx, chunkedWrite, e);
}
final ByteBuf content = Netty4Utils.toByteBuf(bytes);
final boolean done = body.isDone();
final boolean lastChunk = done && body.isEndOfResponse();
final ChannelFuture f = ctx.write(lastChunk ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
final boolean isPartComplete = bodyPart.isPartComplete();
final boolean isBodyComplete = isPartComplete && bodyPart.isLastPart();
final ChannelFuture f = ctx.write(isBodyComplete ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
f.addListener(ignored -> bytes.close());
combiner.add(f);
return done;
return isPartComplete;
}

private boolean handleChunkingFailure(ChannelHandlerContext ctx, ChunkedWrite chunkedWrite, Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.netty4.Netty4Utils;
Expand Down Expand Up @@ -176,8 +176,8 @@ public Netty4FullHttpResponse createResponse(RestStatus status, BytesReference c
}

@Override
public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) {
return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, content);
public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBodyPart firstBodyPart) {
return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, firstBodyPart);
}

@Override
Expand Down
Loading