From 49d1f6d3ff8bcb2077457722d1d7bc43fd8af9ec Mon Sep 17 00:00:00 2001 From: carlosdelest Date: Fri, 13 Feb 2026 10:01:44 +0100 Subject: [PATCH] Revert "ESQL - Use high speed strategy for LuceneTopNSourceOperator (#142128)" This reverts commit f1ed35872beb785223ef4f38b6aae94ba3520be0. --- .../compute/lucene/LuceneSourceOperator.java | 155 +++++++++--------- .../lucene/LuceneTopNSourceOperator.java | 5 +- .../LuceneTopNSourceOperatorScoringTests.java | 1 - .../lucene/LuceneTopNSourceOperatorTests.java | 1 - .../qa/single_node/EsqlPartitioningIT.java | 29 ++-- .../planner/EsPhysicalOperationProviders.java | 14 -- .../function/aggregate/SumOverTimeTests.java | 2 +- .../function/aggregate/SumTests.java | 39 ++--- 8 files changed, 107 insertions(+), 139 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java index ceb650762d938..38dc611836c72 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java @@ -118,7 +118,7 @@ public String describe() { * Pick a strategy for the {@link DataPartitioning#AUTO} partitioning. */ public static Function autoStrategy(int limit) { - return limit == NO_LIMIT ? LuceneSourceOperator::highSpeedAutoStrategy : Factory::lowOverheadAutoStrategy; + return limit == NO_LIMIT ? Factory::highSpeedAutoStrategy : Factory::lowOverheadAutoStrategy; } /** @@ -133,91 +133,90 @@ private static PartitioningStrategy lowOverheadAutoStrategy(Query query) { return SHARD; } - } - - /** - * Select the {@link PartitioningStrategy} based on the {@link Query}. - * - */ - public static PartitioningStrategy highSpeedAutoStrategy(Query query) { - Query unwrapped = unwrap(query); - log.trace("highSpeedAutoStrategy {} {}", query, unwrapped); - return switch (unwrapped) { - case BooleanQuery q -> highSpeedAutoStrategyForBoolean(q); - case MatchAllDocsQuery q -> DOC; - case MatchNoDocsQuery q -> SHARD; - default -> SEGMENT; - }; - } + /** + * Select the {@link PartitioningStrategy} based on the {@link Query}. + * + */ + private static PartitioningStrategy highSpeedAutoStrategy(Query query) { + Query unwrapped = unwrap(query); + log.trace("highSpeedAutoStrategy {} {}", query, unwrapped); + return switch (unwrapped) { + case BooleanQuery q -> highSpeedAutoStrategyForBoolean(q); + case MatchAllDocsQuery q -> DOC; + case MatchNoDocsQuery q -> SHARD; + default -> SEGMENT; + }; + } - private static Query unwrap(Query query) { - while (true) { - switch (query) { - case BoostQuery q: { - query = q.getQuery(); - break; - } - case ConstantScoreQuery q: { - query = q.getQuery(); - break; + private static Query unwrap(Query query) { + while (true) { + switch (query) { + case BoostQuery q: { + query = q.getQuery(); + break; + } + case ConstantScoreQuery q: { + query = q.getQuery(); + break; + } + default: + return query; } - default: - return query; } } - } - /** - * Select the {@link PartitioningStrategy} for a {@link BooleanQuery}. - * - */ - private static PartitioningStrategy highSpeedAutoStrategyForBoolean(BooleanQuery query) { - List clauses = new ArrayList<>(query.clauses().size()); - boolean allRequired = true; - for (BooleanClause c : query) { - Query clauseQuery = unwrap(c.query()); - log.trace("highSpeedAutoStrategyForBooleanClause {} {}", c.occur(), clauseQuery); - if ((c.isProhibited() && clauseQuery instanceof MatchAllDocsQuery) - || (c.isRequired() && clauseQuery instanceof MatchNoDocsQuery)) { - // Can't match anything + /** + * Select the {@link PartitioningStrategy} for a {@link BooleanQuery}. + *
    + *
  • + * If the query can't match anything, returns {@link PartitioningStrategy#SEGMENT}. + *
  • + * + *
+ */ + private static PartitioningStrategy highSpeedAutoStrategyForBoolean(BooleanQuery query) { + List clauses = new ArrayList<>(query.clauses().size()); + boolean allRequired = true; + for (BooleanClause c : query) { + Query clauseQuery = unwrap(c.query()); + log.trace("highSpeedAutoStrategyForBooleanClause {} {}", c.occur(), clauseQuery); + if ((c.isProhibited() && clauseQuery instanceof MatchAllDocsQuery) + || (c.isRequired() && clauseQuery instanceof MatchNoDocsQuery)) { + // Can't match anything + return SHARD; + } + allRequired &= c.isRequired(); + clauses.add(highSpeedAutoStrategy(clauseQuery)); + } + log.trace("highSpeedAutoStrategyForBooleanClause {} {}", allRequired, clauses); + if (allRequired == false) { + return SEGMENT; + } + if (clauses.stream().anyMatch(s -> s == SHARD)) { return SHARD; } - allRequired &= c.isRequired(); - clauses.add(highSpeedAutoStrategy(clauseQuery)); - } - log.trace("highSpeedAutoStrategyForBooleanClause {} {}", allRequired, clauses); - if (allRequired == false) { - return SEGMENT; - } - if (clauses.stream().anyMatch(s -> s == SHARD)) { - return SHARD; - } - if (clauses.stream().anyMatch(s -> s == SEGMENT)) { - return SEGMENT; + if (clauses.stream().anyMatch(s -> s == SEGMENT)) { + return SEGMENT; + } + assert clauses.stream().allMatch(s -> s == DOC); + return DOC; } - assert clauses.stream().allMatch(s -> s == DOC); - return DOC; } @SuppressWarnings("this-escape") diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index d3db37cad47df..5d3571aa350dc 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -62,7 +62,6 @@ public Factory( IndexedByShardId contexts, Function> queryFunction, DataPartitioning dataPartitioning, - DataPartitioning.AutoStrategy autoStrategy, int taskConcurrency, int maxPageSize, int limit, @@ -74,9 +73,7 @@ public Factory( contexts, queryFunction, dataPartitioning, - dataPartitioning == DataPartitioning.AUTO ? autoStrategy.pickStrategy(limit) : query -> { - throw new UnsupportedOperationException("locked in " + dataPartitioning); - }, + query -> LuceneSliceQueue.PartitioningStrategy.SHARD, taskConcurrency, limit, needsScore, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java index 6f3de2fc4cdc7..ce9ea6d8b5439 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java @@ -103,7 +103,6 @@ public Optional buildSort(List> sorts) { new IndexedByShardIdFromSingleton<>(ctx), queryFunction, dataPartitioning, - LuceneSourceOperator.Factory::autoStrategy, taskConcurrency, maxPageSize, limit, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index be9d88707ee79..39c9525510c57 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -109,7 +109,6 @@ public Optional buildSort(List> sorts) { new IndexedByShardIdFromSingleton<>(ctx), queryFunction, dataPartitioning, - LuceneSourceOperator.Factory::autoStrategy, taskConcurrency, maxPageSize, limit, diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java index cee7a687f1a8d..bd4280fc118fb 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlPartitioningIT.java @@ -22,7 +22,6 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; -import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.hamcrest.Matcher; import org.junit.ClassRule; @@ -63,7 +62,7 @@ public static Iterable parameters() { for (String index : new String[] { "idx", "small_idx" }) { for (Case c : new Case[] { new Case("", "SHARD"), - new Case("| SORT @timestamp ASC", "DOC"), + new Case("| SORT @timestamp ASC", "SHARD"), new Case("| WHERE ABS(a) == 1", "DOC"), new Case("| WHERE a == 1", "SHARD"), new Case("| STATS SUM(a)", "DOC"), @@ -74,18 +73,18 @@ public static Iterable parameters() { new Case("| WHERE QSTR(\"a:1\")", "SHARD"), new Case("| WHERE KQL(\"a:1\")", "SHARD"), new Case("| WHERE a:\"1\"", "SHARD"), - new Case("| WHERE MATCH(a, \"2\") | SORT _score DESC", "SEGMENT", true), - new Case("| WHERE QSTR(\"a:2\") | SORT _score DESC", "SEGMENT", true), - new Case("| WHERE KQL(\"a:2\") | SORT _score DESC", "SEGMENT", true), - new Case("| WHERE MATCH(a, \"3\") | SORT _score DESC | LIMIT 10", "SEGMENT", true), - new Case("| WHERE MATCH(a, \"3\") OR MATCH(a, \"4\") | SORT _score DESC | LIMIT 10", "SEGMENT", true), - new Case("| WHERE a:\"3\" | WHERE a:\"4\" | SORT _score DESC | LIMIT 10", "SEGMENT", true), }) { + new Case("| WHERE MATCH(a, \"2\") | SORT _score DESC", "SHARD", true), + new Case("| WHERE QSTR(\"a:2\") | SORT _score DESC", "SHARD", true), + new Case("| WHERE KQL(\"a:2\") | SORT _score DESC", "SHARD", true), + new Case("| WHERE MATCH(a, \"3\") | SORT _score DESC | LIMIT 10", "SHARD", true), + new Case("| WHERE MATCH(a, \"3\") OR MATCH(a, \"4\") | SORT _score DESC | LIMIT 10", "SHARD", true), + new Case("| WHERE a:\"3\" | WHERE a:\"4\" | SORT _score DESC | LIMIT 10", "SHARD", true), }) { params.add( new Object[] { defaultDataPartitioning, index, "FROM " + index + (c.score ? " METADATA _score " : " ") + c.suffix, - expectedPartition(defaultDataPartitioning, index, c.idxPartition, c.score) } + expectedPartition(defaultDataPartitioning, index, c.idxPartition) } ); } } @@ -133,19 +132,15 @@ private void setDefaultDataPartitioning(String defaultDataPartitioning) throws I assertThat(code, equalTo(200)); } - private static Matcher expectedPartition(String defaultDataPartitioning, String index, String idxPartition, boolean score) { + private static Matcher expectedPartition(String defaultDataPartitioning, String index, String idxPartition) { return switch (defaultDataPartitioning) { - case null -> expectedAutoPartition(index, idxPartition, score); - case "auto" -> expectedAutoPartition(index, idxPartition, score); + case null -> expectedAutoPartition(index, idxPartition); + case "auto" -> expectedAutoPartition(index, idxPartition); default -> equalTo(defaultDataPartitioning.toUpperCase(Locale.ROOT)); }; } - private static Matcher expectedAutoPartition(String index, String idxPartition, boolean score) { - boolean lateMaterializationEnabled = EsqlCapabilities.Cap.ENABLE_REDUCE_NODE_LATE_MATERIALIZATION.isEnabled(); - if (score && lateMaterializationEnabled == false) { - return equalTo("SHARD"); - } + private static Matcher expectedAutoPartition(String index, String idxPartition) { return equalTo(switch (index) { case "idx" -> idxPartition; case "small_idx" -> "SHARD"; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 3d70796b7d924..2c96a57a2589c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -20,7 +20,6 @@ import org.elasticsearch.compute.aggregation.GroupingAggregator; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; import org.elasticsearch.compute.data.ElementType; -import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.IndexedByShardId; import org.elasticsearch.compute.lucene.LuceneCountOperator; import org.elasticsearch.compute.lucene.LuceneOperator; @@ -70,7 +69,6 @@ import org.elasticsearch.search.lookup.SourceFilter; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.sort.SortBuilder; -import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; @@ -335,14 +333,12 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, * references to the same underlying data, but we're being a bit paranoid here. */ estimatedPerRowSortSize *= 2; - // LuceneTopNSourceOperator does not support QueryAndTags, if there are multiple queries or if the single query has tags, // UnsupportedOperationException will be thrown by esQueryExec.query() luceneFactory = new LuceneTopNSourceOperator.Factory( shardContexts, querySupplier(esQueryExec.query()), context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()), - topNAutoStrategy(), context.queryPragmas().taskConcurrency(), context.pageSize(esQueryExec, rowEstimatedSize), limit, @@ -377,16 +373,6 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec, return PhysicalOperation.fromSource(luceneFactory, layout.build()); } - private static DataPartitioning.AutoStrategy topNAutoStrategy() { - return unusedLimit -> { - if (EsqlCapabilities.Cap.ENABLE_REDUCE_NODE_LATE_MATERIALIZATION.isEnabled()) { - // Use high speed strategy for TopN - we want to parallelize searches as much as possible given the query structure - return LuceneSourceOperator::highSpeedAutoStrategy; - } - return query -> LuceneSliceQueue.PartitioningStrategy.SHARD; - }; - } - List extractFields(FieldExtractExec fieldExtractExec) { List attributes = fieldExtractExec.attributesToExtract(); List fieldInfos = new ArrayList<>(attributes.size()); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java index 83cb2012a2044..bc399889adb01 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTimeTests.java @@ -30,7 +30,7 @@ public SumOverTimeTests(@Name("TestCase") Supplier te @ParametersFactory public static Iterable parameters() { - return SumTests.testParameters(false); + return SumTests.parameters(); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java index 4ea7c309923de..ee8a6ed8948f9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumTests.java @@ -38,10 +38,6 @@ public SumTests(@Name("TestCase") Supplier testCaseSu @ParametersFactory public static Iterable parameters() { - return testParameters(true); - } - - static Iterable testParameters(boolean includeDenseVector) { var suppliers = new ArrayList(); Stream.of( @@ -84,28 +80,25 @@ static Iterable testParameters(boolean includeDenseVector) { DataType.DOUBLE, equalTo(200.) ) - ) + ), + new TestCaseSupplier(List.of(DataType.AGGREGATE_METRIC_DOUBLE), () -> { + var value = new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral( + randomDouble(), + randomDouble(), + randomDouble(), + randomNonNegativeInt() + ); + return new TestCaseSupplier.TestCase( + List.of(TestCaseSupplier.TypedData.multiRow(List.of(value), DataType.AGGREGATE_METRIC_DOUBLE, "field")), + standardAggregatorName("Sum", DataType.AGGREGATE_METRIC_DOUBLE), + DataType.DOUBLE, + equalTo(value.sum()) + ); + + }) ) ); - if (includeDenseVector) { - suppliers.add(new TestCaseSupplier(List.of(DataType.AGGREGATE_METRIC_DOUBLE), () -> { - var value = new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral( - randomDouble(), - randomDouble(), - randomDouble(), - randomNonNegativeInt() - ); - return new TestCaseSupplier.TestCase( - List.of(TestCaseSupplier.TypedData.multiRow(List.of(value), DataType.AGGREGATE_METRIC_DOUBLE, "field")), - standardAggregatorName("Sum", DataType.AGGREGATE_METRIC_DOUBLE), - DataType.DOUBLE, - equalTo(value.sum()) - ); - - })); - } - return parameterSuppliersFromTypedDataWithDefaultChecks(suppliers); }