Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/142841.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 142841
summary: Attribute ES|QL shard search load in Lucene operators
type: feature
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -44,6 +45,7 @@
import java.util.TreeSet;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public abstract class LuceneOperator extends SourceOperator {
private static final Logger logger = LogManager.getLogger(LuceneOperator.class);
Expand Down Expand Up @@ -76,6 +78,24 @@ public abstract class LuceneOperator extends SourceOperator {
*/
long rowsEmitted;

/**
* Time spent per shard since the last {@link #shardLoadDelta(long)} call.
* Indexed by {@link ShardContext#index()}.
*/
final long[] shardProcessNanos;
final long[] shardRowsEmitted;

/**
* Start time for the current shard timing interval (System.nanoTime()).
* -1 means "not currently timing".
*/
private long shardClockStartNanos = -1;

/**
* Shard index currently being timed, or -1 if not timing.
*/
private int shardClockShardIndex = -1;

protected LuceneOperator(
IndexedByShardId<? extends RefCounted> refCounteds,
BlockFactory blockFactory,
Expand All @@ -87,6 +107,9 @@ protected LuceneOperator(
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
this.sliceQueue = sliceQueue;

this.shardProcessNanos = new long[sliceQueue.maxShardIndex() + 1];
this.shardRowsEmitted = new long[shardProcessNanos.length];
}

public abstract static class Factory implements SourceOperator.SourceOperatorFactory {
Expand Down Expand Up @@ -142,6 +165,7 @@ public final Page getOutput() {
pagesEmitted++;
rowsEmitted += page.getPositionCount();
}
stopShardClock(System.nanoTime());
return page;
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
Expand Down Expand Up @@ -188,9 +212,65 @@ LuceneScorer getCurrentOrLoadNextScorer() {
if (Thread.currentThread() != currentScorer.executingThread) {
currentScorer.reinitialize();
}
maybeStartShardClock(currentScorer);
return currentScorer;
}

/**
 * Returns the queue of Lucene slices this operator pulls work from.
 */
protected LuceneSliceQueue getSliceQueue() {
return sliceQueue;
}

/**
 * Snapshots and resets the per-shard time and row counters accumulated since
 * the previous call, closing out any in-flight timing interval first.
 *
 * @param now current {@code System.nanoTime()}-style timestamp ending the interval
 * @return per-shard load observed since the last call; shards with no activity are omitted
 */
@Override
protected List<ShardLoad> shardLoadDelta(long now) {
    // Close the in-flight timing interval so its time is part of this snapshot.
    stopShardClock(now);

    List<ShardLoad> snapshot = IntStream.range(0, shardProcessNanos.length)
        .mapToObj(
            shard -> shardProcessNanos[shard] > 0 || shardRowsEmitted[shard] > 0
                ? new ShardLoad(sliceQueue.shardContext(shard), shardProcessNanos[shard], shardRowsEmitted[shard])
                : null
        )
        .filter(load -> load != null)
        .toList();

    // This method reports deltas, not running totals: reset for the next interval.
    Arrays.fill(shardProcessNanos, 0);
    Arrays.fill(shardRowsEmitted, 0);
    return snapshot;
}

/**
 * Starts (or keeps) timing the shard served by {@code scorer}. If a different
 * shard was being timed, the previous shard's elapsed time is recorded before
 * the clock restarts on the new shard.
 */
private void maybeStartShardClock(LuceneScorer scorer) {
    final int shardIndex = scorer.shardContext().index();
    if (shardClockStartNanos != -1L && shardIndex == shardClockShardIndex) {
        // Same shard as the running interval: let the clock keep ticking.
        return;
    }
    final long now = System.nanoTime();
    if (shardClockStartNanos != -1L) {
        // Shard changed: bank the time spent on the previous shard.
        recordShardTimeUntil(shardClockShardIndex, shardClockStartNanos, now);
    }
    shardClockShardIndex = shardIndex;
    shardClockStartNanos = now;
}

/**
 * Stop timing (record up to {@code now}), but keep accumulated totals (no clearing).
 * Useful when collection is finished.
 */
private void stopShardClock(long now) {
    if (shardClockStartNanos == -1L) {
        return; // not currently timing anything
    }
    recordShardTimeUntil(shardClockShardIndex, shardClockStartNanos, now);
    shardClockStartNanos = -1;
}

/**
 * Adds the elapsed time between {@code shardStartNanos} and {@code nowNanos}
 * to the running total for {@code shardIndex}. Non-positive deltas are ignored.
 */
private void recordShardTimeUntil(int shardIndex, long shardStartNanos, long nowNanos) {
    assert shardIndex >= 0 && shardStartNanos >= 0L;
    final long elapsed = nowNanos - shardStartNanos;
    if (elapsed > 0L) {
        shardProcessNanos[shardIndex] += elapsed;
    }
}

/**
* Wraps a {@link BulkScorer} with shard information
*/
Expand Down Expand Up @@ -524,8 +604,4 @@ public TransportVersion getMinimalSupportedVersion() {
return TransportVersion.minimumCompatible();
}
}

LuceneSliceQueue getSliceQueue() {
return sliceQueue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public record QueryAndTags(Query query, List<Object> tags) {}
public static final int MAX_DOCS_PER_SLICE = 250_000; // copied from IndexSearcher
public static final int MAX_SEGMENTS_PER_SLICE = 5; // copied from IndexSearcher

private final int maxShardIndex;
private final IntFunction<ShardContext> shardContexts;
private final int totalSlices;
private final Map<String, PartitioningStrategy> partitioningStrategies;
Expand Down Expand Up @@ -116,6 +117,7 @@ public record QueryAndTags(Query query, List<Object> tags) {}
List<LuceneSlice> sliceList,
Map<String, PartitioningStrategy> partitioningStrategies
) {
this.maxShardIndex = sliceList.stream().mapToInt(l -> l.shardContext().index()).max().orElse(-1);
this.shardContexts = shardContexts;
this.totalSlices = sliceList.size();
this.slices = new AtomicReferenceArray<>(sliceList.size());
Expand All @@ -137,7 +139,11 @@ public record QueryAndTags(Query query, List<Object> tags) {}
}
}

ShardContext getShardContext(int index) {
/**
 * Highest {@link ShardContext#index()} among this queue's slices, or -1 when the
 * queue has no slices. Callers use {@code maxShardIndex() + 1} to size per-shard arrays.
 */
int maxShardIndex() {
return maxShardIndex;
}

/**
 * Resolves the {@link ShardContext} registered for the given shard index.
 */
ShardContext shardContext(int index) {
return shardContexts.apply(index);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ public Page getCheckedOutput() throws IOException {
blocks[b++] = BlockUtils.constantBlock(blockFactory, e, currentPagePos);
}
page = new Page(currentPagePos, blocks);
shardRowsEmitted[shardId] += page.getPositionCount();
} finally {
if (page == null) {
Releasables.closeExpectNoException(shard, leaf, docs, Releasables.wrap(blocks));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ private Page emit() {
}

int shardId = shardContext.index();
shardRowsEmitted[shardId] += size;
shard = blockFactory.newConstantIntBlockWith(shardId, size).asVector();
segments = currentSegmentBuilder.build();
docs = currentDocsBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ SubscribableListener<Void> run(TimeValue maxTime, int maxIterations, LongSupplie
IsBlockedResult isBlocked = Operator.NOT_BLOCKED;
try {
assert driverContext.assertBeginRunLoop();
isBlocked = runSingleLoopIteration();
isBlocked = runSingleLoopIteration(nowSupplier, lastStatusUpdateTime);
} catch (DriverEarlyTerminationException unused) {
closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()));
closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()), nowSupplier, lastStatusUpdateTime);
assert isFinished() : "not finished after early termination";
} catch (TaskCancelledException e) {
LOGGER.debug("Cancelling running driver [{}]", shortDescription, e);
Expand Down Expand Up @@ -271,7 +271,7 @@ public void abort(Exception reason, ActionListener<Void> listener) {
}
}

private IsBlockedResult runSingleLoopIteration() {
private IsBlockedResult runSingleLoopIteration(LongSupplier nowSupplier, long lastStatusUpdate) {
driverContext.checkForEarlyTermination();
boolean movedPage = false;

Expand Down Expand Up @@ -314,14 +314,14 @@ private IsBlockedResult runSingleLoopIteration() {
if (op.isFinished()) {
driverContext.checkForEarlyTermination();
var originalIndex = iterator.previousIndex();
var index = closeEarlyFinishedOperators(iterator);
var index = closeEarlyFinishedOperators(iterator, nowSupplier, lastStatusUpdate);
if (index >= 0) {
iterator = new ArrayList<>(activeOperators).listIterator(originalIndex - index);
}
}
}

closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()));
closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()), nowSupplier, lastStatusUpdate);

