diff --git a/muted-tests.yml b/muted-tests.yml index a2619c4e8fd0f..d02e401422621 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -396,15 +396,9 @@ tests: - class: org.elasticsearch.reindex.management.ReindexManagementClientYamlTestSuiteIT method: test {yaml=reindex/30_cancel_reindex/Cancel running reindex returns response and GET confirms completed} issue: https://github.com/elastic/elasticsearch/issues/142079 -- class: org.elasticsearch.compute.lucene.query.LuceneTopNSourceOperatorScoringTests - method: testAccumulateSearchLoad - issue: https://github.com/elastic/elasticsearch/issues/142986 - class: org.elasticsearch.xpack.esql.qa.multi_node.GenerativeIT method: test issue: https://github.com/elastic/elasticsearch/issues/143023 -- class: org.elasticsearch.compute.lucene.query.LuceneTopNSourceOperatorTests - method: testAccumulateSearchLoad - issue: https://github.com/elastic/elasticsearch/issues/143111 - class: org.elasticsearch.xpack.sql.qa.security.CliApiKeyIT method: testCliConnectionWithApiKey issue: https://github.com/elastic/elasticsearch/issues/143125 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index 3405dd0b7a49c..dc19e3dfc3bbc 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -174,11 +174,11 @@ public DriverContext driverContext() { * Returns a blocked future when the chain of operators is blocked, allowing the caller * thread to do other work instead of blocking or busy-spinning on the blocked operator. */ - SubscribableListener run(TimeValue maxTime, int maxIterations, LongSupplier nowSupplier) { - updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running"); + SubscribableListener run(TimeValue maxTime, int maxIterations, LongSupplier currentTimeNanosSupplier) { long maxTimeNanos = maxTime.nanos(); // Start time, used to stop the calculations after maxTime has passed. - long startTime = nowSupplier.getAsLong(); + long startTime = currentTimeNanosSupplier.getAsLong(); + updateStatus(0, 0, DriverStatus.Status.RUNNING, "driver running", startTime); // The time of the next forced status update. long nextStatus = startTime + statusNanos; // Total executed iterations this run, used to stop the calculations after maxIterations have passed. @@ -191,9 +191,13 @@ SubscribableListener run(TimeValue maxTime, int maxIterations, LongSupplie IsBlockedResult isBlocked = Operator.NOT_BLOCKED; try { assert driverContext.assertBeginRunLoop(); - isBlocked = runSingleLoopIteration(nowSupplier, lastStatusUpdateTime); + isBlocked = runSingleLoopIteration(currentTimeNanosSupplier, lastStatusUpdateTime); } catch (DriverEarlyTerminationException unused) { - closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()), nowSupplier, lastStatusUpdateTime); + closeEarlyFinishedOperators( + activeOperators.listIterator(activeOperators.size()), + currentTimeNanosSupplier, + lastStatusUpdateTime + ); assert isFinished() : "not finished after early termination"; } catch (TaskCancelledException e) { LOGGER.debug("Cancelling running driver [{}]", shortDescription, e); @@ -216,28 +220,52 @@ SubscribableListener run(TimeValue maxTime, int maxIterations, LongSupplie totalIterationsThisRun++; iterationsSinceLastStatusUpdate++; - long now = nowSupplier.getAsLong(); + long now = currentTimeNanosSupplier.getAsLong(); if (isBlocked.listener().isDone() == false) { - updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.ASYNC, isBlocked.reason()); + updateStatus( + now - lastStatusUpdateTime, + iterationsSinceLastStatusUpdate, + DriverStatus.Status.ASYNC, + isBlocked.reason(), + now + ); return isBlocked.listener(); } if (isFinished()) { finishNanos = now; - updateStatus(finishNanos - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.DONE, "driver done"); + updateStatus( + finishNanos - lastStatusUpdateTime, + iterationsSinceLastStatusUpdate, + DriverStatus.Status.DONE, + "driver done", + now + ); driverContext.finish(); Releasables.close(releasable, driverContext.getSnapshot()); return Operator.NOT_BLOCKED.listener(); } if (totalIterationsThisRun >= maxIterations) { - updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver iterations"); + updateStatus( + now - lastStatusUpdateTime, + iterationsSinceLastStatusUpdate, + DriverStatus.Status.WAITING, + "driver iterations", + now + ); return Operator.NOT_BLOCKED.listener(); } if (now - startTime >= maxTimeNanos) { - updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time"); + updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.WAITING, "driver time", now); return Operator.NOT_BLOCKED.listener(); } if (now > nextStatus) { - updateStatus(now - lastStatusUpdateTime, iterationsSinceLastStatusUpdate, DriverStatus.Status.RUNNING, "driver running"); + updateStatus( + now - lastStatusUpdateTime, + iterationsSinceLastStatusUpdate, + DriverStatus.Status.RUNNING, + "driver running", + now + ); iterationsSinceLastStatusUpdate = 0; lastStatusUpdateTime = now; nextStatus = now + statusNanos; @@ -271,7 +299,7 @@ public void abort(Exception reason, ActionListener listener) { } } - private IsBlockedResult runSingleLoopIteration(LongSupplier nowSupplier, long lastStatusUpdate) { + private IsBlockedResult runSingleLoopIteration(LongSupplier currentTimeNanosSupplier, long lastStatusUpdate) { driverContext.checkForEarlyTermination(); boolean movedPage = false; @@ -314,14 +342,14 @@ private IsBlockedResult runSingleLoopIteration(LongSupplier nowSupplier, long la if (op.isFinished()) { driverContext.checkForEarlyTermination(); var originalIndex = iterator.previousIndex(); - var index = closeEarlyFinishedOperators(iterator, nowSupplier, lastStatusUpdate); + var index = closeEarlyFinishedOperators(iterator, currentTimeNanosSupplier, lastStatusUpdate); if (index >= 0) { iterator = new ArrayList<>(activeOperators).listIterator(originalIndex - index); } } } - closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()), nowSupplier, lastStatusUpdate); + closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()), currentTimeNanosSupplier, lastStatusUpdate); if (movedPage == false) { blockedResults.clear(); @@ -342,7 +370,11 @@ protected void onNoPagesMoved() { } // Returns the index of the last operator that was closed, -1 if no operator was closed. - protected int closeEarlyFinishedOperators(ListIterator operators, LongSupplier nowSupplier, long lastStatusUpdate) { + protected int closeEarlyFinishedOperators( + ListIterator operators, + LongSupplier currentTimeNanosSupplier, + long lastStatusUpdate + ) { var iterator = activeOperators.listIterator(operators.nextIndex()); while (iterator.hasPrevious()) { if (iterator.previous().isFinished()) { @@ -358,7 +390,7 @@ protected int closeEarlyFinishedOperators(ListIterator operators, Long Operator op = finishedOperators.next(); statusOfCompletedOperators.add(new OperatorStatus(op.toString(), op.status())); if (op instanceof SourceOperator sourceOperator) { - long now = nowSupplier.getAsLong(); + long now = currentTimeNanosSupplier.getAsLong(); // report one last time before closing sourceOperator.reportSearchLoad(now - lastStatusUpdate, now); } @@ -402,9 +434,18 @@ public static void start( ) { driver.completionListener.addListener(listener); if (driver.started.compareAndSet(false, true)) { - driver.updateStatus(0, 0, DriverStatus.Status.STARTING, "driver starting"); + LongSupplier currentTimeNanosSupplier = System::nanoTime; + driver.updateStatus(0, 0, DriverStatus.Status.STARTING, "driver starting", currentTimeNanosSupplier.getAsLong()); initializeEarlyTerminationChecker(driver); - schedule(DEFAULT_TIME_BEFORE_YIELDING, maxIterations, threadContext, executor, driver, driver.completionListener); + schedule( + DEFAULT_TIME_BEFORE_YIELDING, + maxIterations, + threadContext, + executor, + driver, + driver.completionListener, + currentTimeNanosSupplier + ); } } @@ -455,22 +496,23 @@ private static void schedule( ThreadContext threadContext, Executor executor, Driver driver, - ActionListener listener + ActionListener listener, + LongSupplier currentTimeNanosSupplier ) { final var task = new AbstractRunnable() { @Override protected void doRun() { - SubscribableListener fut = driver.run(maxTime, maxIterations, System::nanoTime); + SubscribableListener fut = driver.run(maxTime, maxIterations, currentTimeNanosSupplier); if (driver.isFinished()) { onComplete(listener); return; } if (fut.isDone()) { - schedule(maxTime, maxIterations, threadContext, executor, driver, listener); + schedule(maxTime, maxIterations, threadContext, executor, driver, listener, currentTimeNanosSupplier); } else { ActionListener readyListener = ActionListener.wrap( - ignored -> schedule(maxTime, maxIterations, threadContext, executor, driver, listener), + ignored -> schedule(maxTime, maxIterations, threadContext, executor, driver, listener, currentTimeNanosSupplier), this::onFailure ); fut.addListener(ContextPreservingActionListener.wrapPreservingContext(readyListener, threadContext)); @@ -570,7 +612,7 @@ private void reportSearchLoad(long extraCpuNanos, long now) { * @param extraIterations how many iterations to add to the previous status * @param status the status of the overall driver request */ - private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.Status status, String reason) { + private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.Status status, String reason, long nowNanos) { this.status.getAndUpdate(prev -> { long now = System.currentTimeMillis(); DriverSleeps sleeps = prev.sleeps(); @@ -597,7 +639,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus. } } } - reportSearchLoad(extraCpuNanos, now); + reportSearchLoad(extraCpuNanos, nowNanos); return new DriverStatus( sessionId, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java index 291fb99c2d2db..e304853d98c73 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java @@ -438,7 +438,7 @@ public void testAccumulateSearchLoad() throws IOException { IndexReader rLarge = null; try { r0 = simpleReader(dir0, 0, 1); - rLarge = simpleReader(dirLarge, 200, 10); + rLarge = simpleReader(dirLarge, 2000, 100); List shardContexts = List.of(new MockShardContext(r0, 0), new MockShardContext(rLarge, 1)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java index 29412750a9d5f..e0bd4b9d234ad 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java @@ -228,7 +228,7 @@ public void testAccumulateSearchLoad() throws IOException { IndexReader rLarge = null; try { r0 = simpleReader(dir0, 0, 1); - rLarge = simpleReader(dirLarge, 200, 10); + rLarge = simpleReader(dirLarge, 2000, 100); List shardContexts = List.of(new LuceneSourceOperatorTests.MockShardContext(r0, 0) { @Override