Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,9 @@ tests:
- class: org.elasticsearch.reindex.management.ReindexManagementClientYamlTestSuiteIT
method: test {yaml=reindex/30_cancel_reindex/Cancel running reindex returns response and GET confirms completed}
issue: https://github.com/elastic/elasticsearch/issues/142079
- class: org.elasticsearch.compute.lucene.query.LuceneTopNSourceOperatorScoringTests
method: testAccumulateSearchLoad
issue: https://github.com/elastic/elasticsearch/issues/142986
- class: org.elasticsearch.xpack.esql.qa.multi_node.GenerativeIT
method: test
issue: https://github.com/elastic/elasticsearch/issues/143023
- class: org.elasticsearch.compute.lucene.query.LuceneTopNSourceOperatorTests
method: testAccumulateSearchLoad
issue: https://github.com/elastic/elasticsearch/issues/143111
- class: org.elasticsearch.xpack.sql.qa.security.CliApiKeyIT
method: testCliConnectionWithApiKey
issue: https://github.com/elastic/elasticsearch/issues/143125
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,11 @@ public DriverContext driverContext() {
* Returns a blocked future when the chain of operators is blocked, allowing the caller
* thread to do other work instead of blocking or busy-spinning on the blocked operator.
*/
SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) {
updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running");
SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplier currentTimeNanosSupplier) {
long maxTimeNanos = maxTime.nanos();
// Start time, used to stop the calculations after maxTime has passed.
long startTime = nowSupplier.getAsLong();
long startTime = currentTimeNanosSupplier.getAsLong();
updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running", startTime);
// The time of the next forced status update.
long nextStatus = startTime + statusNanos;
// Total executed iterations this run, used to stop the calculations after maxIterations have passed.
Expand All @@ -191,9 +191,13 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
IsBlockedResult isBlocked = Operator.NOT_BLOCKED;
try {
assert driverContext.assertBeginRunLoop();
isBlocked = runSingleLoopIteration(nowSupplier, lastStatusUpdateTime);
isBlocked = runSingleLoopIteration(currentTimeNanosSupplier, lastStatusUpdateTime);
} catch (DriverEarlyTerminationException unused) {
closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()), nowSupplier, lastStatusUpdateTime);
closeEarlyFinishedOperators(
activeOperators.listIterator(activeOperators.size()),
currentTimeNanosSupplier,
lastStatusUpdateTime
);
assert isFinished() : "not finished after early termination";
} catch (TaskCancelledException e) {
LOGGER.debug("Cancelling running driver [{}]", shortDescription, e);
Expand All @@ -216,28 +220,52 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
totalIterationsThisRun++;
iterationsSinceLastStatusUpdate++;

long now = nowSupplier.getAsLong();
long now = currentTimeNanosSupplier.getAsLong();
if (isBlocked.listener().isDone() == false) {
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.ASYNC, isBlocked.reason());
updateStatus(
now - lastStatusUpdateTime,
iterationsSinceLastStatusUpdate,
DriverStatus.Status.ASYNC,
isBlocked.reason(),
now
);
return isBlocked.listener();
}
if (isFinished()) {
finishNanos = now;
updateStatus(finishNanos - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.DONE, "driver done");
updateStatus(
finishNanos - lastStatusUpdateTime,
iterationsSinceLastStatusUpdate,
DriverStatus.Status.DONE,
"driver done",
now
);
driverContext.finish();
Releasables.close(releasable, driverContext.getSnapshot());
return Operator.NOT_BLOCKED.listener();
}
if (totalIterationsThisRun >= maxIterations) {
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver iterations");
updateStatus(
now - lastStatusUpdateTime,
iterationsSinceLastStatusUpdate,
DriverStatus.Status.WAITING,
"driver iterations",
now
);
return Operator.NOT_BLOCKED.listener();
}
if (now - startTime >= maxTimeNanos) {
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time");
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time", now);
return Operator.NOT_BLOCKED.listener();
}
if (now > nextStatus) {
updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.RUNNING, "driver running");
updateStatus(
now - lastStatusUpdateTime,
iterationsSinceLastStatusUpdate,
DriverStatus.Status.RUNNING,
"driver running",
now
);
iterationsSinceLastStatusUpdate = 0;
lastStatusUpdateTime = now;
nextStatus = now + statusNanos;
Expand Down Expand Up @@ -271,7 +299,7 @@ public void abort(Exception reason, ActionListener<Void> listener) {
}
}

private IsBlockedResult runSingleLoopIteration(LongSupplier nowSupplier, long lastStatusUpdate) {
private IsBlockedResult runSingleLoopIteration(LongSupplier currentTimeNanosSupplier, long lastStatusUpdate) {
driverContext.checkForEarlyTermination();
boolean movedPage = false;

Expand Down Expand Up @@ -314,14 +342,14 @@ private IsBlockedResult runSingleLoopIteration(LongSupplier nowSupplier, long la
if (op.isFinished()) {
driverContext.checkForEarlyTermination();
var originalIndex = iterator.previousIndex();
var index = closeEarlyFinishedOperators(iterator, nowSupplier, lastStatusUpdate);
var index = closeEarlyFinishedOperators(iterator, currentTimeNanosSupplier, lastStatusUpdate);
if (index >= 0) {
iterator = new ArrayList<>(activeOperators).listIterator(originalIndex - index);
}
}
}

closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()), nowSupplier, lastStatusUpdate);
closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()), currentTimeNanosSupplier, lastStatusUpdate);

