Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public void onFailure(Exception exception) {
safeSleep(scaledRandomIntBetween(10, 500)); // make it more likely the request started executing
}
cancellable.cancel();
} // closing the request tracker ensures that everything is released, including all response chunks and the overall response
} // closing the resource tracker ensures that everything is released, including all response chunks and the overall response
}

private static Releasable withResourceTracker() {
Expand Down Expand Up @@ -525,6 +525,7 @@ public ActionRequestValidationException validate() {
public static class Response extends ActionResponse {
private final Executor executor;
volatile boolean computingContinuation;
boolean recursive = false;

public Response(Executor executor) {
this.executor = executor;
Expand All @@ -551,11 +552,17 @@ public boolean isLastPart() {

@Override
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
computingContinuation = true;
executor.execute(ActionRunnable.supply(listener, () -> {
computingContinuation = false;
return getResponseBodyPart();
}));
assertFalse(recursive);
recursive = true;
try {
computingContinuation = true;
executor.execute(ActionRunnable.supply(listener, () -> {
computingContinuation = false;
return getResponseBodyPart();
}));
} finally {
recursive = false;
}
}

@Override
Expand Down Expand Up @@ -585,7 +592,10 @@ public TransportInfiniteContinuationsAction(ActionFilters actionFilters, Transpo
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
executor.execute(
ActionRunnable.supply(ActionTestUtils.assertNoFailureListener(listener::onResponse), () -> new Response(executor))
ActionRunnable.supply(
ActionTestUtils.assertNoFailureListener(listener::onResponse),
() -> new Response(randomFrom(executor, EsExecutors.DIRECT_EXECUTOR_SERVICE))
)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.network.ThreadWatchdog;
Expand Down Expand Up @@ -271,45 +272,59 @@ private void finishChunkedWrite() {
writeSequence++;
finishingWrite.combiner().finish(finishingWrite.onDone());
} else {
final var threadContext = serverTransport.getThreadPool().getThreadContext();
assert Transports.assertDefaultThreadContext(threadContext);
final var channel = finishingWrite.onDone().channel();
ActionListener.run(ActionListener.assertOnce(new ActionListener<>() {
@Override
public void onResponse(ChunkedRestResponseBodyPart continuation) {
channel.writeAndFlush(
new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
finishingWrite.onDone() // pass the terminal listener/promise along the line
);
checkShutdown();
}

@Override
public void onFailure(Exception e) {
logger.error(
Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
e
);
channel.close().addListener(ignored -> {
finishingWrite.combiner().add(channel.newFailedFuture(e));
finishingWrite.combiner().finish(finishingWrite.onDone());
});
checkShutdown();
}

private void checkShutdown() {
if (channel.eventLoop().isShuttingDown()) {
// The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know if the
// preceding activity made it onto its queue before shutdown or whether it will just vanish without a trace, so
// to avoid a leak we must double-check that the final listener is completed once the event loop is terminated.
// Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an ImmediateEventExecutor
// which means this completion is not subject to the same issue, it still works even if the event loop has already
// terminated.
channel.eventLoop()
.terminationFuture()
.addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException()));
}
}

}), finishingWriteBodyPart::getNextPart);
ActionListener.run(
new ContextPreservingActionListener<>(
threadContext.newRestorableContext(false),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the response has multiple continuations, it seems to me that we are technically restoring the threadContext from the previous iteration instead of the threadContext from when we sent the first part? But maybe it is effectively the same since (1) the capture and restore are chained and (2) there is nothing in writeAndFlush that could change the threadContext?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but we're in the Netty event loop here, so we are indeed not changing the thread context (which is why we call assertDefaultThreadContext to check that).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertDefaultThreadContext does not check response headers. But I see what you mean.

ActionListener.assertOnce(new ActionListener<>() {
@Override
public void onResponse(ChunkedRestResponseBodyPart continuation) {
// always fork a fresh task to avoid stack overflow
assert Transports.assertDefaultThreadContext(threadContext);
channel.eventLoop()
.execute(
() -> channel.writeAndFlush(
new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
finishingWrite.onDone() // pass the terminal listener/promise along the line
)
);
checkShutdown();
}

@Override
public void onFailure(Exception e) {
assert Transports.assertDefaultThreadContext(threadContext);
logger.error(
Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
e
);
channel.close().addListener(ignored -> {
finishingWrite.combiner().add(channel.newFailedFuture(e));
finishingWrite.combiner().finish(finishingWrite.onDone());
});
checkShutdown();
}

private void checkShutdown() {
if (channel.eventLoop().isShuttingDown()) {
// The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know
// if the preceding activity made it onto its queue before shutdown or whether it will just vanish without a
// trace, so to avoid a leak we must double-check that the final listener is completed once the event loop
// is terminated. Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an
// ImmediateEventExecutor which means this completion is not subject to the same issue, it still works even
// if the event loop has already terminated.
channel.eventLoop()
.terminationFuture()
.addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException()));
}
}

})
),
finishingWriteBodyPart::getNextPart
);
}
}

Expand Down