if (movedPage == false) {
blockedResults.clear();
Expand All @@ -342,7 +342,7 @@ protected void onNoPagesMoved() {
}

// Returns the index of the last operator that was closed, -1 if no operator was closed.
protected int closeEarlyFinishedOperators(ListIterator<Operator> operators) {
protected int closeEarlyFinishedOperators(ListIterator<Operator> operators, LongSupplier nowSupplier, long lastStatusUpdate) {
var iterator = activeOperators.listIterator(operators.nextIndex());
while (iterator.hasPrevious()) {
if (iterator.previous().isFinished()) {
Expand All @@ -357,6 +357,11 @@ protected int closeEarlyFinishedOperators(ListIterator<Operator> operators) {
while (finishedOperators.hasNext()) {
Operator op = finishedOperators.next();
statusOfCompletedOperators.add(new OperatorStatus(op.toString(), op.status()));
if (op instanceof SourceOperator sourceOperator) {
long now = nowSupplier.getAsLong();
// report one last time before closing
sourceOperator.reportSearchLoad(now - lastStatusUpdate, now);
}
op.close();
finishedOperators.remove();
}
Expand Down Expand Up @@ -551,6 +556,14 @@ public DriverProfile profile() {
);
}

/**
 * Forwards the CPU-time delta to the first {@link SourceOperator} among the
 * active operators, which attributes it to the shards it read from.
 */
private void reportSearchLoad(long extraCpuNanos, long now) {
    for (Operator operator : activeOperators) {
        if (operator instanceof SourceOperator sourceOperator) {
            sourceOperator.reportSearchLoad(extraCpuNanos, now);
            return; // only the first source operator reports, matching findFirst semantics
        }
    }
}

/**
* Update the status.
* @param extraCpuNanos how many cpu nanoseconds to add to the previous status
Expand Down Expand Up @@ -584,6 +597,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.
}
}
}
reportSearchLoad(extraCpuNanos, now);

return new DriverStatus(
sessionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

import org.elasticsearch.compute.Describable;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.ShardContext;

import java.util.List;

/**
* A source operator - produces output, accepts no input.
Expand Down Expand Up @@ -49,4 +52,89 @@ public interface SourceOperatorFactory extends OperatorFactory, Describable {
@Override
SourceOperator get(DriverContext driverContext);
}

/**
 * Per-shard load delta: the shard's {@link ShardContext}, the processing time
 * spent on it, and the rows it emitted since the previous snapshot.
 */
public record ShardLoad(ShardContext context, long processTimeNanos, long rowsEmitted) {}

/**
 * Returns a snapshot of shard load accumulated since the previous invocation.
 *
 * <p>This method is meant to be overridden by {@link SourceOperator}
 * implementations that can attribute produced pages to shards or indices.
 * <p>The snapshot represents a <em>delta</em>: implementations must reset their
 * internal counters after the snapshot is taken so that each invocation reports
 * only the load observed since the last call.</p>
 *
 * <p>Source operators that do not operate on shards or indices can ignore this
 * mechanism and rely on the default implementation, which returns an empty list.</p>
 *
 * @param now relative nanosecond timestamp (e.g. {@code System.nanoTime()}) marking the end of the interval
 * @return list of per-shard load since the last call
 */
protected List<ShardLoad> shardLoadDelta(long now) {
return List.of();
}

/**
 * Attributes {@code extraCpuNanos} — the CPU time elapsed since the previous
 * call — to the shards that produced output over the same interval, as
 * reported by {@link #shardLoadDelta(long)}.
 *
 * <p>Sources perform this attribution because they are the point in the
 * pipeline where output pages can still be associated with the shards or
 * indices that produced them.</p>
 *
 * <p>Attribution rules:
 * <ul>
 * <li>Exactly one shard contributed: it receives the full {@code extraCpuNanos}.</li>
 * <li>Several shards contributed: each is first credited the processing time it
 * reported directly; any remaining CPU time is then split proportionally to the
 * rows each shard emitted.</li>
 * </ul>
 *
 * <p>Relies on {@link #shardLoadDelta(long)} returning a delta and resetting
 * its internal counters on every call.</p>
 *
 * @param extraCpuNanos CPU time delta, in nanoseconds, since the previous call
 * @param now relative timestamp forwarded to the shard stats
 */
final void reportSearchLoad(long extraCpuNanos, long now) {
    final List<ShardLoad> loads = shardLoadDelta(now);
    if (loads.isEmpty()) {
        return;
    }
    if (loads.size() == 1) {
        // Single contributing shard: it gets all of the CPU time.
        loads.getFirst().context().stats().accumulateSearchLoad(extraCpuNanos, now);
        return;
    }

    long totalProcessNanos = 0L;
    long totalRows = 0L;
    for (ShardLoad load : loads) {
        totalProcessNanos += load.processTimeNanos();
        totalRows += load.rowsEmitted();
    }
    if (totalRows == 0L && totalProcessNanos == 0L && extraCpuNanos == 0L) {
        // Nothing to attribute at all.
        return;
    }

    // CPU time not directly accounted for by the shards' own processing time.
    final long remainder = Math.max(0L, extraCpuNanos - totalProcessNanos);

    for (ShardLoad load : loads) {
        long attributed = load.processTimeNanos();
        if (remainder > 0L && load.rowsEmitted() > 0L) {
            // Split the remainder proportionally by rows emitted
            // (rowsEmitted > 0 implies totalRows > 0, so the division is safe).
            attributed += Math.round((double) remainder * ((double) load.rowsEmitted() / (double) totalRows));
        }
        if (attributed > 0L) {
            load.context().stats().accumulateSearchLoad(attributed, now);
        }
    }
}
}
Loading