diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java index 38dc611836c72..ceb650762d938 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java @@ -118,7 +118,7 @@ public String describe() { * Pick a strategy for the {@link DataPartitioning#AUTO} partitioning. */ public static Function autoStrategy(int limit) { - return limit == NO_LIMIT ? Factory::highSpeedAutoStrategy : Factory::lowOverheadAutoStrategy; + return limit == NO_LIMIT ? LuceneSourceOperator::highSpeedAutoStrategy : Factory::lowOverheadAutoStrategy; } /** @@ -133,90 +133,91 @@ private static PartitioningStrategy lowOverheadAutoStrategy(Query query) { return SHARD; } - /** - * Select the {@link PartitioningStrategy} based on the {@link Query}. - * - */ - private static PartitioningStrategy highSpeedAutoStrategy(Query query) { - Query unwrapped = unwrap(query); - log.trace("highSpeedAutoStrategy {} {}", query, unwrapped); - return switch (unwrapped) { - case BooleanQuery q -> highSpeedAutoStrategyForBoolean(q); - case MatchAllDocsQuery q -> DOC; - case MatchNoDocsQuery q -> SHARD; - default -> SEGMENT; - }; - } + } - private static Query unwrap(Query query) { - while (true) { - switch (query) { - case BoostQuery q: { - query = q.getQuery(); - break; - } - case ConstantScoreQuery q: { - query = q.getQuery(); - break; - } - default: - return query; + /** + * Select the {@link PartitioningStrategy} based on the {@link Query}. + * + */ + public static PartitioningStrategy highSpeedAutoStrategy(Query query) { + Query unwrapped = unwrap(query); + log.trace("highSpeedAutoStrategy {} {}", query, unwrapped); + return switch (unwrapped) { + case BooleanQuery q -> highSpeedAutoStrategyForBoolean(q); + case MatchAllDocsQuery q -> DOC; + case MatchNoDocsQuery q -> SHARD; + default -> SEGMENT; + }; + } + + private static Query unwrap(Query query) { + while (true) { + switch (query) { + case BoostQuery q: { + query = q.getQuery(); + break; } + case ConstantScoreQuery q: { + query = q.getQuery(); + break; + } + default: + return query; } } + } - /** - * Select the {@link PartitioningStrategy} for a {@link BooleanQuery}. - * - */ - private static PartitioningStrategy highSpeedAutoStrategyForBoolean(BooleanQuery query) { - List clauses = new ArrayList<>(query.clauses().size()); - boolean allRequired = true; - for (BooleanClause c : query) { - Query clauseQuery = unwrap(c.query()); - log.trace("highSpeedAutoStrategyForBooleanClause {} {}", c.occur(), clauseQuery); - if ((c.isProhibited() && clauseQuery instanceof MatchAllDocsQuery) - || (c.isRequired() && clauseQuery instanceof MatchNoDocsQuery)) { - // Can't match anything - return SHARD; - } - allRequired &= c.isRequired(); - clauses.add(highSpeedAutoStrategy(clauseQuery)); - } - log.trace("highSpeedAutoStrategyForBooleanClause {} {}", allRequired, clauses); - if (allRequired == false) { - return SEGMENT; - } - if (clauses.stream().anyMatch(s -> s == SHARD)) { + /** + * Select the {@link PartitioningStrategy} for a {@link BooleanQuery}. + * + */ + private static PartitioningStrategy highSpeedAutoStrategyForBoolean(BooleanQuery query) { + List clauses = new ArrayList<>(query.clauses().size()); + boolean allRequired = true; + for (BooleanClause c : query) { + Query clauseQuery = unwrap(c.query()); + log.trace("highSpeedAutoStrategyForBooleanClause {} {}", c.occur(), clauseQuery); + if ((c.isProhibited() && clauseQuery instanceof MatchAllDocsQuery) + || (c.isRequired() && clauseQuery instanceof MatchNoDocsQuery)) { + // Can't match anything return SHARD; } - if (clauses.stream().anyMatch(s -> s == SEGMENT)) { - return SEGMENT; - } - assert clauses.stream().allMatch(s -> s == DOC); - return DOC; + allRequired &= c.isRequired(); + clauses.add(highSpeedAutoStrategy(clauseQuery)); + } + log.trace("highSpeedAutoStrategyForBooleanClause {} {}", allRequired, clauses); + if (allRequired == false) { + return SEGMENT; + } + if (clauses.stream().anyMatch(s -> s == SHARD)) { + return SHARD; + } + if (clauses.stream().anyMatch(s -> s == SEGMENT)) { + return SEGMENT; } + assert clauses.stream().allMatch(s -> s == DOC); + return DOC; } @SuppressWarnings("this-escape") diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index 5d3571aa350dc..d3db37cad47df 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -62,6 +62,7 @@ public Factory( IndexedByShardId contexts, Function> queryFunction, DataPartitioning dataPartitioning, + DataPartitioning.AutoStrategy autoStrategy, int taskConcurrency, int maxPageSize, int limit, @@ -73,7 +74,9 @@ public Factory( contexts, queryFunction, dataPartitioning, - query -> LuceneSliceQueue.PartitioningStrategy.SHARD, + dataPartitioning == DataPartitioning.AUTO ? autoStrategy.pickStrategy(limit) : query -> { + throw new UnsupportedOperationException("locked in " + dataPartitioning); + }, taskConcurrency, limit, needsScore, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java index ce9ea6d8b5439..6f3de2fc4cdc7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java @@ -103,6 +103,7 @@ public Optional buildSort(List> sorts) { new IndexedByShardIdFromSingleton<>(ctx), queryFunction, dataPartitioning, + LuceneSourceOperator.Factory::autoStrategy, taskConcurrency, maxPageSize, limit, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index 39c9525510c57..be9d88707ee79 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -109,6 +109,7 @@ public Optional buildSort(List> sorts) { new IndexedByShardIdFromSingleton<>(ctx), queryFunction, dataPartitioning, + LuceneSourceOperator.Factory::autoStrategy, taskConcurrency, maxPageSize, limit, diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java index bd4280fc118fb..cee7a687f1a8d 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java @@ -22,6 +22,7 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.hamcrest.Matcher; import org.junit.ClassRule; @@ -62,7 +63,7 @@ public static Iterable parameters() { for (String index : new String[] { "idx", "small_idx" }) { for (Case c : new Case[] { new Case("", "SHARD"), - new Case("| SORT @timestamp ASC", "SHARD"), + new Case("| SORT @timestamp ASC", "DOC"), new Case("| WHERE ABS(a) == 1", "DOC"), new Case("| WHERE a == 1", "SHARD"), new Case("| STATS SUM(a)", "DOC"), @@ -73,18 +74,18 @@ public static Iterable parameters() { new Case("| WHERE QSTR(\"a:1\")", "SHARD"), new Case("| WHERE KQL(\"a:1\")", "SHARD"), new Case("| WHERE a:\"1\"", "SHARD"), - new Case("| WHERE MATCH(a, \"2\") | SORT _score DESC", "SHARD", true), - new Case("| WHERE QSTR(\"a:2\") | SORT _score DESC", "SHARD", true), - new Case("| WHERE KQL(\"a:2\") | SORT _score DESC", "SHARD", true), - new Case("| WHERE MATCH(a, \"3\") | SORT _score DESC | LIMIT 10", "SHARD", true), - new Case("| WHERE MATCH(a, \"3\") OR MATCH(a, \"4\") | SORT _score DESC | LIMIT 10", "SHARD", true), - new Case("| WHERE a:\"3\" | WHERE a:\"4\" | SORT _score DESC | LIMIT 10", "SHARD", true), }) { + new Case("| WHERE MATCH(a, \"2\") | SORT _score DESC", "SEGMENT", true), + new Case("| WHERE QSTR(\"a:2\") | SORT _score DESC", "SEGMENT", true), + new Case("| WHERE KQL(\"a:2\") | SORT _score DESC", "SEGMENT", true), + new Case("| WHERE MATCH(a, \"3\") | SORT _score DESC | LIMIT 10", "SEGMENT", true), + new Case("| WHERE MATCH(a, \"3\") OR MATCH(a, \"4\") | SORT _score DESC | LIMIT 10", "SEGMENT", true), + new Case("| WHERE a:\"3\" | WHERE a:\"4\" | SORT _score DESC | LIMIT 10", "SEGMENT", true), }) { params.add( new Object[] { defaultDataPartitioning, index, "FROM " + index + (c.score ? " METADATA _score " : " ") + c.suffix, - expectedPartition(defaultDataPartitioning, index, c.idxPartition) } + expectedPartition(defaultDataPartitioning, index, c.idxPartition, c.score) } ); } } @@ -132,15 +133,19 @@ private void setDefaultDataPartitioning(String defaultDataPartitioning) throws I assertThat(code, equalTo(200)); } - private static Matcher expectedPartition(String defaultDataPartitioning, String index, String idxPartition) { + private static Matcher expectedPartition(String defaultDataPartitioning, String index, String idxPartition, boolean score) { return switch (defaultDataPartitioning) { - case null -> expectedAutoPartition(index, idxPartition); - case "auto" -> expectedAutoPartition(index, idxPartition); + case null -> expectedAutoPartition(index, idxPartition, score); + case "auto" -> expectedAutoPartition(index, idxPartition, score); default -> equalTo(defaultDataPartitioning.toUpperCase(Locale.ROOT)); }; } - private static Matcher expectedAutoPartition(String index, String idxPartition) { + private static Matcher expectedAutoPartition(String index, String idxPartition, boolean score) { + boolean lateMaterializationEnabled = EsqlCapabilities.Cap.ENABLE_REDUCE_NODE_LATE_MATERIALIZATION.isEnabled(); + if (score && lateMaterializationEnabled == false) { + return equalTo("SHARD"); + } return equalTo(switch (index) { case "idx" -> idxPartition; case "small_idx" -> "SHARD"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 2c96a57a2589c..3d70796b7d924 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -20,6 +20,7 @@ import org.elasticsearch.compute.aggregation.GroupingAggregator; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.IndexedByShardId; import org.elasticsearch.compute.lucene.LuceneCountOperator; import org.elasticsearch.compute.lucene.LuceneOperator; @@ -69,6 +70,7 @@ import org.elasticsearch.search.lookup.SourceFilter; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; @@ -333,12 +335,14 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, * references to the same underlying data, but we're being a bit paranoid here. */ estimatedPerRowSortSize *= 2; + // LuceneTopNSourceOperator does not support QueryAndTags, if there are multiple queries or if the single query has tags, // UnsupportedOperationException will be thrown by esQueryExec.query() luceneFactory = new LuceneTopNSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()), + topNAutoStrategy(), context.queryPragmas().taskConcurrency(), context.pageSize(esQueryExec, rowEstimatedSize), limit, @@ -373,6 +377,16 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, return PhysicalOperation.fromSource(luceneFactory, layout.build()); } + private static DataPartitioning.AutoStrategy topNAutoStrategy() { + return unusedLimit -> { + if (EsqlCapabilities.Cap.ENABLE_REDUCE_NODE_LATE_MATERIALIZATION.isEnabled()) { + // Use high speed strategy for TopN - we want to parallelize searches as much as possible given the query structure + return LuceneSourceOperator::highSpeedAutoStrategy; + } + return query -> LuceneSliceQueue.PartitioningStrategy.SHARD; + }; + } + List extractFields(FieldExtractExec fieldExtractExec) { List attributes = fieldExtractExec.attributesToExtract(); List fieldInfos = new ArrayList<>(attributes.size()); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java index bc399889adb01..83cb2012a2044 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java @@ -30,7 +30,7 @@ public SumOverTimeTests(@Name("TestCase") Supplier te @ParametersFactory public static Iterable parameters() { - return SumTests.parameters(); + return SumTests.testParameters(false); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java index ee8a6ed8948f9..4ea7c309923de 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java @@ -38,6 +38,10 @@ public SumTests(@Name("TestCase") Supplier testCaseSu @ParametersFactory public static Iterable parameters() { + return testParameters(true); + } + + static Iterable testParameters(boolean includeDenseVector) { var suppliers = new ArrayList(); Stream.of( @@ -80,25 +84,28 @@ public static Iterable parameters() { DataType.DOUBLE, equalTo(200.) ) - ), - new TestCaseSupplier(List.of(DataType.AGGREGATE_METRIC_DOUBLE), () -> { - var value = new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral( - randomDouble(), - randomDouble(), - randomDouble(), - randomNonNegativeInt() - ); - return new TestCaseSupplier.TestCase( - List.of(TestCaseSupplier.TypedData.multiRow(List.of(value), DataType.AGGREGATE_METRIC_DOUBLE, "field")), - standardAggregatorName("Sum", DataType.AGGREGATE_METRIC_DOUBLE), - DataType.DOUBLE, - equalTo(value.sum()) - ); - - }) + ) ) ); + if (includeDenseVector) { + suppliers.add(new TestCaseSupplier(List.of(DataType.AGGREGATE_METRIC_DOUBLE), () -> { + var value = new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral( + randomDouble(), + randomDouble(), + randomDouble(), + randomNonNegativeInt() + ); + return new TestCaseSupplier.TestCase( + List.of(TestCaseSupplier.TypedData.multiRow(List.of(value), DataType.AGGREGATE_METRIC_DOUBLE, "field")), + standardAggregatorName("Sum", DataType.AGGREGATE_METRIC_DOUBLE), + DataType.DOUBLE, + equalTo(value.sum()) + ); + + })); + } + return parameterSuppliersFromTypedDataWithDefaultChecks(suppliers); }