diff --git a/docs/changelog/142841.yaml b/docs/changelog/142841.yaml new file mode 100644 index 0000000000000..cbfdd4de8de5d --- /dev/null +++ b/docs/changelog/142841.yaml @@ -0,0 +1,5 @@ +area: ES|QL +issues: [] +pr: 142841 +summary: Attribute ES|QL shard search load in Lucene operators +type: feature diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneOperator.java index 77501bd162d18..969b250873152 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneOperator.java @@ -35,6 +35,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -44,6 +45,7 @@ import java.util.TreeSet; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; public abstract class LuceneOperator extends SourceOperator { private static final Logger logger = LogManager.getLogger(LuceneOperator.class); @@ -76,6 +78,24 @@ public abstract class LuceneOperator extends SourceOperator { */ long rowsEmitted; + /** + * Time spent per shard since the last {@link #shardLoadDelta(long)} call. + * Indexed by {@link ShardContext#index()}. + */ + final long[] shardProcessNanos; + final long[] shardRowsEmitted; + + /** + * Start time for the current shard timing interval (System.nanoTime()). + * -1 means "not currently timing". + */ + private long shardClockStartNanos = -1; + + /** + * Shard index currently being timed, or -1 if not timing. + */ + private int shardClockShardIndex = -1; + protected LuceneOperator( IndexedByShardId refCounteds, BlockFactory blockFactory, @@ -87,6 +107,9 @@ protected LuceneOperator( this.blockFactory = blockFactory; this.maxPageSize = maxPageSize; this.sliceQueue = sliceQueue; + + this.shardProcessNanos = new long[sliceQueue.maxShardIndex() + 1]; + this.shardRowsEmitted = new long[shardProcessNanos.length]; } public abstract static class Factory implements SourceOperator.SourceOperatorFactory { @@ -142,6 +165,7 @@ public final Page getOutput() { pagesEmitted++; rowsEmitted += page.getPositionCount(); } + stopShardClock(System.nanoTime()); return page; } catch (IOException ioe) { throw new UncheckedIOException(ioe); @@ -188,9 +212,65 @@ LuceneScorer getCurrentOrLoadNextScorer() { if (Thread.currentThread() != currentScorer.executingThread) { currentScorer.reinitialize(); } + maybeStartShardClock(currentScorer); return currentScorer; } + protected LuceneSliceQueue getSliceQueue() { + return sliceQueue; + } + + @Override + protected List shardLoadDelta(long now) { + stopShardClock(now); + var ret = IntStream.range(0, shardRowsEmitted.length) + .filter(index -> shardProcessNanos[index] > 0 || shardRowsEmitted[index] > 0) + .mapToObj(index -> new ShardLoad(sliceQueue.shardContext(index), shardProcessNanos[index], shardRowsEmitted[index])) + .toList(); + + Arrays.fill(shardRowsEmitted, 0); + Arrays.fill(shardProcessNanos, 0); + return ret; + } + + private void maybeStartShardClock(LuceneScorer scorer) { + final int newShardIndex = scorer.shardContext().index(); + if (shardClockStartNanos == -1L) { + // first timing interval since last loop + shardClockShardIndex = newShardIndex; + shardClockStartNanos = System.nanoTime(); + return; + } + + if (newShardIndex != shardClockShardIndex) { + // shard changed: record previous shard time and start timing the new shard + long now = System.nanoTime(); + recordShardTimeUntil(shardClockShardIndex, shardClockStartNanos, now); + shardClockShardIndex = newShardIndex; + shardClockStartNanos = now; + } + // else: same shard, keep clock running + } + + /** + * Stop timing (record up to now), but keep accumulated totals (no clearing). + * Useful when collection is finished. + */ + private void stopShardClock(long now) { + if (shardClockStartNanos != -1L) { + recordShardTimeUntil(shardClockShardIndex, shardClockStartNanos, now); + shardClockStartNanos = -1; + } + } + + private void recordShardTimeUntil(int shardIndex, long shardStartNanos, long nowNanos) { + assert shardStartNanos >= 0L && shardIndex >= 0; + long delta = nowNanos - shardStartNanos; + if (delta > 0L) { + shardProcessNanos[shardIndex] += delta; + } + } + /** * Wraps a {@link BulkScorer} with shard information */ @@ -524,8 +604,4 @@ public TransportVersion getMinimalSupportedVersion() { return TransportVersion.minimumCompatible(); } } - - LuceneSliceQueue getSliceQueue() { - return sliceQueue; - } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSliceQueue.java index 855e6eb2de6b7..73f877231daf8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSliceQueue.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSliceQueue.java @@ -82,6 +82,7 @@ public record QueryAndTags(Query query, List tags) {} public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher + private final int maxShardIndex; private final IntFunction shardContexts; private final int totalSlices; private final Map partitioningStrategies; @@ -116,6 +117,7 @@ public record QueryAndTags(Query query, List tags) {} List sliceList, Map partitioningStrategies ) { + this.maxShardIndex = sliceList.stream().mapToInt(l -> l.shardContext().index()).max().orElse(-1); this.shardContexts = shardContexts; this.totalSlices = sliceList.size(); this.slices = new AtomicReferenceArray<>(sliceList.size()); @@ -137,7 +139,11 @@ public record QueryAndTags(Query query, List tags) {} } } - ShardContext getShardContext(int index) { + int maxShardIndex() { + return maxShardIndex; + } + + ShardContext shardContext(int index) { return shardContexts.apply(index); } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperator.java index f76cba8c1d591..66ca4f13bf389 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperator.java @@ -349,6 +349,7 @@ public Page getCheckedOutput() throws IOException { blocks[b++] = BlockUtils.constantBlock(blockFactory, e, currentPagePos); } page = new Page(currentPagePos, blocks); + shardRowsEmitted[shardId] += page.getPositionCount(); } finally { if (page == null) { Releasables.closeExpectNoException(shard, leaf, docs, Releasables.wrap(blocks)); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperator.java index b910da6f3b780..ec165d790cab5 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperator.java @@ -309,6 +309,7 @@ private Page emit() { } int shardId = shardContext.index(); + shardRowsEmitted[shardId] += size; shard = blockFactory.newConstantIntBlockWith(shardId, size).asVector(); segments = currentSegmentBuilder.build(); docs = currentDocsBuilder.build(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index 46aa8539c05b6..3405dd0b7a49c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -191,9 +191,9 @@ SubscribableListener run(TimeValue maxTime, int maxIterations, LongSupplie IsBlockedResult isBlocked = Operator.NOT_BLOCKED; try { assert driverContext.assertBeginRunLoop(); - isBlocked = runSingleLoopIteration(); + isBlocked = runSingleLoopIteration(nowSupplier, lastStatusUpdateTime); } catch (DriverEarlyTerminationException unused) { - closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size())); + closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()), nowSupplier, lastStatusUpdateTime); assert isFinished() : "not finished after early termination"; } catch (TaskCancelledException e) { LOGGER.debug("Cancelling running driver [{}]", shortDescription, e); @@ -271,7 +271,7 @@ public void abort(Exception reason, ActionListener listener) { } } - private IsBlockedResult runSingleLoopIteration() { + private IsBlockedResult runSingleLoopIteration(LongSupplier nowSupplier, long lastStatusUpdate) { driverContext.checkForEarlyTermination(); boolean movedPage = false; @@ -314,14 +314,14 @@ private IsBlockedResult runSingleLoopIteration() { if (op.isFinished()) { driverContext.checkForEarlyTermination(); var originalIndex = iterator.previousIndex(); - var index = closeEarlyFinishedOperators(iterator); + var index = closeEarlyFinishedOperators(iterator, nowSupplier, lastStatusUpdate); if (index >= 0) { iterator = new ArrayList<>(activeOperators).listIterator(originalIndex - index); } } } - closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size())); + closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()), nowSupplier, lastStatusUpdate); if (movedPage == false) { blockedResults.clear(); @@ -342,7 +342,7 @@ protected void onNoPagesMoved() { } // Returns the index of the last operator that was closed, -1 if no operator was closed. - protected int closeEarlyFinishedOperators(ListIterator operators) { + protected int closeEarlyFinishedOperators(ListIterator operators, LongSupplier nowSupplier, long lastStatusUpdate) { var iterator = activeOperators.listIterator(operators.nextIndex()); while (iterator.hasPrevious()) { if (iterator.previous().isFinished()) { @@ -357,6 +357,11 @@ protected int closeEarlyFinishedOperators(ListIterator operators) { while (finishedOperators.hasNext()) { Operator op = finishedOperators.next(); statusOfCompletedOperators.add(new OperatorStatus(op.toString(), op.status())); + if (op instanceof SourceOperator sourceOperator) { + long now = nowSupplier.getAsLong(); + // report one last time before closing + sourceOperator.reportSearchLoad(now - lastStatusUpdate, now); + } op.close(); finishedOperators.remove(); } @@ -551,6 +556,14 @@ public DriverProfile profile() { ); } + private void reportSearchLoad(long extraCpuNanos, long now) { + activeOperators.stream() + .filter(o -> o instanceof SourceOperator) + .map(o -> (SourceOperator) o) + .findFirst() + .ifPresent(sourceOperator -> sourceOperator.reportSearchLoad(extraCpuNanos, now)); + } + /** * Update the status. * @param extraCpuNanos how many cpu nanoseconds to add to the previous status @@ -584,6 +597,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus. } } } + reportSearchLoad(extraCpuNanos, now); return new DriverStatus( sessionId, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SourceOperator.java index 5b66114111666..d42ecfe1f6045 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/SourceOperator.java @@ -9,6 +9,9 @@ import org.elasticsearch.compute.Describable; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.ShardContext; + +import java.util.List; /** * A source operator - produces output, accepts no input. @@ -49,4 +52,89 @@ public interface SourceOperatorFactory extends OperatorFactory, Describable { @Override SourceOperator get(DriverContext driverContext); } + + public record ShardLoad(ShardContext context, long processTimeNanos, long rowsEmitted) {} + + /** + * Returns a snapshot of shard load accumulated since the previous invocation. + * + *

This method is meant to be overridden by {@link SourceOperator} + * implementations that can attribute produced pages to shards or indices. + *

The snapshot represents a delta: implementations must reset their + * internal counters after the snapshot is taken so that each invocation reports + * only the load observed since the last call.

+ * + *

Source operators that do not operate on shards or indices can ignore this + * mechanism and rely on the default implementation, which returns an empty list.

+ * + * @return list of per-shard load since the last call + */ + protected List shardLoadDelta(long now) { + return List.of(); + } + + /** + * Attributes the CPU time delta since the previous call to the shards that + * produced pages during the same interval. + * + *

This method is invoked internally on {@link SourceOperator}. Sources are + * responsible for shard-level attribution because they are the point in the + * execution pipeline where output pages can still be associated with the shards + * or indices that produced them.

+ * + *

The {@code extraCpuNanos} parameter represents the elapsed CPU time since + * the previous invocation. That time is attributed to shards according to the + * load snapshot returned by {@link #shardLoadDelta(long)} for the same interval.

+ * + *

Attribution rules: + *

    + *
  • If exactly one shard contributed load, the full {@code extraCpuNanos} is + * attributed to that shard.
  • + *
  • If multiple shards contributed, each shard is first attributed the + * process time it directly reported.
  • + *
  • Any remaining CPU time is then distributed proportionally based on the + * number of rows emitted by each shard.
  • + *
+ * + *

This method assumes that {@link #shardLoadDelta(long)} returns a delta and + * resets its internal counters on each invocation.

+ * + * @param extraCpuNanos CPU time delta, in nanoseconds, since the previous call + */ + final void reportSearchLoad(long extraCpuNanos, long now) { + final List delta = shardLoadDelta(now); + final int size = delta.size(); + if (size == 0) { + return; + } + + if (size == 1) { + // Single shard: attribute all processing to it + delta.getFirst().context().stats().accumulateSearchLoad(extraCpuNanos, now); + return; + } + + long totalProcess = 0L; + long totalRows = 0L; + for (var load : delta) { + totalProcess += load.processTimeNanos(); + totalRows += load.rowsEmitted(); + } + if (totalRows == 0L && totalProcess == 0L && extraCpuNanos == 0L) { + return; + } + + final long rest = Math.max(0L, extraCpuNanos - totalProcess); + + for (var load : delta) { + long weightedExtra = load.processTimeNanos(); + if (rest > 0L && load.rowsEmitted() > 0L) { + // Distribute remaining CPU proportionally by rows emitted + weightedExtra += Math.round((double) rest * ((double) load.rowsEmitted() / (double) totalRows)); + } + if (weightedExtra > 0L) { + load.context().stats().accumulateSearchLoad(weightedExtra, now); + } + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java index bb0caf96af5a2..4ebc7f42c906d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.IndexedByShardIdFromList; import org.elasticsearch.compute.lucene.IndexedByShardIdFromSingleton; import org.elasticsearch.compute.lucene.ShardContext; import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; @@ -192,7 +193,7 @@ int numResults(int numDocs) { void assertSourceOperator(LuceneSourceOperator sourceOperator) { for (int shard = 0; shard < sourceOperator.refCounteds.size(); shard++) { - var shardContext = sourceOperator.getSliceQueue().getShardContext(shard); + var shardContext = sourceOperator.getSliceQueue().shardContext(shard); assertThat(shardContext.stats().stats().getTotal().getSearchLoadRate(), greaterThan(0d)); } } @@ -430,6 +431,56 @@ private void testSimple(DriverContext ctx, DataPartitioning partitioning, int nu testCase.assertSourceOperator(sourceOperator); } + public void testAccumulateSearchLoad() throws IOException { + Directory dir0 = newDirectory(); + Directory dirLarge = newDirectory(); + IndexReader r0 = null; + IndexReader rLarge = null; + try { + r0 = simpleReader(dir0, 0, 1); + rLarge = simpleReader(dirLarge, 200, 10); + + List shardContexts = List.of(new MockShardContext(r0, 0), new MockShardContext(rLarge, 1)); + + Function> queryFunction = c -> TestCase.MATCH_ALL.queryAndExtra(); + int maxPageSize = 1; + int taskConcurrency = randomIntBetween(1, 4); + + LuceneSourceOperator.Factory factory = new LuceneSourceOperator.Factory( + new IndexedByShardIdFromList<>(shardContexts), + queryFunction, + DataPartitioning.SEGMENT, + DataPartitioning.AutoStrategy.DEFAULT, + taskConcurrency, + maxPageSize, + LuceneOperator.NO_LIMIT, + scoring + ); + DriverContext ctx = driverContext(); + LuceneSourceOperator sourceOperator = (LuceneSourceOperator) factory.get(ctx); + List results = new ArrayList<>(); + new TestDriverRunner().run( + TestDriverFactory.create(ctx, sourceOperator, List.of(), new TestResultPageSinkOperator(results::add)) + ); + + var res = sourceOperator.shardLoadDelta(System.nanoTime()); + assertThat(res.size(), equalTo(0)); + + long totalPositions = results.stream().mapToInt(Page::getPositionCount).sum(); + assertThat(totalPositions, greaterThan(0L)); + OperatorTestCase.assertDriverContext(ctx); + + double load0 = shardContexts.get(0).stats().stats().getTotal().getSearchLoadRate(); + double loadLarge = shardContexts.get(1).stats().stats().getTotal().getSearchLoadRate(); + + // Shard with no docs may still see minimal load from query rewrite, but should be <= others + assertThat(loadLarge, greaterThan(0d)); + assertThat(loadLarge, greaterThanOrEqualTo(load0)); + } finally { + IOUtils.close(r0, rLarge, dir0, dirLarge); + } + } + // Returns the initial block index, ignoring the score block if scoring is enabled private int initialBlockIndex(Page page) { assert page.getBlock(0) instanceof DocBlock : "expected doc block at index 0"; diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java index 4c125718b8d56..0f410038d62d3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.lucene.IndexedByShardIdFromList; import org.elasticsearch.compute.lucene.IndexedByShardIdFromSingleton; import org.elasticsearch.compute.lucene.ShardContext; import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperatorTests; @@ -53,6 +54,8 @@ import static org.elasticsearch.compute.lucene.query.LuceneSourceOperatorTests.assertAllRefCountedSameInstance; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.matchesRegex; @@ -71,14 +74,9 @@ protected LuceneTopNSourceOperator.Factory simple(SimpleOptions options) { return simple(DataPartitioning.SHARD, 10_000, 100); } - private LuceneTopNSourceOperator.Factory simple(DataPartitioning dataPartitioning, int size, int limit) { - int commitEvery = Math.max(1, size / 10); + private static IndexReader simpleReader(Directory dir, int size, int commitEvery) { try ( - RandomIndexWriter writer = new RandomIndexWriter( - random(), - directory, - newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE) - ) + RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE)) ) { for (int d = 0; d < size; d++) { List doc = new ArrayList<>(); @@ -88,11 +86,15 @@ private LuceneTopNSourceOperator.Factory simple(DataPartitioning dataPartitionin writer.commit(); } } - reader = writer.getReader(); + return writer.getReader(); } catch (IOException e) { throw new RuntimeException(e); } + } + private LuceneTopNSourceOperator.Factory simple(DataPartitioning dataPartitioning, int size, int limit) { + int commitEvery = Math.max(1, size / 10); + reader = simpleReader(directory, size, commitEvery); ShardContext ctx = new LuceneSourceOperatorTests.MockShardContext(reader, 0) { @Override public Optional buildSort(List> sorts) { @@ -218,6 +220,73 @@ protected void testSimple(DriverContext ctx, int size, int limit) { assertThat(results, hasSize(pages)); } + public void testAccumulateSearchLoad() throws IOException { + Directory dir0 = newDirectory(); + Directory dirLarge = newDirectory(); + IndexReader r0 = null; + IndexReader rLarge = null; + try { + r0 = simpleReader(dir0, 0, 1); + rLarge = simpleReader(dirLarge, 200, 10); + + List shardContexts = List.of(new LuceneSourceOperatorTests.MockShardContext(r0, 0) { + @Override + public Optional buildSort(List> sorts) { + SortField field = new SortedNumericSortField("s", SortField.Type.LONG, false, SortedNumericSelector.Type.MIN); + return Optional.of(new SortAndFormats(new Sort(field), new DocValueFormat[] { null })); + } + }, new LuceneSourceOperatorTests.MockShardContext(rLarge, 1) { + @Override + public Optional buildSort(List> sorts) { + SortField field = new SortedNumericSortField("s", SortField.Type.LONG, false, SortedNumericSelector.Type.MIN); + return Optional.of(new SortAndFormats(new Sort(field), new DocValueFormat[] { null })); + } + }); + + Function> queryFunction = c -> List.of( + new LuceneSliceQueue.QueryAndTags(Queries.ALL_DOCS_INSTANCE, List.of()) + ); + int taskConcurrency = 0; + int maxPageSize = between(10, 100); + List> sorts = List.of(new FieldSortBuilder("s")); + long estimatedPerRowSortSize = 16; + LuceneTopNSourceOperator.Factory factory = new LuceneTopNSourceOperator.Factory( + new IndexedByShardIdFromList<>(shardContexts), + queryFunction, + DataPartitioning.SHARD, + taskConcurrency, + maxPageSize, + 10, + sorts, + estimatedPerRowSortSize, + scoring + ); + DriverContext ctx = driverContext(); + LuceneTopNSourceOperator sourceOperator = (LuceneTopNSourceOperator) factory.get(ctx); + List results = new ArrayList<>(); + new TestDriverRunner().run( + TestDriverFactory.create(ctx, sourceOperator, List.of(), new TestResultPageSinkOperator(results::add)) + ); + + var res = sourceOperator.shardLoadDelta(System.nanoTime()); + assertThat(res.size(), equalTo(0)); + + long totalPositions = results.stream().mapToInt(Page::getPositionCount).sum(); + assertThat(totalPositions, equalTo(10L)); + + double load0 = shardContexts.get(0).stats().stats().getTotal().getSearchLoadRate(); + double loadLarge = shardContexts.get(1).stats().stats().getTotal().getSearchLoadRate(); + + // Shard with no docs may still see minimal load from query rewrite, but should be <= others + assertThat(loadLarge, greaterThan(0d)); + assertThat(loadLarge, greaterThanOrEqualTo(load0)); + + OperatorTestCase.assertDriverContext(ctx); + } finally { + IOUtils.close(r0, rLarge, dir0, dirLarge); + } + } + // Scores are not interesting to this test, but enabled conditionally and effectively ignored just for coverage. private final boolean scoring = randomBoolean(); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index eb32d34e24e6c..3b8bfa33c5443 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -129,11 +129,11 @@ public void testProfileAndStatusOneIterationAtATime() { assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE)); assertThat(driver.status().started(), equalTo(startEpoch)); assertThat(driver.status().iterations(), equalTo((long) inPages.size())); - assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size())); + assertThat(driver.status().cpuNanos(), equalTo(tickTime * (inPages.size() + 1))); logger.info("profile {}", driver.profile()); assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1))); - assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size())); + assertThat(driver.profile().cpuNanos(), equalTo(tickTime * (inPages.size() + 1))); assertThat(driver.profile().iterations(), equalTo((long) inPages.size())); } @@ -168,11 +168,11 @@ public void testProfileAndStatusTimeout() { assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE)); assertThat(driver.status().started(), equalTo(startEpoch)); assertThat(driver.status().iterations(), equalTo((long) inPages.size())); - assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size())); + assertThat(driver.status().cpuNanos(), equalTo(tickTime * (inPages.size() + 1))); logger.info("profile {}", driver.profile()); assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1))); - assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size())); + assertThat(driver.profile().cpuNanos(), equalTo(tickTime * (inPages.size() + 1))); assertThat(driver.profile().iterations(), equalTo((long) inPages.size())); } @@ -206,11 +206,11 @@ public void testProfileAndStatusInterval() { assertThat(driver.status().status(), equalTo(DriverStatus.Status.DONE)); assertThat(driver.status().started(), equalTo(startEpoch)); assertThat(driver.status().iterations(), equalTo((long) inPages.size())); - assertThat(driver.status().cpuNanos(), equalTo(tickTime * inPages.size())); + assertThat(driver.status().cpuNanos(), equalTo(tickTime * (inPages.size() + 1))); logger.info("profile {}", driver.profile()); assertThat(driver.profile().tookNanos(), equalTo(waitTime + tickTime * (nowSupplier.callCount - 1))); - assertThat(driver.profile().cpuNanos(), equalTo(tickTime * inPages.size())); + assertThat(driver.profile().cpuNanos(), equalTo(tickTime * (inPages.size() + 1))); assertThat(driver.profile().iterations(), equalTo((long) inPages.size())); }