diff --git a/docs/changelog/143133.yaml b/docs/changelog/143133.yaml
new file mode 100644
index 0000000000000..7c51c32f270f4
--- /dev/null
+++ b/docs/changelog/143133.yaml
@@ -0,0 +1,5 @@
+area: ES|QL
+issues: []
+pr: 143133
+summary: ES|QL - Top N queries are parallelized
+type: enhancement
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSliceQueue.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSliceQueue.java
index 68ba9f26ac682..0ac57407bdc65 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSliceQueue.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneSliceQueue.java
@@ -282,16 +282,12 @@ List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConc
         },
         /**
          * See {@link DataPartitioning#SEGMENT}.
+         * Uses the searcher's {@link IndexSearcher#getSlices()} to partition segments.
          */
         SEGMENT(1) {
             @Override
             List<List<PartialLeafReaderContext>> groups(IndexSearcher searcher, int taskConcurrency) {
-                IndexSearcher.LeafSlice[] gs = IndexSearcher.slices(
-                    searcher.getLeafContexts(),
-                    MAX_DOCS_PER_SLICE,
-                    MAX_SEGMENTS_PER_SLICE,
-                    false
-                );
+                IndexSearcher.LeafSlice[] gs = searcher.getSlices();
                 return Arrays.stream(gs).map(g -> Arrays.stream(g.partitions).map(PartialLeafReaderContext::new).toList()).toList();
             }
         },
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperator.java
index ec165d790cab5..c20fbdb1c5043 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperator.java
@@ -67,6 +67,7 @@ public Factory(
             IndexedByShardId<? extends ShardContext> contexts,
             Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
             DataPartitioning dataPartitioning,
+            DataPartitioning.AutoStrategy autoStrategy,
             int taskConcurrency,
             int maxPageSize,
             int limit,
@@ -78,7 +79,9 @@ public Factory(
                 contexts,
                 queryFunction,
                 dataPartitioning,
-                query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
+                dataPartitioning == DataPartitioning.AUTO ? autoStrategy.pickStrategy(limit) : query -> {
+                    throw new UnsupportedOperationException("locked in " + dataPartitioning);
+                },
                 taskConcurrency,
                 limit,
                 needsScore,
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java
index ddf7452447e22..291fb99c2d2db 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneSourceOperatorTests.java
@@ -502,6 +502,10 @@ public static class MockShardContext implements ShardContext {
         private final ContextIndexSearcher searcher;
         private final ShardSearchStats shardSearchStats;
 
+        // Slice limits passed to the searcher constructed below.
+        private static final int MIN_DOCS_PER_SLICE = 50_000;
+        private static final int MAX_SLICES_NUMBER = 10;
+
         // TODO Reuse this overload in the places that pass 0.
         public MockShardContext(IndexReader reader) {
             this(reader, 0);
@@ -519,7 +523,10 @@ public MockShardContext(IndexReader reader, int index) {
                         IndexSearcher.getDefaultSimilarity(),
                         IndexSearcher.getDefaultQueryCache(),
                         TrivialQueryCachingPolicy.NEVER,
-                        true
+                        true,
+                        Runnable::run,
+                        MAX_SLICES_NUMBER,
+                        MIN_DOCS_PER_SLICE
                     );
                 } else {
                     this.searcher = null;
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorCollectorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorCollectorTests.java
index 904c5ebc1707f..d2140a92aacd2 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorCollectorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorCollectorTests.java
@@ -362,11 +362,12 @@ private LuceneTopNSourceOperator.Factory createFactory(boolean needsScore, List<
             new IndexedByShardIdFromSingleton<>(ctx),
             queryFunction,
             partitioning,
+            DataPartitioning.AutoStrategy.DEFAULT,
             randomIntBetween(1, 10),
             10000,
             randomIntBetween(10, 100),
             sorts,
-            randomIntBetween(10, 20),
+            randomLongBetween(10, 20),
             needsScore
         );
     }
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorScoringTests.java
index d5d04c14c3bfc..4dcf786364df2 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorScoringTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorScoringTests.java
@@ -105,6 +105,7 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
             new IndexedByShardIdFromSingleton<>(ctx),
             queryFunction,
             dataPartitioning,
+            LuceneSourceOperator.Factory::autoStrategy,
             taskConcurrency,
             maxPageSize,
             limit,
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java
index 0f410038d62d3..29412750a9d5f 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/query/LuceneTopNSourceOperatorTests.java
@@ -113,6 +113,7 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
             new IndexedByShardIdFromSingleton<>(ctx),
             queryFunction,
             dataPartitioning,
+            LuceneSourceOperator.Factory::autoStrategy,
             taskConcurrency,
             maxPageSize,
             limit,
@@ -254,6 +255,7 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
             new IndexedByShardIdFromList<>(shardContexts),
             queryFunction,
             DataPartitioning.SHARD,
+            DataPartitioning.AutoStrategy.DEFAULT,
             taskConcurrency,
             maxPageSize,
             10,
diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java
index c268e3eda648a..c83b3844766b8 100644
--- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java
+++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java
@@ -62,7 +62,7 @@ public static Iterable<Object[]> parameters() {
             for (String index : new String[] { "idx", "small_idx" }) {
                 for (Case c : new Case[] {
                     new Case("", "SHARD"),
-                    new Case("| SORT @timestamp ASC", "SHARD"),
+                    new Case("| SORT @timestamp ASC", "SEGMENT"),
                     new Case("| WHERE ABS(a) == 1", "DOC"),
                     new Case("| WHERE a == 1", "SHARD"),
                     new Case("| STATS SUM(a)", "DOC"),
@@ -76,18 +76,18 @@ public static Iterable<Object[]> parameters() {
                     new Case("| WHERE QSTR(\"a:1\")", "SHARD"),
                     new Case("| WHERE KQL(\"a:1\")", "SHARD"),
                     new Case("| WHERE a:\"1\"", "SHARD"),
-                    new Case("| WHERE MATCH(a, \"2\") | SORT _score DESC", "SHARD", true),
-                    new Case("| WHERE QSTR(\"a:2\") | SORT _score DESC", "SHARD", true),
-                    new Case("| WHERE KQL(\"a:2\") | SORT _score DESC", "SHARD", true),
-                    new Case("| WHERE MATCH(a, \"3\") | SORT _score DESC | LIMIT 10", "SHARD", true),
-                    new Case("| WHERE MATCH(a, \"3\") OR MATCH(a, \"4\") | SORT _score DESC | LIMIT 10", "SHARD", true),
-                    new Case("| WHERE a:\"3\" | WHERE a:\"4\" | SORT _score DESC | LIMIT 10", "SHARD", true), }) {
+                    new Case("| WHERE MATCH(a, \"2\") | SORT _score DESC", "SEGMENT", true),
+                    new Case("| WHERE QSTR(\"a:2\") | SORT _score DESC", "SEGMENT", true),
+                    new Case("| WHERE KQL(\"a:2\") | SORT _score DESC", "SEGMENT", true),
+                    new Case("| WHERE MATCH(a, \"3\") | SORT _score DESC | LIMIT 10", "SEGMENT", true),
+                    new Case("| WHERE MATCH(a, \"3\") OR MATCH(a, \"4\") | SORT _score DESC | LIMIT 10", "SEGMENT", true),
+                    new Case("| WHERE a:\"3\" | WHERE a:\"4\" | SORT _score DESC | LIMIT 10", "SEGMENT", true), }) {
                     params.add(
                         new Object[] {
                             defaultDataPartitioning,
                             index,
                             "FROM " + index + (c.score ? " METADATA _score " : " ") + c.suffix,
                             expectedPartition(defaultDataPartitioning, index, c.idxPartition) }
                     );
                 }
             }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
index 130f442f48fb1..a87437de02542 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
@@ -22,6 +22,7 @@
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.ElementType;
 import org.elasticsearch.compute.lucene.IndexedByShardId;
+import org.elasticsearch.compute.lucene.query.DataPartitioning.AutoStrategy;
 import org.elasticsearch.compute.lucene.query.LuceneCountOperator;
 import org.elasticsearch.compute.lucene.query.LuceneOperator;
 import org.elasticsearch.compute.lucene.query.LuceneSliceQueue;
@@ -365,6 +366,7 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
                 shardContexts,
                 querySupplier(esQueryExec.query()),
                 context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
+                topNAutoStrategy(),
                 context.queryPragmas().taskConcurrency(),
                 context.pageSize(esQueryExec, rowEstimatedSize),
                 limit,
@@ -399,6 +401,14 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
         return PhysicalOperation.fromSource(luceneFactory, layout.build());
     }
 
+    /**
+     * {@link AutoStrategy} for Top N sources: ignores both the limit and the query and always
+     * picks {@link LuceneSliceQueue.PartitioningStrategy#SEGMENT}.
+     */
+    private static AutoStrategy topNAutoStrategy() {
+        return unusedLimit -> query -> LuceneSliceQueue.PartitioningStrategy.SEGMENT;
+    }
+
     List<ValuesSourceReaderOperator.FieldInfo> extractFields(FieldExtractExec fieldExtractExec) {
         List<Attribute> attributes = fieldExtractExec.attributesToExtract();
         List<ValuesSourceReaderOperator.FieldInfo> fieldInfos = new ArrayList<>(attributes.size());