Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
fa5c598
First version, use highSpeedAutoStrategy
carlosdelest Feb 9, 2026
55e0b9b
Use highSpeedAutoStrategy
carlosdelest Feb 9, 2026
0ebd70b
Fix tests to take into account new partitioning
carlosdelest Feb 9, 2026
398c225
Fix tests
carlosdelest Feb 9, 2026
7dad5db
[CI] Auto commit changes from spotless
Feb 9, 2026
45f70c3
Use a static method for strategy and document it
carlosdelest Feb 11, 2026
6c819de
Merge remote-tracking branch 'origin/main' into enhancement/esql-luce…
carlosdelest Feb 11, 2026
7b94cdd
Merge remote-tracking branch 'carlosdelest/enhancement/esql-lucene-to…
carlosdelest Feb 11, 2026
fcad45d
Merge branch 'enhancement/esql-lucene-top-n-data-partition-strategy' …
carlosdelest Feb 23, 2026
baf08d0
Fix merge
carlosdelest Feb 24, 2026
4a54f9d
Adapt ContextIndexSearcher computeSlices() method to SEGMENT partitio…
carlosdelest Feb 25, 2026
d2f188e
Merge remote-tracking branch 'origin/main' into enhancement/esql-topn…
carlosdelest Feb 25, 2026
6d67af4
Reuse context grouping algorithm from ContextIndexSearcher
carlosdelest Feb 25, 2026
3562179
Add a new enum for data partitioning for LuceneSliceQueue
carlosdelest Feb 26, 2026
17ea11b
Revert test changes
carlosdelest Feb 26, 2026
b3942a0
Merge remote-tracking branch 'origin/main' into enhancement/esql-topn…
carlosdelest Feb 26, 2026
05992b6
Minor refactoring
carlosdelest Feb 26, 2026
bd9d546
Fix tests
carlosdelest Feb 26, 2026
86e7f33
Update docs/changelog/143133.yaml
carlosdelest Feb 26, 2026
d0b06e3
Use IndexSearcher.getSlices() instead of directly invoking the static…
carlosdelest Feb 26, 2026
0d74dfa
Revert changes to ContextIndexSearcher
carlosdelest Feb 26, 2026
7e81b97
Merge remote-tracking branch 'carlosdelest/enhancement/esql-topn-data…
carlosdelest Feb 26, 2026
a88aa31
Merge remote-tracking branch 'origin/main' into enhancement/esql-topn…
carlosdelest Feb 26, 2026
a00f22d
Fix changelog
carlosdelest Feb 26, 2026
fa0e1a6
Revert unnecessary changes
carlosdelest Feb 26, 2026
fb1433c
Update docs/changelog/143133.yaml
carlosdelest Feb 26, 2026
b6921f9
Merge remote-tracking branch 'origin/main' into enhancement/esql-topn…
carlosdelest Feb 27, 2026
aeb107d
Change strategy location
carlosdelest Feb 27, 2026
0c7ff24
Minor refactoring
carlosdelest Feb 27, 2026
37ad63a
Merge remote-tracking branch 'carlosdelest/enhancement/esql-topn-data…
carlosdelest Feb 27, 2026
d228965
[CI] Auto commit changes from spotless
Feb 27, 2026
756b8e1
Change Top N strategy to SEGMENT, to be comparable to using slices in…
carlosdelest Feb 27, 2026
f8a1b5f
Merge remote-tracking branch 'carlosdelest/enhancement/esql-topn-data…
carlosdelest Feb 27, 2026
15cd5aa
Remove capability check
carlosdelest Feb 27, 2026
21293eb
Fix private modifier
carlosdelest Feb 27, 2026
0ab00bb
Merge branch 'main' into enhancement/esql-topn-data-partition-strategy
carlosdelest Feb 27, 2026
b76aeab
Merge branch 'main' into enhancement/esql-topn-data-partition-strategy
carlosdelest Feb 27, 2026
6697b70
Merge branch 'main' into enhancement/esql-topn-data-partition-strategy
carlosdelest Mar 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/143133.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 143133
summary: ES|QL - Top N queries are parallelized
type: enhancement
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,12 @@ List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConc
},
/**
* See {@link DataPartitioning#SEGMENT}.
* Uses the searcher's {@link IndexSearcher#getSlices()} to partition segments.
*/
SEGMENT(1) {
@Override
List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConcurrency) {
IndexSearcher.LeafSlice[] gs = IndexSearcher.slices(
searcher.getLeafContexts(),
MAX_DOCS_PER_SLICE,
MAX_SEGMENTS_PER_SLICE,
false
);
IndexSearcher.LeafSlice[] gs = searcher.getSlices();
return Arrays.stream(gs).map(g -> Arrays.stream(g.partitions).map(PartialLeafReaderContext::new).toList()).toList();
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public Factory(
IndexedByShardId<? extends ShardContext> contexts,
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
DataPartitioning.AutoStrategy autoStrategy,
int taskConcurrency,
int maxPageSize,
int limit,
Expand All @@ -78,7 +79,9 @@ public Factory(
contexts,
queryFunction,
dataPartitioning,
query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
dataPartitioning == DataPartitioning.AUTO ? autoStrategy.pickStrategy(limit) : query -> {
throw new UnsupportedOperationException("locked in " + dataPartitioning);
},
taskConcurrency,
limit,
needsScore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,9 @@ public static class MockShardContext implements ShardContext {
private final ContextIndexSearcher searcher;
private final ShardSearchStats shardSearchStats;

private static final int MIN_DOCS_PER_SLICE = 50_000;
private static final int MAX_SLICES_NUMBER = 10;

// TODO Reuse this overload in the places that pass 0.
public MockShardContext(IndexReader reader) {
this(reader, 0);
Expand All @@ -519,7 +522,10 @@ public MockShardContext(IndexReader reader, int index) {
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
TrivialQueryCachingPolicy.NEVER,
true
true,
Runnable::run,
MAX_SLICES_NUMBER,
MIN_DOCS_PER_SLICE
);
} else {
this.searcher = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,12 @@ private LuceneTopNSourceOperator.Factory createFactory(boolean needsScore, List<
new IndexedByShardIdFromSingleton<>(ctx),
queryFunction,
partitioning,
DataPartitioning.AutoStrategy.DEFAULT,
randomIntBetween(1, 10),
10000,
randomIntBetween(10, 100),
sorts,
randomIntBetween(10, 20),
randomLongBetween(10, 20),
needsScore
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
new IndexedByShardIdFromSingleton<>(ctx),
queryFunction,
dataPartitioning,
LuceneSourceOperator.Factory::autoStrategy,
taskConcurrency,
maxPageSize,
limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
new IndexedByShardIdFromSingleton<>(ctx),
queryFunction,
dataPartitioning,
LuceneSourceOperator.Factory::autoStrategy,
taskConcurrency,
maxPageSize,
limit,
Expand Down Expand Up @@ -254,6 +255,7 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
new IndexedByShardIdFromList<>(shardContexts),
queryFunction,
DataPartitioning.SHARD,
DataPartitioning.AutoStrategy.DEFAULT,
taskConcurrency,
maxPageSize,
10,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static Iterable<Object[]> parameters() {
for (String index : new String[] { "idx", "small_idx" }) {
for (Case c : new Case[] {
new Case("", "SHARD"),
new Case("| SORT @timestamp ASC", "SHARD"),
new Case("| SORT @timestamp ASC", "SEGMENT"),
new Case("| WHERE ABS(a) == 1", "DOC"),
new Case("| WHERE a == 1", "SHARD"),
new Case("| STATS SUM(a)", "DOC"),
Expand All @@ -76,18 +76,18 @@ public static Iterable<Object[]> parameters() {
new Case("| WHERE QSTR(\"a:1\")", "SHARD"),
new Case("| WHERE KQL(\"a:1\")", "SHARD"),
new Case("| WHERE a:\"1\"", "SHARD"),
new Case("| WHERE MATCH(a, \"2\") | SORT _score DESC", "SHARD", true),
new Case("| WHERE QSTR(\"a:2\") | SORT _score DESC", "SHARD", true),
new Case("| WHERE KQL(\"a:2\") | SORT _score DESC", "SHARD", true),
new Case("| WHERE MATCH(a, \"3\") | SORT _score DESC | LIMIT 10", "SHARD", true),
new Case("| WHERE MATCH(a, \"3\") OR MATCH(a, \"4\") | SORT _score DESC | LIMIT 10", "SHARD", true),
new Case("| WHERE a:\"3\" | WHERE a:\"4\" | SORT _score DESC | LIMIT 10", "SHARD", true), }) {
new Case("| WHERE MATCH(a, \"2\") | SORT _score DESC", "SEGMENT", true),
new Case("| WHERE QSTR(\"a:2\") | SORT _score DESC", "SEGMENT", true),
new Case("| WHERE KQL(\"a:2\") | SORT _score DESC", "SEGMENT", true),
new Case("| WHERE MATCH(a, \"3\") | SORT _score DESC | LIMIT 10", "SEGMENT", true),
new Case("| WHERE MATCH(a, \"3\") OR MATCH(a, \"4\") | SORT _score DESC | LIMIT 10", "SEGMENT", true),
new Case("| WHERE a:\"3\" | WHERE a:\"4\" | SORT _score DESC | LIMIT 10", "SEGMENT", true), }) {
params.add(
new Object[] {
defaultDataPartitioning,
index,
"FROM " + index + (c.score ? " METADATA _score " : " ") + c.suffix,
expectedPartition(defaultDataPartitioning, index, c.idxPartition) }
expectedPartition(defaultDataPartitioning, index, c.idxPartition, c.score) }
);
}
}
Expand Down Expand Up @@ -135,15 +135,15 @@ private void setDefaultDataPartitioning(String defaultDataPartitioning) throws I
assertThat(code, equalTo(200));
}

private static Matcher<String> expectedPartition(String defaultDataPartitioning, String index, String idxPartition) {
private static Matcher<String> expectedPartition(String defaultDataPartitioning, String index, String idxPartition, boolean score) {
return switch (defaultDataPartitioning) {
case null -> expectedAutoPartition(index, idxPartition);
case "auto" -> expectedAutoPartition(index, idxPartition);
case null -> expectedAutoPartition(index, idxPartition, score);
case "auto" -> expectedAutoPartition(index, idxPartition, score);
default -> equalTo(defaultDataPartitioning.toUpperCase(Locale.ROOT));
};
}

private static Matcher<String> expectedAutoPartition(String index, String idxPartition) {
private static Matcher<String> expectedAutoPartition(String index, String idxPartition, boolean score) {
return equalTo(switch (index) {
case "idx" -> idxPartition;
case "small_idx" -> "SHARD";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.lucene.IndexedByShardId;
import org.elasticsearch.compute.lucene.query.DataPartitioning.AutoStrategy;
import org.elasticsearch.compute.lucene.query.LuceneCountOperator;
import org.elasticsearch.compute.lucene.query.LuceneOperator;
import org.elasticsearch.compute.lucene.query.LuceneSliceQueue;
Expand Down Expand Up @@ -365,6 +366,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
shardContexts,
querySupplier(esQueryExec.query()),
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
topNAutoStrategy(),
context.queryPragmas().taskConcurrency(),
context.pageSize(esQueryExec, rowEstimatedSize),
limit,
Expand Down Expand Up @@ -399,6 +401,10 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
return PhysicalOperation.fromSource(luceneFactory, layout.build());
}

    /**
     * AUTO-partitioning strategy used for Top-N (sorted) Lucene source operators.
     * <p>
     * Ignores both the limit and the query and always locks in
     * {@link LuceneSliceQueue.PartitioningStrategy#SEGMENT}, so each segment group is
     * processed by its own driver. NOTE(review): per the PR description this is meant to
     * make Top-N parallelization comparable to searcher slice-based execution — confirm
     * that SEGMENT remains the right fixed choice for very small indices as well.
     */
    private static AutoStrategy topNAutoStrategy() {
        // unusedLimit: AutoStrategy's limit input is deliberately not consulted here.
        return unusedLimit -> query -> LuceneSliceQueue.PartitioningStrategy.SEGMENT;
    }

List<ValuesSourceReaderOperator.FieldInfo> extractFields(FieldExtractExec fieldExtractExec) {
List<Attribute> attributes = fieldExtractExec.attributesToExtract();
List<ValuesSourceReaderOperator.FieldInfo> fieldInfos = new ArrayList<>(attributes.size());
Expand Down