if (movedPage == false) {
blockedResults.clear();
Expand All @@ -342,7 +370,11 @@ protected void onNoPagesMoved() {
}

// Returns the index of the last operator that was closed, -1 if no operator was closed.
protected int closeEarlyFinishedOperators(ListIterator<Operator> operators, LongSupplier nowSupplier, long lastStatusUpdate) {
protected int closeEarlyFinishedOperators(
ListIterator<Operator> operators,
LongSupplier currentTimeNanosSupplier,
long lastStatusUpdate
) {
var iterator = activeOperators.listIterator(operators.nextIndex());
while (iterator.hasPrevious()) {
if (iterator.previous().isFinished()) {
Expand All @@ -358,7 +390,7 @@ protected int closeEarlyFinishedOperators(ListIterator<Operator> operators, Long
Operator op = finishedOperators.next();
statusOfCompletedOperators.add(new OperatorStatus(op.toString(), op.status()));
if (op instanceof SourceOperator sourceOperator) {
long now = nowSupplier.getAsLong();
long now = currentTimeNanosSupplier.getAsLong();
// report one last time before closing
sourceOperator.reportSearchLoad(now - lastStatusUpdate, now);
}
Expand Down Expand Up @@ -402,9 +434,18 @@ public static void start(
) {
driver.completionListener.addListener(listener);
if (driver.started.compareAndSet(false, true)) {
driver.updateStatus(0, 0, DriverStatus.Status.STARTING, "driver starting");
LongSupplier currentTimeNanosSupplier = System::nanoTime;
driver.updateStatus(0, 0, DriverStatus.Status.STARTING, "driver starting", currentTimeNanosSupplier.getAsLong());
initializeEarlyTerminationChecker(driver);
schedule(DEFAULT_TIME_BEFORE_YIELDING, maxIterations, threadContext, executor, driver, driver.completionListener);
schedule(
DEFAULT_TIME_BEFORE_YIELDING,
maxIterations,
threadContext,
executor,
driver,
driver.completionListener,
currentTimeNanosSupplier
);
}
}

Expand Down Expand Up @@ -455,22 +496,23 @@ private static void schedule(
ThreadContext threadContext,
Executor executor,
Driver driver,
ActionListener<Void> listener
ActionListener<Void> listener,
LongSupplier currentTimeNanosSupplier
) {
final var task = new AbstractRunnable() {

@Override
protected void doRun() {
SubscribableListener<Void> fut = driver.run(maxTime, maxIterations, System::nanoTime);
SubscribableListener<Void> fut = driver.run(maxTime, maxIterations, currentTimeNanosSupplier);
if (driver.isFinished()) {
onComplete(listener);
return;
}
if (fut.isDone()) {
schedule(maxTime, maxIterations, threadContext, executor, driver, listener);
schedule(maxTime, maxIterations, threadContext, executor, driver, listener, currentTimeNanosSupplier);
} else {
ActionListener<Void> readyListener = ActionListener.wrap(
ignored -> schedule(maxTime, maxIterations, threadContext, executor, driver, listener),
ignored -> schedule(maxTime, maxIterations, threadContext, executor, driver, listener, currentTimeNanosSupplier),
this::onFailure
);
fut.addListener(ContextPreservingActionListener.wrapPreservingContext(readyListener, threadContext));
Expand Down Expand Up @@ -570,7 +612,7 @@ private void reportSearchLoad(long extraCpuNanos, long now) {
* @param extraIterations how many iterations to add to the previous status
* @param status the status of the overall driver request
*/
private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.Status status, String reason) {
private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.Status status, String reason, long nowNanos) {
this.status.getAndUpdate(prev -> {
long now = System.currentTimeMillis();
DriverSleeps sleeps = prev.sleeps();
Expand All @@ -597,7 +639,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.
}
}
}
reportSearchLoad(extraCpuNanos, now);
reportSearchLoad(extraCpuNanos, nowNanos);

return new DriverStatus(
sessionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ public void testAccumulateSearchLoad() throws IOException {
IndexReader rLarge = null;
try {
r0 = simpleReader(dir0, 0, 1);
rLarge = simpleReader(dirLarge, 200, 10);
rLarge = simpleReader(dirLarge, 2000, 100);

List<ShardContext> shardContexts = List.of(new MockShardContext(r0, 0), new MockShardContext(rLarge, 1));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void testAccumulateSearchLoad() throws IOException {
IndexReader rLarge = null;
try {
r0 = simpleReader(dir0, 0, 1);
rLarge = simpleReader(dirLarge, 200, 10);
rLarge = simpleReader(dirLarge, 2000, 100);

List<ShardContext> shardContexts = List.of(new LuceneSourceOperatorTests.MockShardContext(r0, 0) {
@Override
Expand Down