diff --git a/server/src/main/resources/transport/definitions/referable/esql_lookup_planning.csv b/server/src/main/resources/transport/definitions/referable/esql_lookup_planning.csv new file mode 100644 index 0000000000000..67b958b9af9f6 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/esql_lookup_planning.csv @@ -0,0 +1 @@ +9305000 diff --git a/server/src/main/resources/transport/upper_bounds/9.4.csv b/server/src/main/resources/transport/upper_bounds/9.4.csv index 3d08c5fa7867e..6f5b0de2884aa 100644 --- a/server/src/main/resources/transport/upper_bounds/9.4.csv +++ b/server/src/main/resources/transport/upper_bounds/9.4.csv @@ -1 +1 @@ -inference_azure_openai_task_settings_headers,9304000 +esql_lookup_planning,9305000 diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupQueryOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupQueryOperator.java index a70384749898f..34dcaa314117c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupQueryOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/lookup/LookupQueryOperator.java @@ -55,6 +55,7 @@ public final class LookupQueryOperator implements Operator { private final IndexSearcher searcher; private final Warnings warnings; private final int maxPageSize; + private final boolean emptyResult; private Page currentInputPage; private int queryPosition = -1; @@ -77,7 +78,8 @@ public LookupQueryOperator( IndexedByShardId shardContexts, int shardId, SearchExecutionContext searchExecutionContext, - Warnings warnings + Warnings warnings, + boolean emptyResult ) { this.blockFactory = blockFactory; this.maxPageSize = maxPageSize; @@ -86,6 +88,7 @@ public LookupQueryOperator( this.shardContext = shardContexts.get(shardId); this.shardContext.incRef(); this.searchExecutionContext = searchExecutionContext; + this.emptyResult = emptyResult; try { if (shardContext.searcher().getIndexReader() instanceof DirectoryReader directoryReader) { // This optimization is currently disabled for ParallelCompositeReader @@ -105,10 +108,14 @@ public void addInput(Page page) { if (currentInputPage != null) { throw new IllegalStateException("Operator already has input page, must consume it first"); } - currentInputPage = page; - queryPosition = -1; // Reset query position for new page pagesReceived++; rowsReceived += page.getPositionCount(); + if (emptyResult) { + page.releaseBlocks(); + return; + } + currentInputPage = page; + queryPosition = -1; // Reset query position for new page } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/LookupQueryOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/LookupQueryOperatorTests.java index 423ea452926f4..11739323f1fae 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/LookupQueryOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/lookup/LookupQueryOperatorTests.java @@ -124,7 +124,8 @@ public Operator get(DriverContext driverContext) { new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)), 0, directoryData.searchExecutionContext, - warnings() + warnings(), + false ); } @@ -180,7 +181,8 @@ public void testNoMatchesScenario() throws Exception { new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(noMatchDirectory.reader)), 0, noMatchDirectory.searchExecutionContext, - warnings() + warnings(), + false ) ) { // Create input with non-matching terms @@ -237,7 +239,8 @@ public void testGetOutputNeverNullWhileCanProduceMore() throws Exception { new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)), 0, directoryData.searchExecutionContext, - warnings() + warnings(), + false ) ) { // Create input with many matching terms @@ -283,7 +286,8 @@ public void testMixedMatchesAndNoMatches() throws Exception { new IndexedByShardIdFromSingleton<>(new LuceneSourceOperatorTests.MockShardContext(directoryData.reader)), 0, directoryData.searchExecutionContext, - warnings() + warnings(), + false ) ) { // Mix of matching and non-matching terms diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 5e07a2269668a..8127467f95a91 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -448,6 +448,7 @@ public enum Config { private final Map> includes = new HashMap<>(); private final Map> excludes = new HashMap<>(); + private final Map constantValues = new HashMap<>(); public TestConfigurableSearchStats include(Config key, String... fields) { // If this method is called with no fields, it is interpreted to mean include none, so we include a dummy field @@ -492,6 +493,16 @@ public boolean hasExactSubfield(FieldName field) { return isConfigationSet(Config.EXACT_SUBFIELD, field.string()); } + public TestConfigurableSearchStats withConstantValue(String field, String value) { + constantValues.put(field, value); + return this; + } + + @Override + public String constantValue(FieldName name) { + return constantValues.get(name.string()); + } + @Override public String toString() { return "TestConfigurableSearchStats{" + "includes=" + includes + ", excludes=" + excludes + '}'; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index ef298fd144bdf..8ff869299fac7 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -52,6 +52,7 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.Literal; @@ -125,27 +126,19 @@ public void testJoinOnFourKeys() throws IOException { new String[] { "one", "two", "three", "four" }, new Integer[] { 1, 2, 3, 4 }, } ), - buildGreaterThanFilter(1L) + 1L ); } public void testLongKey() throws IOException { - runLookup( - List.of(DataType.LONG), - new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }), - buildGreaterThanFilter(0L) - ); + runLookup(List.of(DataType.LONG), new UsingSingleLookupTable(new Object[][] { new Long[] { 12L, 33L, 1L } }), 0L); } /** * LOOKUP multiple results match. */ public void testLookupIndexMultiResults() throws IOException { - runLookup( - List.of(DataType.KEYWORD), - new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }), - buildGreaterThanFilter(-1L) - ); + runLookup(List.of(DataType.KEYWORD), new UsingSingleLookupTable(new Object[][] { new String[] { "aa", "bb", "bb", "dd" } }), -1L); } public void testJoinOnTwoKeysMultiResults() throws IOException { @@ -234,19 +227,28 @@ public void populate(int docCount, List expected, Predicate fil } } - private PhysicalPlan buildGreaterThanFilter(long value) { - FieldAttribute filterAttribute = new FieldAttribute( + private static PhysicalPlan buildGreaterThanFilter(long value, FieldAttribute filterAttribute) { + Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG)); + EsRelation esRelation = new EsRelation( Source.EMPTY, - "l", - new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE) + "test", + IndexMode.LOOKUP, + Map.of(), + Map.of(), + Map.of(), + List.of(filterAttribute) ); - Expression greaterThan = new GreaterThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.LONG)); - EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), List.of()); Filter filter = new Filter(Source.EMPTY, esRelation, greaterThan); return new FragmentExec(filter); } - private void runLookup(List keyTypes, PopulateIndices populateIndices, PhysicalPlan pushedDownFilter) throws IOException { + private void runLookup(List keyTypes, PopulateIndices populateIndices, Long filterValue) throws IOException { + FieldAttribute lAttribute = new FieldAttribute( + Source.EMPTY, + "l", + new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE) + ); + PhysicalPlan pushedDownFilter = filterValue != null ? buildGreaterThanFilter(filterValue, lAttribute) : null; String[] fieldMappers = new String[keyTypes.size() * 2]; for (int i = 0; i < keyTypes.size(); i++) { fieldMappers[2 * i] = "key" + i; @@ -401,19 +403,14 @@ private void runLookup(List keyTypes, PopulateIndices populateIndices, ctx -> internalCluster().getInstance(TransportEsqlQueryAction.class, finalNodeWithShard).getLookupFromIndexService(), "lookup", "lookup", - List.of( - new FieldAttribute( - Source.EMPTY, - "l", - new EsField("l", DataType.LONG, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE) - ) - ), + List.of(lAttribute), Source.EMPTY, pushedDownFilter, Predicates.combineAnd(joinOnConditions), true, // useStreamingOperator QueryPragmas.EXCHANGE_BUFFER_SIZE.getDefault(Settings.EMPTY), - false // profile + false, // profile + EsqlTestUtils.TEST_CFG ); DriverContext driverContext = driverContext(); try ( diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinIT.java index 713655373a698..8a43a6796af9e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.logging.HeaderWarning; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.operator.OperatorStatus; import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin; @@ -58,6 +59,8 @@ import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; @ClusterScope(scope = SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) @TestLogging( @@ -449,13 +452,28 @@ public void testMultiValueJoinKeyWarnings() throws Exception { List warnings = capturedWarnings.get(); assertNotNull("Warnings should not be null", warnings); + List warningValues = warnings.stream().map(w -> HeaderWarning.extractWarningValueFromWarningHeader(w, false)).toList(); + // Filter warnings for the LOOKUP JOIN multi-value warning - List lookupJoinWarnings = warnings.stream().filter(w -> w.contains("LOOKUP JOIN encountered multi-value")).toList(); + List lookupJoinWarnings = warningValues.stream().filter(w -> w.contains("LOOKUP JOIN encountered multi-value")).toList(); assertThat( - "Expected LOOKUP JOIN multi-value warning to be present. All warnings: " + warnings, + "Expected LOOKUP JOIN multi-value warning to be present. All warnings: " + warningValues, lookupJoinWarnings.size(), greaterThanOrEqualTo(1) ); + + // Verify the source location is correctly propagated (not Line -1:-1 / empty expression). + // The LOOKUP JOIN is on line 4, column 3 of the query (after "| "). + assertThat( + "Warning should contain correct source location, got: " + warningValues, + warningValues, + hasItem(startsWith("Line 4:3: evaluation of [LOOKUP JOIN languages_lookup ON language_code] failed")) + ); + assertThat( + "Warning should contain correct source location, got: " + warningValues, + warningValues, + hasItem(startsWith("Line 4:3: java.lang.IllegalArgumentException: LOOKUP JOIN encountered multi-value")) + ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java index 890172942697f..53b74cef30144 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/ExpressionQueryList.java @@ -16,6 +16,7 @@ import org.elasticsearch.compute.operator.Warnings; import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.QueryList; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.Rewriteable; @@ -29,9 +30,6 @@ import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals; import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison; import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.LucenePushdownPredicates; -import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec; -import org.elasticsearch.xpack.esql.plan.physical.FilterExec; -import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.stats.SearchContextStats; @@ -54,7 +52,7 @@ * The query is then used to fetch the matching rows from the right dataset. * The class supports two types of joins: * 1. Field-based join: The join conditions are based on the equality of fields from the left and right datasets. - * It is used for field-based join when the join is on more than one field or there is a preJoinFilter + * It is used for field-based join when the join is on more than one field or there is a rightOnlyFilter * 2. Expression-based join: The join conditions are based on a complex expression that can involve multiple fields and operators. */ public class ExpressionQueryList implements LookupEnrichQueryGenerator { @@ -66,18 +64,22 @@ public class ExpressionQueryList implements LookupEnrichQueryGenerator { private ExpressionQueryList( List queryLists, SearchExecutionContext context, - PhysicalPlan rightPreJoinPlan, + @Nullable Expression rightOnlyFilter, + @Nullable QueryBuilder pushedQuery, ClusterService clusterService, AliasFilter aliasFilter ) { this.queryLists = new ArrayList<>(queryLists); this.aliasFilter = aliasFilter; this.clusterService = clusterService; + if (pushedQuery != null) { + lucenePushableFilterBuilders.add(pushedQuery); + } LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from( SearchContextStats.from(List.of(context)), new EsqlFlags(clusterService.getClusterSettings()) ); - buildPreJoinFilter(rightPreJoinPlan, context, lucenePushdownPredicates); + buildPreJoinFilter(rightOnlyFilter, context, lucenePushdownPredicates); } /** @@ -90,14 +92,15 @@ private ExpressionQueryList( public static ExpressionQueryList fieldBasedJoin( List queryLists, SearchExecutionContext context, - PhysicalPlan rightPreJoinPlan, + @Nullable Expression rightOnlyFilter, + @Nullable QueryBuilder pushedQuery, ClusterService clusterService, AliasFilter aliasFilter ) { - if (queryLists.size() < 2 && (rightPreJoinPlan instanceof FilterExec == false)) { + if (queryLists.size() < 2 && rightOnlyFilter == null && pushedQuery == null) { throw new IllegalArgumentException("ExpressionQueryList must have at least two QueryLists or a pre-join filter"); } - return new ExpressionQueryList(queryLists, context, rightPreJoinPlan, clusterService, aliasFilter); + return new ExpressionQueryList(queryLists, context, rightOnlyFilter, pushedQuery, clusterService, aliasFilter); } /** @@ -110,37 +113,33 @@ public static ExpressionQueryList fieldBasedJoin( */ public static ExpressionQueryList expressionBasedJoin( SearchExecutionContext context, - PhysicalPlan rightPreJoinPlan, + @Nullable Expression rightOnlyFilter, + @Nullable QueryBuilder pushedQuery, ClusterService clusterService, - LookupFromIndexService.TransportRequest request, + List matchFields, + Expression joinOnConditions, AliasFilter aliasFilter, Warnings warnings ) { if (LOOKUP_JOIN_ON_BOOLEAN_EXPRESSION.isEnabled() == false) { throw new UnsupportedOperationException("Lookup Join on Boolean Expression capability is not enabled"); } - if (request.getJoinOnConditions() == null) { + if (joinOnConditions == null) { throw new IllegalStateException("expressionBasedJoin must have join conditions"); } ExpressionQueryList expressionQueryList = new ExpressionQueryList( new ArrayList<>(), context, - rightPreJoinPlan, + rightOnlyFilter, + pushedQuery, clusterService, aliasFilter ); - // Build join-on conditions using the context from planning (this is safe as conditions are static) LucenePushdownPredicates lucenePushdownPredicates = LucenePushdownPredicates.from( SearchContextStats.from(List.of(context)), new EsqlFlags(clusterService.getClusterSettings()) ); - expressionQueryList.buildJoinOnForExpressionJoin( - request.getJoinOnConditions(), - request.getMatchFields(), - context, - lucenePushdownPredicates, - warnings - ); + expressionQueryList.buildJoinOnForExpressionJoin(joinOnConditions, matchFields, context, lucenePushdownPredicates, warnings); return expressionQueryList; } @@ -242,37 +241,33 @@ private void addToLucenePushableFilters(QueryBuilder queryBuilder) { } private void buildPreJoinFilter( - PhysicalPlan rightPreJoinPlan, + @Nullable Expression rightOnlyFilter, SearchExecutionContext context, LucenePushdownPredicates lucenePushdownPredicates ) { - if (rightPreJoinPlan instanceof FilterExec filterExec) { - List candidateRightHandFilters = Predicates.splitAnd(filterExec.condition()); - for (Expression filter : candidateRightHandFilters) { - if (filter instanceof TranslationAware translationAware) { - if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) { - QueryBuilder queryBuilder = translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder(); - // Rewrite the query builder to ensure doIndexMetadataRewrite is called - // Some functions, such as KQL require rewriting to work properly - try { - queryBuilder = Rewriteable.rewrite(queryBuilder, context, true); - } catch (IOException e) { - throw new UncheckedIOException("Error while rewriting query for Lucene pushable filter", e); - } - // Store QueryBuilder instead of Query to avoid caching IndexReader references - addToLucenePushableFilters(queryBuilder); + if (rightOnlyFilter == null) { + return; + } + List candidateRightHandFilters = Predicates.splitAnd(rightOnlyFilter); + for (Expression filter : candidateRightHandFilters) { + if (filter instanceof TranslationAware translationAware) { + if (TranslationAware.Translatable.YES.equals(translationAware.translatable(lucenePushdownPredicates))) { + QueryBuilder queryBuilder = translationAware.asQuery(lucenePushdownPredicates, TRANSLATOR_HANDLER).toQueryBuilder(); + // Rewrite the query builder to ensure doIndexMetadataRewrite is called + // Some functions, such as KQL require rewriting to work properly + try { + queryBuilder = Rewriteable.rewrite(queryBuilder, context, true); + } catch (IOException e) { + throw new UncheckedIOException("Error while rewriting query for Lucene pushable filter", e); } + // Store QueryBuilder instead of Query to avoid caching IndexReader references + addToLucenePushableFilters(queryBuilder); } - // If the filter is not translatable we will not apply it for now - // as performance testing showed no performance improvement. - // We can revisit this in the future if needed, once we have more optimized workflow in place. - // The filter is optional, so it is OK to ignore it if it cannot be translated. } - } else if (rightPreJoinPlan != null && rightPreJoinPlan instanceof EsSourceExec == false) { - throw new IllegalStateException( - "The right side of a LookupJoinExec can only be a FilterExec on top of an EsSourceExec or an EsSourceExec, but got: " - + rightPreJoinPlan - ); + // If the filter is not translatable we will not apply it for now + // as performance testing showed no performance improvement. + // We can revisit this in the future if needed, once we have more optimized workflow in place. + // The filter is optional, so it is OK to ignore it if it cannot be translated. } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlanner.java index af6717ef79783..3e099733841f8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlanner.java @@ -18,28 +18,38 @@ import org.elasticsearch.compute.lucene.ShardContext; import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.EvalOperator.EvalOperatorFactory; +import org.elasticsearch.compute.operator.FilterOperator; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.Operator.OperatorFactory; import org.elasticsearch.compute.operator.OutputOperator.CollectedPagesProvider; import org.elasticsearch.compute.operator.OutputOperator.OutputOperatorFactory; -import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory; import org.elasticsearch.compute.operator.Warnings; import org.elasticsearch.compute.operator.lookup.BlockOptimization; -import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator; import org.elasticsearch.compute.operator.lookup.LookupEnrichQueryGenerator; import org.elasticsearch.compute.operator.lookup.LookupQueryOperator; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.Rewriteable; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; +import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.evaluator.EvalMapper; +import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.plan.physical.OutputExec; import org.elasticsearch.xpack.esql.plan.physical.ParameterizedQueryExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; @@ -53,6 +63,8 @@ import org.elasticsearch.xpack.esql.planner.PlannerUtils; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -65,13 +77,16 @@ */ public class LookupExecutionPlanner { /** - * Functional interface for creating a LookupEnrichQueryGenerator from the necessary parameters. + * Functional interface for creating a {@link LookupEnrichQueryGenerator} from plan data. + * Matches the signature of {@code LookupFromIndexService.queryListFromPlan}. */ @FunctionalInterface - public interface QueryListFactory { + public interface QueryListFromPlanFactory { LookupEnrichQueryGenerator create( - AbstractLookupService.TransportRequest request, - SearchExecutionContext searchExecutionContext, + List matchFields, + @Nullable Expression joinOnConditions, + @Nullable QueryBuilder pushedQuery, + SearchExecutionContext context, AliasFilter aliasFilter, Warnings warnings ); @@ -88,9 +103,7 @@ static class LookupDriverContext extends DriverContext implements CollectedPages private final AbstractLookupService.LookupShardContext lookupShardContext; private final List collectedPages; private final Page inputPage; - private final AbstractLookupService.TransportRequest request; private final AliasFilter aliasFilter; - private final QueryListFactory queryListFactory; LookupDriverContext( BigArrays bigArrays, @@ -99,9 +112,7 @@ static class LookupDriverContext extends DriverContext implements CollectedPages AbstractLookupService.LookupShardContext lookupShardContext, List collectedPages, Page inputPage, - AbstractLookupService.TransportRequest request, AliasFilter aliasFilter, - QueryListFactory queryListFactory, LocalCircuitBreaker.SizeSettings localBreakerSettings ) { super(bigArrays, blockFactory, localBreakerSettings); @@ -109,9 +120,7 @@ static class LookupDriverContext extends DriverContext implements CollectedPages this.lookupShardContext = lookupShardContext; this.collectedPages = collectedPages; this.inputPage = inputPage; - this.request = request; this.aliasFilter = aliasFilter; - this.queryListFactory = queryListFactory; } ShardContext shardContext() { @@ -138,17 +147,9 @@ Page inputPage() { return inputPage; } - AbstractLookupService.TransportRequest request() { - return request; - } - AliasFilter aliasFilter() { return aliasFilter; } - - QueryListFactory queryListFactory() { - return queryListFactory; - } } private final BlockFactory blockFactory; @@ -163,14 +164,27 @@ public LookupExecutionPlanner(BlockFactory blockFactory, BigArrays bigArrays, Lo /** * Creates a PhysicalOperation with operator factories, matching LocalExecutionPlanner's pattern. + * @param lookupSource the Source to use for warning messages (from the original LOOKUP JOIN expression), + * passed separately because plan nodes conventionally discard Source during serialization. */ public PhysicalOperation buildOperatorFactories( PlannerSettings plannerSettings, PhysicalPlan physicalPlan, BlockOptimization blockOptimization, - SourceOperatorFactory sourceFactory + SourceOperatorFactory sourceFactory, + FoldContext foldCtx, + QueryListFromPlanFactory queryListFromPlanFactory, + Source lookupSource ) { - return planLookupNode(plannerSettings, physicalPlan, blockOptimization, sourceFactory); + return planLookupNode( + plannerSettings, + physicalPlan, + blockOptimization, + sourceFactory, + foldCtx, + queryListFromPlanFactory, + lookupSource + ); } /** @@ -181,9 +195,8 @@ public LookupFromIndexService.LookupQueryPlan buildOperators( PhysicalOperation physicalOperation, AbstractLookupService.LookupShardContext shardContext, List releasables, - AbstractLookupService.TransportRequest request, - AliasFilter aliasFilter, - QueryListFactory queryListFactory + Page inputPage, + AliasFilter aliasFilter ) { final LocalCircuitBreaker localBreaker = new LocalCircuitBreaker( @@ -202,10 +215,8 @@ public LookupFromIndexService.LookupQueryPlan buildOperators( shardContext.context(), shardContext, collectedPages, - request.inputPage, - request, + inputPage, aliasFilter, - queryListFactory, localBreakerSettings ); @@ -231,11 +242,22 @@ private PhysicalOperation planLookupNode( PlannerSettings plannerSettings, PhysicalPlan node, BlockOptimization optimizationState, - SourceOperatorFactory sourceFactory + SourceOperatorFactory sourceFactory, + FoldContext foldCtx, + QueryListFromPlanFactory queryListFromPlanFactory, + Source lookupSource ) { PhysicalOperation source; if (node instanceof UnaryExec unaryExec) { - source = planLookupNode(plannerSettings, unaryExec.child(), optimizationState, sourceFactory); + source = planLookupNode( + plannerSettings, + unaryExec.child(), + optimizationState, + sourceFactory, + foldCtx, + queryListFromPlanFactory, + lookupSource + ); } else { // there could be a leaf node such as ParameterizedQueryExec source = null; @@ -243,9 +265,19 @@ private PhysicalOperation planLookupNode( // Plan this node based on its type if (node instanceof ParameterizedQueryExec parameterizedQueryExec) { - return planParameterizedQueryExec(parameterizedQueryExec, optimizationState, sourceFactory); + return planParameterizedQueryExec( + parameterizedQueryExec, + optimizationState, + sourceFactory, + queryListFromPlanFactory, + lookupSource + ); } else if (node instanceof FieldExtractExec fieldExtractExec) { return planFieldExtractExec(plannerSettings, fieldExtractExec, source); + } else if (node instanceof EvalExec evalExec) { + return planEvalExec(evalExec, source, foldCtx); + } else if (node instanceof FilterExec filterExec) { + return planFilterExec(filterExec, source, foldCtx); } else if (node instanceof ProjectExec projectExec) { return planProjectExec(projectExec, source); } else if (node instanceof OutputExec outputExec) { @@ -258,7 +290,9 @@ private PhysicalOperation planLookupNode( private PhysicalOperation planParameterizedQueryExec( ParameterizedQueryExec parameterizedQueryExec, BlockOptimization optimizationState, - SourceOperatorFactory sourceFactory + SourceOperatorFactory sourceFactory, + QueryListFromPlanFactory queryListFromPlanFactory, + Source lookupSource ) { Layout.Builder layoutBuilder = new Layout.Builder(); List output = parameterizedQueryExec.output(); @@ -267,15 +301,18 @@ private PhysicalOperation planParameterizedQueryExec( } Layout layout = layoutBuilder.build(); - // Create intermediate operator factory for LookupQueryOperator - // This operator receives pages from ExchangeSourceOperator and generates queries OperatorFactory enrichQueryFactory = new LookupQueryOperatorFactory( LookupQueryOperator.DEFAULT_MAX_PAGE_SIZE, optimizationState, - 0 + 0, + parameterizedQueryExec.matchFields(), + parameterizedQueryExec.joinOnConditions(), + parameterizedQueryExec.query(), + lookupSource, + queryListFromPlanFactory, + parameterizedQueryExec.emptyResult() ); - // Use the actual source factory from BidirectionalBatchExchangeServer return PhysicalOperation.fromSource(sourceFactory, layout).with(enrichQueryFactory, layout); } @@ -375,6 +412,23 @@ public String describe() { }, layout); } + private PhysicalOperation planEvalExec(EvalExec evalExec, PhysicalOperation source, FoldContext foldCtx) { + for (Alias field : evalExec.fields()) { + var evaluatorSupplier = EvalMapper.toEvaluator(foldCtx, field.child(), source.layout()); + Layout.Builder layout = source.layout().builder(); + layout.append(field.toAttribute()); + source = source.with(new EvalOperatorFactory(evaluatorSupplier), layout.build()); + } + return source; + } + + private PhysicalOperation planFilterExec(FilterExec filterExec, PhysicalOperation source, FoldContext foldCtx) { + return source.with( + new FilterOperator.FilterOperatorFactory(EvalMapper.toEvaluator(foldCtx, filterExec.condition(), source.layout())), + source.layout() + ); + } + private PhysicalOperation planProjectExec(ProjectExec projectExec, PhysicalOperation source) { return LocalExecutionPlanner.planProject(projectExec, source); } @@ -388,64 +442,39 @@ private PhysicalOperation planOutputExec(OutputExec outputExec, PhysicalOperatio return source.withSink(new OutputOperatorFactory(Expressions.names(output), Function.identity(), page -> {}), source.layout()); } - private record EnrichQuerySourceOperatorFactory(int maxPageSize, BlockOptimization blockOptimization, int shardId) - implements - SourceOperatorFactory { - @Override - public SourceOperator get(DriverContext driverContext) { - // In lookup execution path, driverContext is always LookupDriverContext - LookupDriverContext lookupDriverContext = (LookupDriverContext) driverContext; - ShardContext shardContext = lookupDriverContext.shardContext(); - SearchExecutionContext searchExecutionContext = lookupDriverContext.searchExecutionContext(); - Page inputPage = lookupDriverContext.inputPage(); - IndexedByShardId shardContexts = new IndexedByShardIdFromSingleton<>(shardContext, shardId); - - Warnings warnings = Warnings.createWarnings(driverContext.warningsMode(), lookupDriverContext.request().source); - - LookupEnrichQueryGenerator queryList = lookupDriverContext.queryListFactory() - .create(lookupDriverContext.request(), searchExecutionContext, lookupDriverContext.aliasFilter(), warnings); - - return new EnrichQuerySourceOperator( - driverContext.blockFactory(), - maxPageSize, - queryList, - inputPage, - blockOptimization, - shardContexts, - shardId, - searchExecutionContext, - warnings - ); - } - - @Override - public String describe() { - return "EnrichQuerySourceOperator[maxPageSize=" + maxPageSize + "]"; - } - } - /** * Factory for LookupQueryOperator. * Creates an intermediate operator that processes match field pages from ExchangeSourceOperator * and generates queries to lookup document IDs. */ - private record LookupQueryOperatorFactory(int maxPageSize, BlockOptimization blockOptimization, int shardId) - implements - OperatorFactory { + private record LookupQueryOperatorFactory( + int maxPageSize, + BlockOptimization blockOptimization, + int shardId, + List matchFields, + @Nullable Expression joinOnConditions, + @Nullable QueryBuilder query, + Source planSource, + QueryListFromPlanFactory queryListFromPlanFactory, + boolean emptyResult + ) implements OperatorFactory { @Override public Operator get(DriverContext driverContext) { - // In lookup execution path, driverContext is always LookupDriverContext LookupDriverContext lookupDriverContext = (LookupDriverContext) driverContext; ShardContext shardContext = lookupDriverContext.shardContext(); SearchExecutionContext searchExecutionContext = lookupDriverContext.searchExecutionContext(); IndexedByShardId shardContexts = new IndexedByShardIdFromSingleton<>(shardContext, shardId); - // Create warnings here when creating the operator from the factory - Warnings warnings = Warnings.createWarnings(DriverContext.WarningsMode.COLLECT, lookupDriverContext.request().source); - - // Create queryList when creating the operator from the factory - LookupEnrichQueryGenerator queryList = lookupDriverContext.queryListFactory() - .create(lookupDriverContext.request(), searchExecutionContext, lookupDriverContext.aliasFilter(), warnings); + Warnings warnings = Warnings.createWarnings(DriverContext.WarningsMode.COLLECT, planSource); + QueryBuilder rewrittenQuery = rewriteQuery(query, searchExecutionContext); + LookupEnrichQueryGenerator queryList = queryListFromPlanFactory.create( + matchFields, + joinOnConditions, + rewrittenQuery, + searchExecutionContext, + lookupDriverContext.aliasFilter(), + warnings + ); return new LookupQueryOperator( driverContext.blockFactory(), @@ -454,13 +483,25 @@ public Operator get(DriverContext driverContext) { shardContexts, shardId, searchExecutionContext, - warnings + warnings, + emptyResult ); } @Override public String describe() { - return "LookupQueryOperator[maxPageSize=" + maxPageSize + "]"; + return "LookupQueryOperator[maxPageSize=" + maxPageSize + ", emptyResult=" + emptyResult + "]"; + } + } + + private static QueryBuilder rewriteQuery(@Nullable QueryBuilder query, SearchExecutionContext searchExecutionContext) { + if (query == null) { + return null; + } + try { + return Rewriteable.rewrite(query, searchExecutionContext, true); + } catch (IOException e) { + throw new UncheckedIOException("Error while rewriting pushed query for lookup", e); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java index 014c96292fe45..eb1b1b7faca33 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperator.java @@ -20,6 +20,7 @@ import org.elasticsearch.compute.operator.IsBlockedResult; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.operator.lookup.RightChunkedLeftJoin; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.tasks.CancellableTask; @@ -28,6 +29,7 @@ import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.session.Configuration; import java.io.IOException; import java.util.ArrayList; @@ -58,9 +60,16 @@ public record Factory( Expression joinOnConditions, boolean useStreamingOperator, int exchangeBufferSize, - boolean profile + boolean profile, + @Nullable Configuration configuration ) implements OperatorFactory { + public Factory { + if (useStreamingOperator && configuration == null) { + throw new IllegalArgumentException("configuration is required for streaming lookup operator"); + } + } + private String operatorName() { return useStreamingOperator ? "StreamingLookupOperator" : "LookupOperator"; } @@ -100,7 +109,8 @@ public Operator get(DriverContext driverContext) { rightPreJoinPlan, joinOnConditions, exchangeBufferSize, - profile + profile, + configuration ); } else { return new LookupFromIndexOperator( @@ -213,7 +223,8 @@ protected void performAsync(Page inputPage, ActionListener listener joinOnConditions, null, // clientToServerId - set only by StreamingLookupFromIndexOperator null, // serverToClientId - set only by StreamingLookupFromIndexOperator - false // profile - non-streaming lookup doesn't support plan output + false, // profile - non-streaming lookup doesn't support plan output + null // configuration - non-streaming lookup doesn't use planning pipeline ); lookupService.lookupAsync(request, parentTask, listener.map(response -> { List pages = response.takePages(); @@ -386,7 +397,7 @@ public long emittedPages() { } public long emittedRows() { - return emittedPages; + return emittedRows; } public long totalRows() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index cf44eceb2f29d..a946d00ede1e2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -37,6 +37,7 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -49,11 +50,22 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.expression.predicate.Predicates; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput; +import org.elasticsearch.xpack.esql.optimizer.LocalLogicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext; +import org.elasticsearch.xpack.esql.optimizer.LookupLogicalOptimizer; +import org.elasticsearch.xpack.esql.optimizer.LookupPhysicalPlanOptimizer; +import org.elasticsearch.xpack.esql.plan.logical.EsRelation; +import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.ParameterizedQuery; +import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; import org.elasticsearch.xpack.esql.plan.physical.FilterExec; @@ -65,12 +77,18 @@ import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner; import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.stats.SearchContextStats; +import org.elasticsearch.xpack.esql.stats.SearchStats; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; /** @@ -153,7 +171,8 @@ protected TransportRequest transportRequest(LookupFromIndexService.Request reque request.joinOnConditions, request.clientToServerId, request.serverToClientId, - request.profile + request.profile, + request.configuration ); } @@ -164,38 +183,67 @@ protected LookupEnrichQueryGenerator queryList( AliasFilter aliasFilter, Warnings warnings ) { - PhysicalPlan lookupNodePlan = localLookupNodePlanning(request.rightPreJoinPlan); - if (request.joinOnConditions == null) { - // this is a field based join + PhysicalPlan lookupNodePlan = mapFragmentToPhysical(request.rightPreJoinPlan); + Expression rightOnlyFilter = lookupNodePlan instanceof FilterExec filterExec ? filterExec.condition() : null; + return buildQueryGenerator(request.matchFields, request.joinOnConditions, rightOnlyFilter, null, context, aliasFilter, warnings); + } + + private LookupEnrichQueryGenerator buildQueryGenerator( + List matchFields, + @Nullable Expression joinOnConditions, + @Nullable Expression rightOnlyFilter, + @Nullable QueryBuilder pushedQuery, + SearchExecutionContext context, + AliasFilter aliasFilter, + Warnings warnings + ) { + if (joinOnConditions == null) { List queryLists = new ArrayList<>(); - for (int i = 0; i < request.matchFields.size(); i++) { - MatchConfig matchField = request.matchFields.get(i); + for (int i = 0; i < matchFields.size(); i++) { + MatchConfig matchField = matchFields.get(i); int channelOffset = matchField.channel(); QueryList q = termQueryList(context.getFieldType(matchField.fieldName()), aliasFilter, channelOffset, matchField.type()); queryLists.add(q.onlySingleValues(warnings, "LOOKUP JOIN encountered multi-value")); } - if (queryLists.size() == 1 && lookupNodePlan instanceof FilterExec == false) { + if (queryLists.size() == 1 && rightOnlyFilter == null && pushedQuery == null) { return queryLists.getFirst(); } - return ExpressionQueryList.fieldBasedJoin(queryLists, context, lookupNodePlan, clusterService, aliasFilter); + return ExpressionQueryList.fieldBasedJoin(queryLists, context, rightOnlyFilter, pushedQuery, clusterService, aliasFilter); } else { - // this is an expression based join - return ExpressionQueryList.expressionBasedJoin(context, lookupNodePlan, clusterService, request, aliasFilter, warnings); + return ExpressionQueryList.expressionBasedJoin( + context, + rightOnlyFilter, + pushedQuery, + clusterService, + matchFields, + joinOnConditions, + aliasFilter, + warnings + ); } + } + /** + * Builds a query list for the streaming lookup path using data from the physical plan tree + * rather than from the transport request. + */ + protected LookupEnrichQueryGenerator queryListFromPlan( + List matchFields, + @Nullable Expression joinOnConditions, + @Nullable QueryBuilder pushedQuery, + SearchExecutionContext context, + AliasFilter aliasFilter, + Warnings warnings + ) { + return buildQueryGenerator(matchFields, joinOnConditions, null, pushedQuery, context, aliasFilter, warnings); } /** - * This function will perform any planning needed on the local node - * For now, we will just do mapping of the logical plan to physical plan - * In the future we can also do local physical and logical optimizations. - * We only support a FragmentExec node containing a logical plan or a null plan - * If any other plan is sent we will just return null. This can happen in cases - * where the coordinator is running an older version that does not support - * keeping the plan as Logical Plan inside FragmentExec yet - * In those cases, it is safe to ignore the plan sent and return null + * Maps the logical plan inside a {@link FragmentExec} to a physical plan. + * Returns {@code null} if the input is not a {@link FragmentExec} (e.g. older coordinator + * that does not wrap the plan in a {@link FragmentExec}). */ - private static PhysicalPlan localLookupNodePlanning(PhysicalPlan physicalPlan) { + private static PhysicalPlan mapFragmentToPhysical(@Nullable PhysicalPlan physicalPlan) { return physicalPlan instanceof FragmentExec fragmentExec ? LocalMapper.INSTANCE.map(fragmentExec.fragment()) : null; } @@ -216,6 +264,8 @@ public static class Request extends AbstractLookupService.Request { private final String clientToServerId; private final String serverToClientId; private final boolean profile; + @Nullable + private final Configuration configuration; Request( String sessionId, @@ -229,7 +279,8 @@ public static class Request extends AbstractLookupService.Request { Expression joinOnConditions, String clientToServerId, String serverToClientId, - boolean profile + boolean profile, + @Nullable Configuration configuration ) { super(sessionId, index, indexPattern, matchFields.get(0).type(), inputPage, extractFields, source); this.matchFields = matchFields; @@ -238,6 +289,7 @@ public static class Request extends AbstractLookupService.Request { this.clientToServerId = clientToServerId; this.serverToClientId = serverToClientId; this.profile = profile; + this.configuration = configuration; } } @@ -249,6 +301,7 @@ protected static class TransportRequest extends AbstractLookupService.TransportR ); private static final TransportVersion ESQL_LOOKUP_JOIN_ON_EXPRESSION = TransportVersion.fromName("esql_lookup_join_on_expression"); private static final TransportVersion ESQL_STREAMING_LOOKUP_JOIN = TransportVersion.fromName("esql_streaming_lookup_join"); + private static final TransportVersion ESQL_LOOKUP_PLANNING = TransportVersion.fromName("esql_lookup_planning"); private final List matchFields; private final PhysicalPlan rightPreJoinPlan; @@ -256,9 +309,9 @@ protected static class TransportRequest extends AbstractLookupService.TransportR private final String clientToServerId; private final String serverToClientId; private final boolean profile; + @Nullable + private final Configuration configuration; - // Right now we assume that the page contains the same number of blocks as matchFields and that the blocks are in the same order - // The channel information inside the MatchConfig, should say the same thing TransportRequest( String sessionId, ShardId shardId, @@ -272,7 +325,8 @@ protected static class TransportRequest extends AbstractLookupService.TransportR Expression joinOnConditions, String clientToServerId, String serverToClientId, - boolean profile + boolean profile, + @Nullable Configuration configuration ) { super(sessionId, shardId, indexPattern, inputPage, toRelease, extractFields, source); this.matchFields = matchFields; @@ -281,6 +335,7 @@ protected static class TransportRequest extends AbstractLookupService.TransportR this.clientToServerId = clientToServerId; this.serverToClientId = serverToClientId; this.profile = profile; + this.configuration = configuration; } static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) throws IOException { @@ -304,7 +359,12 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro try (BlockStreamInput bsi = new BlockStreamInput(in, blockFactory)) { inputPage = new Page(bsi); } - PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null); + // configuration is needed for the PlanStreamInput, read it here first + Configuration configuration = null; + if (in.getTransportVersion().supports(ESQL_LOOKUP_PLANNING)) { + configuration = in.readOptionalWriteable(Configuration::readWithoutTables); + } + PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), configuration); List extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class); List matchFields = null; if (in.getTransportVersion().supports(ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) { @@ -316,8 +376,6 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro matchFields.add(new MatchConfig(matchField, 0, inputDataType)); } var source = Source.readFrom(planIn); - // Source.readFrom() requires the query from the Configuration passed to PlanStreamInput. - // As we don't have the Configuration here, and it may be heavy to serialize, we directly pass the Source text. if (in.getTransportVersion().supports(ESQL_LOOKUP_JOIN_SOURCE_TEXT)) { String sourceText = in.readString(); source = new Source(source.source(), sourceText); @@ -351,7 +409,8 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro joinOnConditions, clientToServerId, serverToClientId, - profile + profile, + configuration ); result.setParentTask(parentTaskId); return result; @@ -377,6 +436,11 @@ public boolean isProfile() { return profile; } + @Nullable + public Configuration getConfiguration() { + return configuration; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -397,7 +461,11 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(matchFields.get(0).type().typeName()); } out.writeWriteable(inputPage); - PlanStreamOutput planOut = new PlanStreamOutput(out, null); + // configuration is needed for the PlanStreamOutput, write it here early + if (out.getTransportVersion().supports(ESQL_LOOKUP_PLANNING)) { + out.writeOptionalWriteable(configuration != null ? configuration.withoutTables() : null); + } + PlanStreamOutput planOut = new PlanStreamOutput(out, configuration); planOut.writeNamedWriteableCollection(extractFields); if (out.getTransportVersion().supports(ESQL_LOOKUP_JOIN_ON_MANY_FIELDS)) { // serialize all match fields for new versions @@ -580,24 +648,43 @@ protected void doLookupStreaming( // Get source factory from server for planning ExchangeSourceOperator.ExchangeSourceOperatorFactory sourceFactory = server.getSourceOperatorFactory(); - PhysicalPlan physicalPlan = createLookupPhysicalPlan(request); + Configuration configuration = request.configuration; + FoldContext foldCtx = configuration != null ? configuration.newFoldContext() : FoldContext.small(); + SearchStats searchStats = SearchContextStats.from(List.of(shardContext.executionContext())); + EsqlFlags flags = new EsqlFlags(clusterService.getClusterSettings()); + + PhysicalPlan physicalPlan; + LookupExecutionPlanner.QueryListFromPlanFactory queryListFactory; + if (configuration != null) { + LogicalPlan logicalPlan = extractOrBuildLogicalPlan(request); + physicalPlan = createLookupPhysicalPlan(logicalPlan, configuration, plannerSettings, foldCtx, searchStats, flags); + queryListFactory = this::queryListFromPlan; + } else { + // BWC: old data node without Configuration + // Do not do logical and physical planning (It will fail without configuration) + // We just build the physical plan directly using the legacy code + physicalPlan = createLegacyLookupPhysicalPlan(request); + queryListFactory = (mf, joc, pq, ctx, af, w) -> queryList(request, ctx, af, w); + } String planString = request.isProfile() ? physicalPlan.toString() : null; - // Build operators using the planning system with the actual source factory + // Build operators using the planning system with the actual source factory. LocalExecutionPlanner.PhysicalOperation physicalOperation = executionPlanner.buildOperatorFactories( plannerSettings, physicalPlan, BlockOptimization.NONE, - sourceFactory + sourceFactory, + foldCtx, + queryListFactory, + request.source ); LookupQueryPlan lookupQueryPlan = executionPlanner.buildOperators( physicalOperation, shardContext, releasables, - request, - aliasFilter, - (req, context, filter, warn) -> queryList((TransportRequest) req, context, filter, warn) + request.inputPage, + aliasFilter ); // Wrap releasables (shardContext and localBreaker) - they're already in releasables list @@ -674,11 +761,156 @@ protected void startServerWithOperators( } /** - * Creates a PhysicalPlan tree representing the lookup operation structure. - * This plan can be cached and reused across multiple calls with different input data. + * If the data node already called {@link #buildLocalLogicalPlan} (new path), the logical plan + * inside {@code rightPreJoinPlan} will contain a {@link ParameterizedQuery} and can be used directly. + * Otherwise (BWC with older data nodes), we build it here on the lookup node. */ - protected PhysicalPlan createLookupPhysicalPlan(TransportRequest request) throws IOException { - // Create output attributes: doc block + private static LogicalPlan extractOrBuildLogicalPlan(TransportRequest request) { + if (request.rightPreJoinPlan instanceof FragmentExec fragmentExec) { + LogicalPlan fragment = fragmentExec.fragment(); + if (fragment.anyMatch(p -> p instanceof ParameterizedQuery)) { + return fragment; + } + } + // bwc path, make sure rolling updates still work + return buildLocalLogicalPlan( + request.source, + request.matchFields, + request.joinOnConditions, + request.rightPreJoinPlan, + request.extractFields + ); + } + + /** + * Builds a logical plan for the lookup node from the request. + * Walks the logical plan tree inside {@code rightPreJoinPlan}, preserving all nodes (Filter, etc.), + * and replaces the {@link EsRelation} leaf with a {@link ParameterizedQuery}. + * Then adds a {@link Project} on top for [{@code _positions}, {@code extractFields}]. + */ + public static LogicalPlan buildLocalLogicalPlan( + Source source, + List matchFields, + @Nullable Expression joinOnConditions, + @Nullable PhysicalPlan rightPreJoinPlan, + List extractFields + ) { + FieldAttribute docAttribute = new FieldAttribute(source, null, null, EsQueryExec.DOC_ID_FIELD.getName(), EsQueryExec.DOC_ID_FIELD); + + List leftRightParts = new ArrayList<>(); + List rightOnlyFromConditions = new ArrayList<>(); + if (joinOnConditions != null) { + Set leftFieldNames = matchFields.stream().map(MatchConfig::fieldName).collect(Collectors.toSet()); + for (Expression expr : Predicates.splitAnd(joinOnConditions)) { + boolean referencesLeft = expr.references().stream().anyMatch(attr -> leftFieldNames.contains(attr.name())); + if (referencesLeft) { + leftRightParts.add(expr); + } else { + rightOnlyFromConditions.add(expr); + } + } + } + Expression finalJoinOnConditions = Predicates.combineAnd(leftRightParts); + + LogicalPlan plan; + if (rightPreJoinPlan == null) { + List paramQueryOutput = buildParameterizedQueryOutput(docAttribute, extractFields, rightOnlyFromConditions, null); + plan = new ParameterizedQuery(source, paramQueryOutput, matchFields, finalJoinOnConditions); + } else if (rightPreJoinPlan instanceof FragmentExec fragmentExec) { + plan = fragmentExec.fragment(); + plan = plan.transformUp(EsRelation.class, esRelation -> { + List paramQueryOutput = buildParameterizedQueryOutput( + docAttribute, + extractFields, + rightOnlyFromConditions, + esRelation.output() + ); + return new ParameterizedQuery(source, paramQueryOutput, matchFields, finalJoinOnConditions); + }); + } else { + throw new EsqlIllegalArgumentException( + "Expected FragmentExec or null but got [{}]", + rightPreJoinPlan.getClass().getSimpleName() + ); + } + + if (rightOnlyFromConditions.isEmpty() == false) { + if (plan instanceof Filter existingFilter) { + rightOnlyFromConditions.add(existingFilter.condition()); + plan = existingFilter.child(); + } + plan = new Filter(source, plan, Predicates.combineAnd(rightOnlyFromConditions)); + } + + List projections = new ArrayList<>(extractFields.size() + 1); + projections.add(AbstractLookupService.LOOKUP_POSITIONS_FIELD); + projections.addAll(extractFields); + return new Project(source, plan, projections); + } + + /** + * Builds the output attributes for a {@link ParameterizedQuery}, mirroring how {@link EsRelation} + * exposes all index fields. This ensures the logical verifier can validate that all field references + * in the plan are satisfied. At the physical level, + * {@link org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceSourceAttributes ReplaceSourceAttributes} + * strips the output back down to just {@code [_doc, _positions]}, and {@code InsertFieldExtraction} + * adds the needed fields back — the same pattern used for {@code EsRelation}. + */ + private static List buildParameterizedQueryOutput( + FieldAttribute docAttribute, + List extractFields, + List rightOnlyConditions, + @Nullable List esRelationOutput + ) { + LinkedHashSet output = new LinkedHashSet<>(); + output.add(docAttribute); + output.add(AbstractLookupService.LOOKUP_POSITIONS_FIELD); + if (esRelationOutput != null) { + output.addAll(esRelationOutput); + } else { + for (NamedExpression field : extractFields) { + if (field instanceof Attribute attr) { + output.add(attr); + } + } + for (Expression condition : rightOnlyConditions) { + condition.forEachDown(FieldAttribute.class, output::add); + } + } + return new ArrayList<>(output); + } + + /** + * Builds the physical plan for the lookup node by running: + * LookupLogicalOptimizer -> LocalMapper.map -> LookupPhysicalPlanOptimizer. + * The caller is responsible for building the logical plan via {@link #buildLocalLogicalPlan}. + */ + public static PhysicalPlan createLookupPhysicalPlan( + LogicalPlan logicalPlan, + @Nullable Configuration configuration, + PlannerSettings plannerSettings, + FoldContext foldCtx, + SearchStats searchStats, + EsqlFlags flags + ) { + LogicalPlan optimizedLogical = new LookupLogicalOptimizer(new LocalLogicalOptimizerContext(configuration, foldCtx, searchStats)) + .localOptimize(logicalPlan); + PhysicalPlan physicalPlan = LocalMapper.INSTANCE.map(optimizedLogical); + LocalPhysicalOptimizerContext context = new LocalPhysicalOptimizerContext( + plannerSettings, + flags, + configuration, + foldCtx, + searchStats + ); + return new LookupPhysicalPlanOptimizer(context).optimize(physicalPlan); + } + + /** + * BWC: builds a flat physical plan directly (no logical planning or optimization). + * Used when the data node does not send a {@link Configuration}. + */ + private static PhysicalPlan createLegacyLookupPhysicalPlan(TransportRequest request) { FieldAttribute docAttribute = new FieldAttribute( request.source, null, @@ -686,17 +918,10 @@ protected PhysicalPlan createLookupPhysicalPlan(TransportRequest request) throws EsQueryExec.DOC_ID_FIELD.getName(), EsQueryExec.DOC_ID_FIELD ); - List sourceOutput = new ArrayList<>(); - sourceOutput.add(docAttribute); + List sourceOutput = List.of(docAttribute, AbstractLookupService.LOOKUP_POSITIONS_FIELD); - // Use the reference attribute directly - sourceOutput.add(AbstractLookupService.LOOKUP_POSITIONS_FIELD); + PhysicalPlan plan = new ParameterizedQueryExec(request.source, sourceOutput, request.matchFields, request.joinOnConditions, null); - ParameterizedQueryExec source = new ParameterizedQueryExec(request.source, sourceOutput); - - PhysicalPlan plan = source; - - // Add FieldExtractExec if we have extract fields if (request.extractFields.isEmpty() == false) { List extractAttributes = request.extractFields.stream() .map(LookupFromIndexService::getExtractFieldAttribute) @@ -706,28 +931,13 @@ protected PhysicalPlan createLookupPhysicalPlan(TransportRequest request) throws List childOutput = plan.output(); List projections = new ArrayList<>(childOutput.size() - 1); - // Skip index 0 (doc), keep indices 1+ (positions + extract fields) for (int i = 1; i < childOutput.size(); i++) { projections.add(childOutput.get(i)); } plan = new ProjectExec(request.source, plan, projections); - - plan = new OutputExec(request.source, plan, null); - - return plan; + return new OutputExec(request.source, plan, null); } - record LookupQueryPlan( - LookupShardContext shardContext, - LocalCircuitBreaker localBreaker, - DriverContext driverContext, - List operators, - List collectedPages - ) {} - - /** - * Extracts a FieldAttribute from a NamedExpression, throwing an exception if it's not a FieldAttribute. - */ private static FieldAttribute getExtractFieldAttribute(NamedExpression extractField) { if (extractField instanceof FieldAttribute fieldAttribute) { return fieldAttribute; @@ -737,4 +947,12 @@ private static FieldAttribute getExtractFieldAttribute(NamedExpression extractFi extractField.getClass().getSimpleName() ); } + + record LookupQueryPlan( + LookupShardContext shardContext, + LocalCircuitBreaker localBreaker, + DriverContext driverContext, + List operators, + List collectedPages + ) {} } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/StreamingLookupFromIndexOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/StreamingLookupFromIndexOperator.java index 67775dc846bc6..365e54de651cf 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/StreamingLookupFromIndexOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/StreamingLookupFromIndexOperator.java @@ -33,7 +33,10 @@ import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.session.Configuration; import java.io.IOException; import java.util.HashMap; @@ -72,6 +75,8 @@ public class StreamingLookupFromIndexOperator implements Operator { private final int exchangeBufferSize; private final LookupFromIndexOperator.MatchFieldsMapping matchFieldsMapping; private final boolean profile; + @Nullable + private final Configuration configuration; // State private final AtomicLong batchIdGenerator = new AtomicLong(0); @@ -129,7 +134,8 @@ public StreamingLookupFromIndexOperator( PhysicalPlan rightPreJoinPlan, Expression joinOnConditions, int exchangeBufferSize, - boolean profile + boolean profile, + @Nullable Configuration configuration ) { this.driverContext = driverContext; this.maxOutstandingRequests = maxOutstandingRequests; @@ -146,6 +152,7 @@ public StreamingLookupFromIndexOperator( this.exchangeBufferSize = exchangeBufferSize; this.matchFieldsMapping = LookupFromIndexOperator.buildMatchFieldsMapping(matchFields, joinOnConditions); this.profile = profile; + this.configuration = configuration; // Initialize exchange client in constructor initializeClient(); @@ -162,22 +169,48 @@ private void initializeClient() { serverToClientId, listener) -> { planningStartNanos = System.nanoTime(); - // Create setup request for this server - // clientToServerId is per-server unique, serverToClientId is shared across all servers - LookupFromIndexService.Request setupRequest = new LookupFromIndexService.Request( - sessionId, - lookupIndex, - lookupIndexPattern, - matchFieldsMapping.reindexedMatchFields(), - new Page(0), // Empty page for setup - loadFields, - source, - rightPreJoinPlan, - joinOnConditions, - clientToServerId, - serverToClientId, - profile - ); + LookupFromIndexService.Request setupRequest; + if (configuration != null) { + LogicalPlan logicalPlan = LookupFromIndexService.buildLocalLogicalPlan( + source, + matchFieldsMapping.reindexedMatchFields(), + joinOnConditions, + rightPreJoinPlan, + loadFields + ); + PhysicalPlan preBuiltLookupPlan = new FragmentExec(source, logicalPlan, null, 0); + setupRequest = new LookupFromIndexService.Request( + sessionId, + lookupIndex, + lookupIndexPattern, + matchFieldsMapping.reindexedMatchFields(), + new Page(0), + List.of(), + source, + preBuiltLookupPlan, + null, + clientToServerId, + serverToClientId, + profile, + configuration + ); + } else { + setupRequest = new LookupFromIndexService.Request( + sessionId, + lookupIndex, + lookupIndexPattern, + matchFieldsMapping.reindexedMatchFields(), + new Page(0), + loadFields, + source, + rightPreJoinPlan, + joinOnConditions, + clientToServerId, + serverToClientId, + profile, + null + ); + } lookupService.lookupAsync(setupRequest, serverNode, parentTask, ActionListener.wrap(response -> { planningEndNanos = System.nanoTime(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java index 0fe3be8e1fe77..f656f14896a86 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/FullTextFunction.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.OrderBy; +import org.elasticsearch.xpack.esql.plan.logical.ParameterizedQuery; import org.elasticsearch.xpack.esql.plan.logical.UnionAll; import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; import org.elasticsearch.xpack.esql.plan.logical.join.Join; @@ -260,7 +261,10 @@ private static void checkFullTextQueryFunctionForCondition( plan, condition, functionClass, - lp -> (lp instanceof Filter || lp instanceof OrderBy || lp instanceof EsRelation), + lp -> (lp instanceof Filter + || lp instanceof OrderBy + || lp instanceof EsRelation + || lp instanceof ParameterizedQuery), fullTextFunction -> "[" + fullTextFunction.functionName() + "] " + fullTextFunction.functionType(), failures ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LookupLogicalOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LookupLogicalOptimizer.java new file mode 100644 index 0000000000000..f50e9ef38f623 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LookupLogicalOptimizer.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.elasticsearch.xpack.esql.VerificationException; +import org.elasticsearch.xpack.esql.common.Failures; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneFilters; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStringCasingWithInsensitiveRegexMatch; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.InferIsNotNull; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.LookupPruneFilters; +import org.elasticsearch.xpack.esql.optimizer.rules.logical.local.ReplaceFieldWithConstantOrNull; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor; +import org.elasticsearch.xpack.esql.rule.Rule; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer.operators; + +/** + * Logical plan optimizer for the lookup node. Mirrors {@link LocalLogicalPlanOptimizer} but with a + * reduced rule set appropriate for lookup plans (rooted at + * {@link org.elasticsearch.xpack.esql.plan.logical.ParameterizedQuery}, not + * {@link org.elasticsearch.xpack.esql.plan.logical.EsRelation}). + * + *

The lookup logical plan is narrow: {@code Project -> optional Filter -> ParameterizedQuery}. + * This optimizer runs {@link ReplaceFieldWithConstantOrNull} to replace missing/constant fields, + * then the standard operator-optimization rules to fold nulls, simplify booleans, and prune filters.

+ */ +public class LookupLogicalOptimizer extends ParameterizedRuleExecutor { + + private final LogicalVerifier verifier = LogicalVerifier.LOCAL_INSTANCE; + + private static final List> RULES = List.of( + new Batch<>("Lookup local rewrite", Limiter.ONCE, new ReplaceFieldWithConstantOrNull(), new InferIsNotNull()), + lookupOperators() + ); + + public LookupLogicalOptimizer(LocalLogicalOptimizerContext context) { + super(context); + } + + @Override + protected List> batches() { + return RULES; + } + + @SuppressWarnings("unchecked") + private static Batch lookupOperators() { + return localBatch(operators(), new ReplaceStringCasingWithInsensitiveRegexMatch()); + } + + @SuppressWarnings("unchecked") + private static Batch localBatch(Batch batch, Rule... additionalRules) { + Rule[] rules = batch.rules(); + + List> newRules = new ArrayList<>(rules.length + additionalRules.length); + for (Rule r : rules) { + if (r instanceof PruneFilters) { + newRules.add(new LookupPruneFilters()); + } else if (r instanceof OptimizerRules.LocalAware localAware) { + Rule local = localAware.local(); + if (local != null) { + newRules.add(local); + } + } else { + newRules.add(r); + } + } + + newRules.addAll(Arrays.asList(additionalRules)); + + return batch.with(newRules.toArray(Rule[]::new)); + } + + public LogicalPlan localOptimize(LogicalPlan plan) { + LogicalPlan optimized = execute(plan); + Failures failures = verifier.verify(optimized, plan.output()); + if (failures.hasFailures()) { + throw new VerificationException(failures); + } + return optimized; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LookupPhysicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LookupPhysicalPlanOptimizer.java new file mode 100644 index 0000000000000..03e657f9aa29a --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LookupPhysicalPlanOptimizer.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.xpack.esql.VerificationException; +import org.elasticsearch.xpack.esql.common.Failures; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource; +import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceSourceAttributes; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor; + +import java.util.List; + +/** + * Physical plan optimizer for the lookup node. Mirrors {@link LocalPhysicalPlanOptimizer} but with a + * reduced rule set appropriate for lookup plans (rooted at ParameterizedQueryExec, not EsSourceExec). + */ +public class LookupPhysicalPlanOptimizer extends ParameterizedRuleExecutor { + + private static final Logger log = LogManager.getLogger(LookupPhysicalPlanOptimizer.class); + + private static final List> RULES = List.of( + new Batch<>("Push to source", new ReplaceSourceAttributes(), new PushFiltersToSource()), + new Batch<>("Field extraction", Limiter.ONCE, new InsertFieldExtraction()) + ); + + private final PhysicalVerifier verifier = PhysicalVerifier.LOCAL_INSTANCE; + + public LookupPhysicalPlanOptimizer(LocalPhysicalOptimizerContext context) { + super(context); + } + + @Override + protected List> batches() { + return RULES; + } + + public PhysicalPlan optimize(PhysicalPlan plan) { + PhysicalPlan optimized = execute(plan); + return verify(optimized, plan.output()); + } + + private PhysicalPlan verify(PhysicalPlan optimizedPlan, List expectedOutputAttributes) { + Failures failures = verifier.verify(optimizedPlan, expectedOutputAttributes); + if (failures.hasFailures()) { + throw new VerificationException(failures); + } + log.debug("Lookup Physical plan:\n{}", optimizedPlan); + return optimizedPlan; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneFilters.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneFilters.java index 88c3b46549d42..095f4124a3b1b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneFilters.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneFilters.java @@ -20,7 +20,7 @@ import static org.elasticsearch.xpack.esql.core.expression.Literal.FALSE; import static org.elasticsearch.xpack.esql.core.expression.Literal.TRUE; -public final class PruneFilters extends OptimizerRules.OptimizerRule { +public class PruneFilters extends OptimizerRules.OptimizerRule { @Override protected LogicalPlan rule(Filter filter) { Expression condition = filter.condition().transformUp(BinaryLogic.class, PruneFilters::foldBinaryLogic); @@ -30,7 +30,7 @@ protected LogicalPlan rule(Filter filter) { return filter.child(); } if (FALSE.equals(condition) || Expressions.isGuaranteedNull(condition)) { - return PruneEmptyPlans.skipPlan(filter); + return handleAlwaysFalseFilter(filter); } } @@ -40,7 +40,16 @@ protected LogicalPlan rule(Filter filter) { return filter; } - private static Expression foldBinaryLogic(BinaryLogic binaryLogic) { + /** + * Handles a filter whose condition has been folded to {@code false} or {@code null}. + * By default, collapses the plan via {@link PruneEmptyPlans#skipPlan}; subclasses may + * override to preserve plan structure when collapsing is not appropriate (e.g. lookup plans). + */ + protected LogicalPlan handleAlwaysFalseFilter(Filter filter) { + return PruneEmptyPlans.skipPlan(filter); + } + + public static Expression foldBinaryLogic(BinaryLogic binaryLogic) { if (binaryLogic instanceof Or or) { boolean nullLeft = Expressions.isGuaranteedNull(or.left()); boolean nullRight = Expressions.isGuaranteedNull(or.right()); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/LookupPruneFilters.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/LookupPruneFilters.java new file mode 100644 index 0000000000000..d59b05ed4898a --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/LookupPruneFilters.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer.rules.logical.local; + +import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneFilters; +import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.ParameterizedQuery; + +/** + * Lookup-specific variant of {@link PruneFilters}. When a filter condition evaluates to {@code false} + * and the filter's subtree contains a {@link ParameterizedQuery}, marks it as {@code emptyResult=true} + * instead of collapsing the entire plan to a {@code LocalRelation}. This preserves the plan structure + * so the {@code LookupExecutionPlanner} can still build the operator chain. + */ +public class LookupPruneFilters extends PruneFilters { + + @Override + protected LogicalPlan handleAlwaysFalseFilter(Filter filter) { + if (filter.anyMatch(n -> n instanceof ParameterizedQuery)) { + return filter.child() + .transformUp( + ParameterizedQuery.class, + pq -> new ParameterizedQuery(pq.source(), pq.output(), pq.matchFields(), pq.joinOnConditions(), true) + ); + } + return super.handleAlwaysFalseFilter(filter); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java index cf3074339d405..1ac99c2e61275 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/local/ReplaceFieldWithConstantOrNull.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Filter; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.OrderBy; +import org.elasticsearch.xpack.esql.plan.logical.ParameterizedQuery; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.RegexExtract; import org.elasticsearch.xpack.esql.plan.logical.TopN; @@ -63,22 +64,15 @@ public LogicalPlan apply(LogicalPlan plan, LocalLogicalOptimizerContext localLog } // find constant values only in the main indices else if (esRelation.indexMode() == IndexMode.STANDARD) { - for (Attribute attribute : esRelation.output()) { - if (attribute instanceof FieldAttribute fa) { - // Do not use the attribute name, this can deviate from the field name for union types; use fieldName() instead. - String val = localLogicalOptimizerContext.searchStats().constantValue(fa.fieldName()); - if (val != null) { - attrToConstant.put(attribute, Literal.of(attribute, BytesRefs.toBytesRef(val))); - } - } else if (attribute instanceof MetadataAttribute ma && ma.name().startsWith(PROJECT_METADATA_PREFIX)) { - String val = localLogicalOptimizerContext.searchStats().constantValue(new FieldAttribute.FieldName(ma.name())); - if (val != null) { - attrToConstant.put(attribute, Literal.of(attribute, BytesRefs.toBytesRef(val))); - } - } - } + collectConstants(esRelation.output(), localLogicalOptimizerContext, attrToConstant); } }); + // For local logical planning on the lookup node + // also collect the constants from the ParameterizedQuery class to prune based on the lookup index stats + plan.forEachUp( + ParameterizedQuery.class, + paramQuery -> collectConstants(paramQuery.output(), localLogicalOptimizerContext, attrToConstant) + ); AttributeSet lookupFields = lookupFieldsBuilder.build(); AttributeSet externalFields = externalFieldsBuilder.build(); @@ -99,14 +93,15 @@ private LogicalPlan replaceWithNullOrConstant( Predicate shouldBeRetained, Map attrToConstant ) { - if (plan instanceof EsRelation relation) { - // For any missing field, place an Eval right after the EsRelation to assign null values to that attribute (using the same name - // id!), thus avoiding that InsertFieldExtrations inserts a field extraction later. + if (plan instanceof EsRelation || plan instanceof ParameterizedQuery) { + // For any missing field, place an Eval right after the EsRelation/ParameterizedQuery + // to assign null values to that attribute (using the same name id!), + // thus avoiding that InsertFieldExtraction inserts a field extraction later. // This means that an EsRelation[field1, field2, field3] where field1 and field 3 are missing will be replaced by // Project[field1, field2, field3] <- keeps the ordering intact // \_Eval[field1 = null, field3 = null] // \_EsRelation[field1, field2, field3] - List relationOutput = relation.output(); + List relationOutput = plan.output(); var aliasedNulls = RuleUtils.aliasedNulls( relationOutput, attr -> attr instanceof FieldAttribute f && shouldBeRetained.test(f) == false @@ -118,7 +113,7 @@ private LogicalPlan replaceWithNullOrConstant( return plan; } - Eval eval = new Eval(plan.source(), relation, nullLiterals); + Eval eval = new Eval(plan.source(), plan, nullLiterals); // This projection is redundant if there's another projection downstream (and no commands depend on the order until we hit it). return new Project(plan.source(), eval, newProjections); } @@ -152,4 +147,25 @@ private LogicalPlan replaceWithNullOrConstant( return plan; } + + private static void collectConstants( + List output, + LocalLogicalOptimizerContext context, + Map attrToConstant + ) { + for (Attribute attribute : output) { + if (attribute instanceof FieldAttribute fa) { + // Do not use the attribute name, this can deviate from the field name for union types; use fieldName() instead. + String val = context.searchStats().constantValue(fa.fieldName()); + if (val != null) { + attrToConstant.put(attribute, Literal.of(attribute, BytesRefs.toBytesRef(val))); + } + } else if (attribute instanceof MetadataAttribute ma && ma.name().startsWith(PROJECT_METADATA_PREFIX)) { + String val = context.searchStats().constantValue(new FieldAttribute.FieldName(ma.name())); + if (val != null) { + attrToConstant.put(attribute, Literal.of(attribute, BytesRefs.toBytesRef(val))); + } + } + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java index 8c7dc61d7c6ab..5ce7a3969bf26 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java @@ -37,7 +37,10 @@ public class InsertFieldExtraction extends PhysicalOptimizerRules.ParameterizedO @Override public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext context) { - return InsertFieldExtraction.rule(plan, context.configuration().pragmas().fieldExtractPreference()); + var preference = context.configuration() != null + ? context.configuration().pragmas().fieldExtractPreference() + : MappedFieldType.FieldExtractPreference.NONE; + return InsertFieldExtraction.rule(plan, preference); } static PhysicalPlan rule(PhysicalPlan plan, MappedFieldType.FieldExtractPreference fieldExtractPreference) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java index 28cc49650ebc0..3289928dc6bf5 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushFiltersToSource.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.ExternalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.ParameterizedQueryExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import java.util.ArrayList; @@ -53,25 +54,18 @@ protected PhysicalPlan rule(FilterExec filterExec, LocalPhysicalOptimizerContext plan = planFilterExec(filterExec, evalExec, queryExec, ctx); } else if (filterExec.child() instanceof ExternalSourceExec externalExec) { plan = planFilterExecForExternalSource(filterExec, externalExec, ctx.filterPushdownRegistry()); + } else if (filterExec.child() instanceof EvalExec evalExec && evalExec.child() instanceof ParameterizedQueryExec pqExec) { + plan = planFilterExec(filterExec, evalExec, pqExec, ctx); + } else if (filterExec.child() instanceof ParameterizedQueryExec pqExec) { + plan = planFilterExec(filterExec, pqExec, ctx); } return plan; } private static PhysicalPlan planFilterExec(FilterExec filterExec, EsQueryExec queryExec, LocalPhysicalOptimizerContext ctx) { LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()); - List pushable = new ArrayList<>(); - List nonPushable = new ArrayList<>(); - for (Expression exp : splitAnd(filterExec.condition())) { - switch (translatable(exp, pushdownPredicates).finish()) { - case NO -> nonPushable.add(exp); - case YES -> pushable.add(exp); - case RECHECK -> { - pushable.add(exp); - nonPushable.add(exp); - } - } - } - return rewrite(pushdownPredicates, filterExec, queryExec, pushable, nonPushable, List.of()); + PushdownClassification classified = classifyFilters(filterExec.condition(), pushdownPredicates); + return rewrite(pushdownPredicates, filterExec, queryExec, classified.pushable, classified.nonPushable, List.of()); } private static PhysicalPlan planFilterExec( @@ -82,22 +76,9 @@ private static PhysicalPlan planFilterExec( ) { LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()); AttributeMap aliasReplacedBy = getAliasReplacedBy(evalExec); - List pushable = new ArrayList<>(); - List nonPushable = new ArrayList<>(); - for (Expression exp : splitAnd(filterExec.condition())) { - Expression resExp = exp.transformUp(ReferenceAttribute.class, r -> aliasReplacedBy.resolve(r, r)); - switch (translatable(resExp, pushdownPredicates).finish()) { - case NO -> nonPushable.add(exp); - case YES -> pushable.add(exp); - case RECHECK -> { - nonPushable.add(exp); - nonPushable.add(exp); - } - } - } - // Replace field references with their actual field attributes - pushable.replaceAll(e -> e.transformDown(ReferenceAttribute.class, r -> aliasReplacedBy.resolve(r, r))); - return rewrite(pushdownPredicates, filterExec, queryExec, pushable, nonPushable, evalExec.fields()); + PushdownClassification classified = classifyFilters(filterExec.condition(), pushdownPredicates, aliasReplacedBy); + classified.pushable.replaceAll(e -> e.transformDown(ReferenceAttribute.class, r -> aliasReplacedBy.resolve(r, r))); + return rewrite(pushdownPredicates, filterExec, queryExec, classified.pushable, classified.nonPushable, evalExec.fields()); } static AttributeMap getAliasReplacedBy(EvalExec evalExec) { @@ -284,4 +265,85 @@ private static PhysicalPlan planFilterExecForExternalSource( // No pushable filters - return original plan return filterExec; } + + private static PhysicalPlan planFilterExec(FilterExec filterExec, ParameterizedQueryExec pqExec, LocalPhysicalOptimizerContext ctx) { + LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()); + PushdownClassification classified = classifyFilters(filterExec.condition(), pushdownPredicates); + return rewrite(pushdownPredicates, filterExec, pqExec, classified.pushable, classified.nonPushable, List.of()); + } + + private static PhysicalPlan planFilterExec( + FilterExec filterExec, + EvalExec evalExec, + ParameterizedQueryExec pqExec, + LocalPhysicalOptimizerContext ctx + ) { + LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()); + AttributeMap aliasReplacedBy = getAliasReplacedBy(evalExec); + PushdownClassification classified = classifyFilters(filterExec.condition(), pushdownPredicates, aliasReplacedBy); + classified.pushable.replaceAll(e -> e.transformDown(ReferenceAttribute.class, r -> aliasReplacedBy.resolve(r, r))); + return rewrite(pushdownPredicates, filterExec, pqExec, classified.pushable, classified.nonPushable, evalExec.fields()); + } + + private static PhysicalPlan rewrite( + LucenePushdownPredicates pushdownPredicates, + FilterExec filterExec, + ParameterizedQueryExec pqExec, + List pushable, + List nonPushable, + List evalFields + ) { + // Combine GT, GTE, LT and LTE in pushable to Range if possible + List newPushable = combineEligiblePushableToRange(pushable); + if (newPushable.size() > 0) { // update the executable with pushable conditions + Query queryDSL = TRANSLATOR_HANDLER.asQuery(pushdownPredicates, Predicates.combineAnd(newPushable)); + QueryBuilder planQuery = queryDSL.toQueryBuilder(); + QueryBuilder query = Queries.combine(Queries.Clause.FILTER, asList(pqExec.query(), planQuery)); + PhysicalPlan plan = pqExec.withQuery(query); + plan = evalFields.isEmpty() ? plan : new EvalExec(filterExec.source(), plan, evalFields); + if (nonPushable.size() > 0) { + // update filter with remaining non-pushable conditions + return new FilterExec(filterExec.source(), plan, Predicates.combineAnd(nonPushable)); + } else { + // prune Filter entirely + return plan; + } + } // else: nothing changes + return filterExec; + } + + private record PushdownClassification(List pushable, List nonPushable) {} + + private static PushdownClassification classifyFilters(Expression condition, LucenePushdownPredicates pushdownPredicates) { + return classifyFilters(condition, pushdownPredicates, AttributeMap.emptyAttributeMap()); + } + + /** + * Classifies filter expressions into pushable and non-pushable lists. + * When {@code aliasReplacedBy} is non-empty, alias resolution is applied before + * the translatability check, but the original (unresolved) expressions are stored + * so that non-pushable filters still reference the EvalExec output attributes. + */ + private static PushdownClassification classifyFilters( + Expression condition, + LucenePushdownPredicates pushdownPredicates, + AttributeMap aliasReplacedBy + ) { + List pushable = new ArrayList<>(); + List nonPushable = new ArrayList<>(); + for (Expression exp : splitAnd(condition)) { + Expression resExp = aliasReplacedBy.isEmpty() + ? exp + : exp.transformUp(ReferenceAttribute.class, r -> aliasReplacedBy.resolve(r, r)); + switch (translatable(resExp, pushdownPredicates).finish()) { + case NO -> nonPushable.add(exp); + case YES -> pushable.add(exp); + case RECHECK -> { + pushable.add(exp); + nonPushable.add(exp); + } + } + } + return new PushdownClassification(pushable, nonPushable); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java index dc5b01fa1c6db..27aaaa7b72898 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceSourceAttributes.java @@ -12,9 +12,12 @@ import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.enrich.AbstractLookupService; import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec; +import org.elasticsearch.xpack.esql.plan.physical.LeafExec; +import org.elasticsearch.xpack.esql.plan.physical.ParameterizedQueryExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import java.util.ArrayList; @@ -22,14 +25,31 @@ import static org.elasticsearch.xpack.esql.optimizer.rules.logical.OptimizerRules.TransformDirection.UP; -public class ReplaceSourceAttributes extends PhysicalOptimizerRules.OptimizerRule { +/** + * Strips source leaf nodes down to their minimal output attributes so that {@link InsertFieldExtraction} + * can later add only the fields that the rest of the plan actually needs. + *

+ * Handles both {@link EsSourceExec} (data node path, replaced with {@link EsQueryExec}) and + * {@link ParameterizedQueryExec} (lookup node path, output stripped to {@code [_doc, _positions]}). + */ +public class ReplaceSourceAttributes extends PhysicalOptimizerRules.OptimizerRule { public ReplaceSourceAttributes() { super(UP); } @Override - protected PhysicalPlan rule(EsSourceExec plan) { + protected PhysicalPlan rule(LeafExec plan) { + if (plan instanceof EsSourceExec esSource) { + return replaceEsSource(esSource); + } + if (plan instanceof ParameterizedQueryExec pqExec) { + return replaceParameterizedQuery(pqExec); + } + return plan; + } + + private static PhysicalPlan replaceEsSource(EsSourceExec plan) { final List attributes = new ArrayList<>(); attributes.add(getDocAttribute(plan)); @@ -58,9 +78,26 @@ protected PhysicalPlan rule(EsSourceExec plan) { ); } + private static PhysicalPlan replaceParameterizedQuery(ParameterizedQueryExec plan) { + List strippedOutput = new ArrayList<>(2); + for (Attribute attr : plan.output()) { + if (EsQueryExec.isDocAttribute(attr) || AbstractLookupService.LOOKUP_POSITIONS_FIELD.name().equals(attr.name())) { + strippedOutput.add(attr); + } + } + return new ParameterizedQueryExec( + plan.source(), + strippedOutput, + plan.matchFields(), + plan.joinOnConditions(), + plan.query(), + plan.emptyResult() + ); + } + private static Attribute getDocAttribute(EsSourceExec plan) { - // The source (or doc) field is sometimes added to the relation output as a hack to enable late materialization in the reduce - // driver. In that case, we should take it instead of replacing it with a new one to ensure the same attribute is used throughout. + // Reuse the existing doc attribute from the relation output when present, rather than creating a new one, + // to ensure the same attribute instance is used throughout the plan (needed for late materialization in the reduce driver). var sourceAttributes = plan.output().stream().filter(EsQueryExec::isDocAttribute).toList(); if (sourceAttributes.size() > 1) { throw new IllegalStateException("Expected at most one source attribute, found: " + sourceAttributes); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java index a93b642406fc1..eb9880f1c8bed 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/PlanWritables.java @@ -22,6 +22,7 @@ import org.elasticsearch.xpack.esql.plan.logical.MetricsInfo; import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.OrderBy; +import org.elasticsearch.xpack.esql.plan.logical.ParameterizedQuery; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.RegisteredDomain; import org.elasticsearch.xpack.esql.plan.logical.Sample; @@ -100,6 +101,7 @@ public static List logical() { Lookup.ENTRY, MvExpand.ENTRY, OrderBy.ENTRY, + ParameterizedQuery.ENTRY, Project.ENTRY, Project.V9_ENTRY, // Backward compatibility for reading old "EsqlProject" type Rerank.ENTRY, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ParameterizedQuery.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ParameterizedQuery.java new file mode 100644 index 0000000000000..75e4e52326d4d --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/ParameterizedQuery.java @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.esql.plan.logical; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.enrich.MatchConfig; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +/** + * Logical plan node representing a parameterized lookup query source. + * This replaces {@link EsRelation} in the lookup node's logical plan, + * representing a source that receives input pages (match keys) from the + * exchange and generates parameterized queries against the lookup index. + */ +public class ParameterizedQuery extends LeafPlan { + + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + LogicalPlan.class, + "ParameterizedQuery", + ParameterizedQuery::readFrom + ); + + private final List output; + private final List matchFields; + @Nullable + private final Expression joinOnConditions; + /** + * Runtime-only flag set by the {@link org.elasticsearch.xpack.esql.optimizer.LookupLogicalOptimizer} + * when a filter folds to {@code false}/{@code null}. Not serialized — it is computed locally on the + * lookup node after deserialization. + */ + private final boolean emptyResult; + + public ParameterizedQuery(Source source, List output, List matchFields, @Nullable Expression joinOnConditions) { + this(source, output, matchFields, joinOnConditions, false); + } + + public ParameterizedQuery( + Source source, + List output, + List matchFields, + @Nullable Expression joinOnConditions, + boolean emptyResult + ) { + super(source); + this.output = output; + this.matchFields = matchFields; + this.joinOnConditions = joinOnConditions; + this.emptyResult = emptyResult; + } + + private static ParameterizedQuery readFrom(StreamInput in) throws IOException { + Source source = Source.readFrom((PlanStreamInput) in); + List output = in.readNamedWriteableCollectionAsList(Attribute.class); + List matchFields = in.readCollectionAsList(MatchConfig::new); + Expression joinOnConditions = in.readOptionalNamedWriteable(Expression.class); + return new ParameterizedQuery(source, output, matchFields, joinOnConditions); + } + + @Override + public List output() { + return output; + } + + public List matchFields() { + return matchFields; + } + + @Nullable + public Expression joinOnConditions() { + return joinOnConditions; + } + + public boolean emptyResult() { + return emptyResult; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + Source.EMPTY.writeTo(out); + out.writeNamedWriteableCollection(output); + out.writeCollection(matchFields); + out.writeOptionalNamedWriteable(joinOnConditions); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + public boolean expressionsResolved() { + return true; + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, ParameterizedQuery::new, output, matchFields, joinOnConditions, emptyResult); + } + + @Override + public int hashCode() { + return Objects.hash(output, matchFields, joinOnConditions, emptyResult); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ParameterizedQuery other = (ParameterizedQuery) obj; + return Objects.equals(output, other.output) + && Objects.equals(matchFields, other.matchFields) + && Objects.equals(joinOnConditions, other.joinOnConditions) + && emptyResult == other.emptyResult; + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ParameterizedQueryExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ParameterizedQueryExec.java index b98f11eb44665..5d8afb1756491 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ParameterizedQueryExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ParameterizedQueryExec.java @@ -8,9 +8,13 @@ package org.elasticsearch.xpack.esql.plan.physical; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.enrich.MatchConfig; import java.io.IOException; import java.util.List; @@ -25,10 +29,37 @@ */ public class ParameterizedQueryExec extends LeafExec { private final List output; + private final List matchFields; + @Nullable + private final Expression joinOnConditions; + @Nullable + private final QueryBuilder query; + private final boolean emptyResult; - public ParameterizedQueryExec(Source source, List output) { + public ParameterizedQueryExec( + Source source, + List output, + List matchFields, + @Nullable Expression joinOnConditions, + @Nullable QueryBuilder query + ) { + this(source, output, matchFields, joinOnConditions, query, false); + } + + public ParameterizedQueryExec( + Source source, + List output, + List matchFields, + @Nullable Expression joinOnConditions, + @Nullable QueryBuilder query, + boolean emptyResult + ) { super(source); this.output = output; + this.matchFields = matchFields; + this.joinOnConditions = joinOnConditions; + this.query = query; + this.emptyResult = emptyResult; } @Override @@ -36,6 +67,30 @@ public List output() { return output; } + public List matchFields() { + return matchFields; + } + + @Nullable + public Expression joinOnConditions() { + return joinOnConditions; + } + + @Nullable + public QueryBuilder query() { + return query; + } + + public boolean emptyResult() { + return emptyResult; + } + + public ParameterizedQueryExec withQuery(QueryBuilder query) { + return Objects.equals(this.query, query) + ? this + : new ParameterizedQueryExec(source(), output, matchFields, joinOnConditions, query, emptyResult); + } + @Override public void writeTo(StreamOutput out) throws IOException { throw new UnsupportedOperationException("not serialized"); @@ -48,7 +103,7 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, ParameterizedQueryExec::new, output); + return NodeInfo.create(this, ParameterizedQueryExec::new, output, matchFields, joinOnConditions, query, emptyResult); } @Override @@ -56,11 +111,15 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ParameterizedQueryExec that = (ParameterizedQueryExec) o; - return Objects.equals(output, that.output); + return Objects.equals(output, that.output) + && Objects.equals(matchFields, that.matchFields) + && Objects.equals(joinOnConditions, that.joinOnConditions) + && Objects.equals(query, that.query) + && emptyResult == that.emptyResult; } @Override public int hashCode() { - return Objects.hash(output); + return Objects.hash(output, matchFields, joinOnConditions, query, emptyResult); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index df9b5fe580c2b..8e2c1ed9e4aa6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -903,16 +903,14 @@ private PhysicalOperation planLookupJoin(LookupJoinExec join, LocalExecutionPlan join.joinOnConditions(), useStreamingOperator, context.queryPragmas().exchangeBufferSize(), - configuration.profile() + configuration.profile(), + configuration ), layout ); } - /** - * The transport version that introduced streaming lookup support. - */ - private static final TransportVersion ESQL_STREAMING_LOOKUP_JOIN = TransportVersion.fromName("esql_streaming_lookup_join"); + private static final TransportVersion ESQL_LOOKUP_PLANNING = TransportVersion.fromName("esql_lookup_planning"); /** * Determines whether streaming lookup should be used based on the target node's transport version. @@ -941,12 +939,12 @@ private boolean shouldUseStreamingOperator(LookupFromIndexService service, Strin DiscoveryNode node = clusterState.nodes().get(shardRouting.currentNodeId()); Transport.Connection connection = service.getTransportService().getConnection(node); TransportVersion nodeVersion = connection.getTransportVersion(); - if (nodeVersion.supports(ESQL_STREAMING_LOOKUP_JOIN) == false) { + if (nodeVersion.supports(ESQL_LOOKUP_PLANNING) == false) { logger.debug( "Using non-streaming lookup operator: node [{}] has transport version [{}] which does not support [{}]", node.getId(), nodeVersion, - ESQL_STREAMING_LOOKUP_JOIN + ESQL_LOOKUP_PLANNING ); return false; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java index f0fbf6fbf9730..d045d636ebacd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Limit; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.MetricsInfo; +import org.elasticsearch.xpack.esql.plan.logical.ParameterizedQuery; import org.elasticsearch.xpack.esql.plan.logical.TopN; import org.elasticsearch.xpack.esql.plan.logical.TsInfo; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; @@ -34,6 +35,7 @@ import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.MetricsInfoExec; +import org.elasticsearch.xpack.esql.plan.physical.ParameterizedQueryExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; import org.elasticsearch.xpack.esql.plan.physical.TsInfoExec; @@ -72,6 +74,10 @@ private PhysicalPlan mapLeaf(LeafPlan leaf) { return new EsSourceExec(esRelation); } + if (leaf instanceof ParameterizedQuery pq) { + return new ParameterizedQueryExec(pq.source(), pq.output(), pq.matchFields(), pq.joinOnConditions(), null, pq.emptyResult()); + } + if (leaf instanceof ExternalRelation external) { return external.toPhysicalExec(); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java index 25bcdb759d6dc..922e7a86c4033 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximation/ApproximationSupportTests.java @@ -65,6 +65,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Lookup; import org.elasticsearch.xpack.esql.plan.logical.MMR; import org.elasticsearch.xpack.esql.plan.logical.MetricsInfo; +import org.elasticsearch.xpack.esql.plan.logical.ParameterizedQuery; import org.elasticsearch.xpack.esql.plan.logical.Rename; import org.elasticsearch.xpack.esql.plan.logical.Subquery; import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate; @@ -139,6 +140,7 @@ public class ApproximationSupportTests extends ESTestCase { Join.class, InlineJoin.class, LookupJoin.class, + ParameterizedQuery.class, // InlineStats is not supported yet. // Only a single Stats command is supported. diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlannerTests.java index 8b30090d36cd9..8ab5dcbe6e4f8 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupExecutionPlannerTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.core.Releasables; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperServiceTestCase; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -56,6 +57,7 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.tree.Source; @@ -63,6 +65,7 @@ import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders; import org.elasticsearch.xpack.esql.planner.PlannerSettings; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.hamcrest.MatcherAssert; import org.junit.After; @@ -135,6 +138,18 @@ protected LookupEnrichQueryGenerator queryList( return mock(LookupEnrichQueryGenerator.class); } + @Override + protected LookupEnrichQueryGenerator queryListFromPlan( + List matchFields, + Expression joinOnConditions, + QueryBuilder pushedQuery, + SearchExecutionContext context, + AliasFilter aliasFilter, + Warnings warnings + ) { + return mock(LookupEnrichQueryGenerator.class); + } + @Override protected void startServerWithOperators( BidirectionalBatchExchangeServer server, @@ -179,6 +194,7 @@ public void setup() { Set> clusterSettings = new HashSet<>(); clusterSettings.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); clusterSettings.addAll(PlannerSettings.settings()); + clusterSettings.addAll(EsqlFlags.ALL_ESQL_FLAGS_SETTINGS); when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(Settings.EMPTY, clusterSettings)); indicesService = mock(IndicesService.class); @@ -383,9 +399,10 @@ private LookupFromIndexService.LookupQueryPlan generateQueryPlan(List conditions = new ArrayList<>(); for (int i = 0; i < numberOfJoinColumns; i++) { @@ -324,18 +324,20 @@ protected Operator.OperatorFactory simple(SimpleOptions options) { joinOnExpression, false, // useStreamingOperator - testing non-streaming operator QueryPragmas.EXCHANGE_BUFFER_SIZE.getDefault(Settings.EMPTY), - false // profile + false, // profile + null // configuration - not needed for non-streaming tests ); } - static FragmentExec buildLessThanFilter(int value) { - FieldAttribute filterAttribute = new FieldAttribute( - Source.EMPTY, - "lint", - new EsField("lint", DataType.INTEGER, Collections.emptyMap(), true, EsField.TimeSeriesFieldType.NONE) - ); + static FragmentExec buildLessThanFilter(int value, List loadFields) { + FieldAttribute filterAttribute = loadFields.stream() + .filter(f -> f.name().equals("lint")) + .map(FieldAttribute.class::cast) + .findFirst() + .orElseThrow(); Expression lessThan = new LessThan(Source.EMPTY, filterAttribute, new Literal(Source.EMPTY, value, DataType.INTEGER)); - EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), List.of()); + List attrs = loadFields.stream().map(f -> (Attribute) f).toList(); + EsRelation esRelation = new EsRelation(Source.EMPTY, "test", IndexMode.LOOKUP, Map.of(), Map.of(), Map.of(), attrs); Filter filter = new Filter(Source.EMPTY, esRelation, lessThan); return new FragmentExec(filter); } @@ -386,7 +388,7 @@ protected Matcher expectedToStringOfSimple() { .append("Filter\\[lint\\{f}#\\d+ < ") .append(LESS_THAN_VALUE) .append("\\[INTEGER]]\\n") - .append("\\\\_EsRelation\\[test]\\[LOOKUP]\\[\\]<>\\]\\]"); + .append("\\\\_EsRelation\\[test]\\[LOOKUP]\\[lkwd\\{f}#\\d+, lint\\{f}#\\d+]<>\\]\\]"); sb.append(")"); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/StreamingLookupFromIndexOperatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/StreamingLookupFromIndexOperatorTests.java index c73fb8b4c0ceb..374c2af4b5748 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/StreamingLookupFromIndexOperatorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/StreamingLookupFromIndexOperatorTests.java @@ -67,6 +67,7 @@ import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractSimpleTransportTestCase; +import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.action.EsqlCapabilities; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; @@ -83,6 +84,7 @@ import org.elasticsearch.xpack.esql.planner.PlannerSettings; import org.elasticsearch.xpack.esql.plugin.EsqlFlags; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.elasticsearch.xpack.esql.session.Configuration; import org.hamcrest.Matcher; import org.junit.After; import org.junit.Before; @@ -197,7 +199,7 @@ protected Operator.OperatorFactory simple(SimpleOptions options, int maxOutstand matchFields.add(new MatchConfig(matchField, i, inputDataType)); } Expression joinOnExpression = null; - FragmentExec rightPlanWithOptionalPreJoinFilter = LookupFromIndexOperatorTests.buildLessThanFilter(LESS_THAN_VALUE); + FragmentExec rightPlanWithOptionalPreJoinFilter = LookupFromIndexOperatorTests.buildLessThanFilter(LESS_THAN_VALUE, loadFields); if (operation != null) { List conditions = new ArrayList<>(); for (int i = 0; i < numberOfJoinColumns; i++) { @@ -230,6 +232,7 @@ protected Operator.OperatorFactory simple(SimpleOptions options, int maxOutstand final FragmentExec finalRightPlan = rightPlanWithOptionalPreJoinFilter; final Expression finalJoinOnExpression = joinOnExpression; final int exchangeBufferSize = QueryPragmas.EXCHANGE_BUFFER_SIZE.getDefault(Settings.EMPTY); + final Configuration finalConfiguration = randomBoolean() ? EsqlTestUtils.TEST_CFG : null; // Create a factory that produces StreamingLookupFromIndexOperator return new Operator.OperatorFactory() { @@ -250,7 +253,8 @@ public Operator get(DriverContext driverContext) { finalRightPlan, finalJoinOnExpression, exchangeBufferSize, - true // profile - enables plan tracking for mode verification + true, // profile - enables plan tracking for mode verification + finalConfiguration ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LookupLogicalOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LookupLogicalOptimizerTests.java new file mode 100644 index 0000000000000..861fc61d71850 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LookupLogicalOptimizerTests.java @@ -0,0 +1,273 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.mapper.MapperServiceTestCase; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.analysis.EnrichResolution; +import org.elasticsearch.xpack.esql.analysis.Verifier; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService; +import org.elasticsearch.xpack.esql.enrich.MatchConfig; +import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; +import org.elasticsearch.xpack.esql.index.EsIndex; +import org.elasticsearch.xpack.esql.index.EsIndexGenerator; +import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.Filter; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.ParameterizedQuery; +import org.elasticsearch.xpack.esql.plan.logical.Project; +import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.stats.SearchStats; +import org.elasticsearch.xpack.esql.telemetry.Metrics; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_SEARCH_STATS; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.testAnalyzerContext; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.indexResolutions; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +/** + * Tests for {@link LookupLogicalOptimizer}, verifying that logical optimization rules are applied + * to the lookup node's logical plan before physical planning. + */ +public class LookupLogicalOptimizerTests extends MapperServiceTestCase { + + private Analyzer analyzer; + private TestPlannerOptimizer plannerOptimizer; + + @Before + public void init() { + Map mapping = loadMapping("mapping-basic.json"); + EsIndex test = EsIndexGenerator.esIndex("test", mapping, Map.of("test", IndexMode.STANDARD)); + + analyzer = new Analyzer( + testAnalyzerContext( + TEST_CFG, + new EsqlFunctionRegistry(), + indexResolutions(test), + defaultLookupResolution(), + new EnrichResolution(), + emptyInferenceResolution() + ), + new Verifier(new Metrics(new EsqlFunctionRegistry(), true, true), new XPackLicenseState(() -> 0L)) + ); + plannerOptimizer = new TestPlannerOptimizer(TEST_CFG, analyzer); + } + + @Override + protected List filteredWarnings() { + return withDefaultLimitWarning(super.filteredWarnings()); + } + + /** + * Simple lookup with no filters. + * Expects: Project -> ParameterizedQuery + */ + public void testSimpleLookup() { + LogicalPlan plan = optimizeLookupLogicalPlan("FROM test | LOOKUP JOIN test_lookup ON emp_no", TEST_SEARCH_STATS); + + Project project = as(plan, Project.class); + assertThat(project.child(), instanceOf(ParameterizedQuery.class)); + } + + /** + * Filter referencing an existing field should be preserved. + * Expects: Project -> Filter -> ParameterizedQuery + */ + public void testFilterOnExistingField() { + LogicalPlan plan = optimizeLookupLogicalPlan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name == "English" + """, TEST_SEARCH_STATS); + + Project project = as(plan, Project.class); + Filter filter = as(project.child(), Filter.class); + assertThat(filter.child(), instanceOf(ParameterizedQuery.class)); + } + + /** + * Filter referencing a missing field should be folded away (the condition becomes null/false). + * ReplaceFieldWithConstantOrNull replaces the missing field with null, then LookupPruneFilters + * marks the ParameterizedQuery as emptyResult instead of collapsing the plan to LocalRelation, + * preserving the plan structure for the LookupExecutionPlanner. + * Expects: Project -> Eval -> ParameterizedQuery(emptyResult=true) + */ + public void testFilterOnMissingFieldFolded() { + EsqlTestUtils.TestConfigurableSearchStats stats = new EsqlTestUtils.TestConfigurableSearchStats().exclude( + EsqlTestUtils.TestConfigurableSearchStats.Config.EXISTS, + "language_name" + ); + + LogicalPlan plan = optimizeLookupLogicalPlan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name == "English" + """, stats); + + Project project = as(plan, Project.class); + Eval eval = as(project.child(), Eval.class); + ParameterizedQuery pq = as(eval.child(), ParameterizedQuery.class); + assertTrue("Expected emptyResult=true on ParameterizedQuery", pq.emptyResult()); + } + + /** + * Filter that becomes always-true due to missing field stats should be pruned. + * "language_name IS NULL" with language_name missing → "null IS NULL" → true → filter removed. + * Expects: Project -> Eval -> ParameterizedQuery + * See {@link LookupPhysicalPlanOptimizerTests#testDropMissingFieldPrunesEval} for verification that the Eval is removed during + * physical optimization. + */ + public void testFilterOnMissingFieldFoldedToTrue() { + EsqlTestUtils.TestConfigurableSearchStats stats = new EsqlTestUtils.TestConfigurableSearchStats().exclude( + EsqlTestUtils.TestConfigurableSearchStats.Config.EXISTS, + "language_name" + ); + + LogicalPlan plan = optimizeLookupLogicalPlan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name IS NULL + """, stats); + + Project project = as(plan, Project.class); + Eval eval = as(project.child(), Eval.class); + as(eval.child(), ParameterizedQuery.class); + } + + /** + * Constant field matching the filter value: {@code language_name} is a constant {@code "English"}, + * and the filter is {@code WHERE language_name == "English"}. The constant replaces the field reference, + * the filter folds to {@code true} and is pruned. + * Expects: Project -> ParameterizedQuery (no Filter, no Eval since the field exists and is constant). + */ + public void testConstantFieldMatchingFilter() { + EsqlTestUtils.TestConfigurableSearchStats stats = new EsqlTestUtils.TestConfigurableSearchStats().withConstantValue( + "language_name", + "English" + ); + + LogicalPlan plan = optimizeLookupLogicalPlan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name == "English" + """, stats); + + Project project = as(plan, Project.class); + assertThat(project.child(), instanceOf(ParameterizedQuery.class)); + } + + /** + * Constant field NOT matching the filter value: {@code language_name} is a constant {@code "Spanish"}, + * but the filter is {@code WHERE language_name == "English"}. The constant replaces the field reference, + * the filter folds to {@code false}, and LookupPruneFilters marks the ParameterizedQuery as emptyResult. + * Expects: Project -> ParameterizedQuery(emptyResult=true) + */ + public void testConstantFieldMismatchFoldsToEmpty() { + EsqlTestUtils.TestConfigurableSearchStats stats = new EsqlTestUtils.TestConfigurableSearchStats().withConstantValue( + "language_name", + "Spanish" + ); + + LogicalPlan plan = optimizeLookupLogicalPlan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name == "English" + """, stats); + + Project project = as(plan, Project.class); + ParameterizedQuery pq = as(project.child(), ParameterizedQuery.class); + assertTrue("Expected emptyResult=true on ParameterizedQuery", pq.emptyResult()); + } + + private LogicalPlan optimizeLookupLogicalPlan(String esql, SearchStats searchStats) { + List plans = optimizeAllLookupLogicalPlans(esql, searchStats); + assertThat("Expected exactly one LOOKUP JOIN", plans, hasSize(1)); + return plans.getFirst(); + } + + /** + * Runs the full planning pipeline, finds LookupJoinExec nodes, then builds and logically optimizes + * each lookup plan. Returns the optimized logical plans in tree traversal order. + */ + private List optimizeAllLookupLogicalPlans(String esql, SearchStats searchStats) { + PhysicalPlan dataNodePlan = plannerOptimizer.plan(esql); + + List joins = findAllLookupJoins(dataNodePlan); + assertThat("Expected at least one LookupJoinExec in the plan", joins.isEmpty(), is(false)); + + List lookupPlans = new ArrayList<>(joins.size()); + for (LookupJoinExec join : joins) { + lookupPlans.add(buildAndOptimizeLookupLogicalPlan(join, searchStats)); + } + return lookupPlans; + } + + private static LogicalPlan buildAndOptimizeLookupLogicalPlan(LookupJoinExec join, SearchStats searchStats) { + List matchFields = new ArrayList<>(join.leftFields().size()); + for (int i = 0; i < join.leftFields().size(); i++) { + FieldAttribute right = (FieldAttribute) join.rightFields().get(i); + String fieldName = right.exactAttribute().fieldName().string(); + if (join.isOnJoinExpression()) { + fieldName = join.leftFields().get(i).name(); + } + matchFields.add(new MatchConfig(fieldName, i, join.leftFields().get(i).dataType())); + } + + LogicalPlan logicalPlan = LookupFromIndexService.buildLocalLogicalPlan( + join.source(), + matchFields, + join.joinOnConditions(), + join.right(), + join.addedFields().stream().map(f -> (NamedExpression) f).toList() + ); + + var context = new LocalLogicalOptimizerContext(TEST_CFG, FoldContext.small(), searchStats); + return new LookupLogicalOptimizer(context).localOptimize(logicalPlan); + } + + private static List findAllLookupJoins(PhysicalPlan plan) { + List joins = new ArrayList<>(); + collectLookupJoins(plan, joins); + return joins; + } + + private static void collectLookupJoins(PhysicalPlan plan, List joins) { + if (plan instanceof LookupJoinExec join) { + joins.add(join); + } + for (PhysicalPlan child : plan.children()) { + collectLookupJoins(child, joins); + } + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LookupPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LookupPhysicalPlanOptimizerTests.java new file mode 100644 index 0000000000000..4817ca7c981a1 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LookupPhysicalPlanOptimizerTests.java @@ -0,0 +1,461 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.optimizer; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.mapper.MapperServiceTestCase; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.xpack.esql.EsqlTestUtils; +import org.elasticsearch.xpack.esql.action.EsqlCapabilities; +import org.elasticsearch.xpack.esql.analysis.Analyzer; +import org.elasticsearch.xpack.esql.analysis.EnrichResolution; +import org.elasticsearch.xpack.esql.analysis.MutableAnalyzerContext; +import org.elasticsearch.xpack.esql.analysis.Verifier; +import org.elasticsearch.xpack.esql.core.expression.Alias; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; +import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.expression.Literal; +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.type.EsField; +import org.elasticsearch.xpack.esql.enrich.AbstractLookupService; +import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService; +import org.elasticsearch.xpack.esql.enrich.MatchConfig; +import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry; +import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals; +import org.elasticsearch.xpack.esql.index.EsIndex; +import org.elasticsearch.xpack.esql.index.EsIndexGenerator; +import org.elasticsearch.xpack.esql.plan.logical.Eval; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.ParameterizedQuery; +import org.elasticsearch.xpack.esql.plan.logical.Project; +import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.EvalExec; +import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; +import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; +import org.elasticsearch.xpack.esql.plan.physical.ParameterizedQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; +import org.elasticsearch.xpack.esql.planner.PlannerSettings; +import org.elasticsearch.xpack.esql.plugin.EsqlFlags; +import org.elasticsearch.xpack.esql.session.Configuration; +import org.elasticsearch.xpack.esql.stats.SearchStats; +import org.elasticsearch.xpack.esql.telemetry.Metrics; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_CFG; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_SEARCH_STATS; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.emptyInferenceResolution; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.testAnalyzerContext; +import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning; +import static org.elasticsearch.xpack.esql.analysis.Analyzer.ESQL_LOOKUP_JOIN_FULL_TEXT_FUNCTION; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.defaultLookupResolution; +import static org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils.indexResolutions; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +/** + * Tests for {@link LookupPhysicalPlanOptimizer}, verifying that the lookup-node planning pipeline + * (buildLocalLogicalPlan -> LookupLogicalOptimizer -> LocalMapper -> LookupPhysicalPlanOptimizer) produces correct physical plans. + */ +public class LookupPhysicalPlanOptimizerTests extends MapperServiceTestCase { + + private Analyzer analyzer; + private TestPlannerOptimizer plannerOptimizer; + + @Before + public void init() { + Configuration config = TEST_CFG; + Map mapping = loadMapping("mapping-basic.json"); + EsIndex test = EsIndexGenerator.esIndex("test", mapping, Map.of("test", IndexMode.STANDARD)); + + analyzer = new Analyzer( + testAnalyzerContext( + config, + new EsqlFunctionRegistry(), + indexResolutions(test), + defaultLookupResolution(), + new EnrichResolution(), + emptyInferenceResolution() + ), + new Verifier(new Metrics(new EsqlFunctionRegistry(), true, true), new XPackLicenseState(() -> 0L)) + ); + plannerOptimizer = new TestPlannerOptimizer(config, analyzer); + } + + @Override + protected List filteredWarnings() { + return withDefaultLimitWarning(super.filteredWarnings()); + } + + /** + * Simple lookup with no filters. + * Expects: ProjectExec -> FieldExtractExec -> ParameterizedQueryExec(query=null) + */ + public void testSimpleLookup() { + PhysicalPlan plan = optimizeLookupPlan("FROM test | LOOKUP JOIN test_lookup ON emp_no"); + + ProjectExec project = as(plan, ProjectExec.class); + FieldExtractExec fieldExtract = as(project.child(), FieldExtractExec.class); + ParameterizedQueryExec paramQuery = as(fieldExtract.child(), ParameterizedQueryExec.class); + + assertThat(paramQuery.query(), nullValue()); + assertThat(fieldExtract.attributesToExtract().isEmpty(), is(false)); + } + + /** + * WHERE clause with a pushable right-only filter (equality on a keyword field). + * The logical optimizer pushes it into the join's right side, and the lookup physical optimizer + * pushes it down to ParameterizedQueryExec.query(). + */ + public void testPushableRightOnlyFilter() { + PhysicalPlan plan = optimizeLookupPlan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name == "English" + """); + + ProjectExec project = as(plan, ProjectExec.class); + FieldExtractExec fieldExtract = as(project.child(), FieldExtractExec.class); + ParameterizedQueryExec paramQuery = as(fieldExtract.child(), ParameterizedQueryExec.class); + + QueryBuilder query = paramQuery.query(); + assertNotNull("Expected filter to be pushed to ParameterizedQueryExec", query); + assertThat(query.toString(), containsString("language_name")); + } + + /** + * WHERE clause with a non-pushable right-only filter (LENGTH function comparison). + * The logical optimizer pushes it into the join's right side, but the lookup physical optimizer + * cannot push it to Lucene, so it stays as FilterExec. + */ + public void testNonPushableRightOnlyFilter() { + PhysicalPlan plan = optimizeLookupPlan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE LENGTH(language_name) > 3 + """); + + ProjectExec project = as(plan, ProjectExec.class); + FilterExec filter = as(project.child(), FilterExec.class); + FieldExtractExec fieldExtract = as(filter.child(), FieldExtractExec.class); + ParameterizedQueryExec paramQuery = as(fieldExtract.child(), ParameterizedQueryExec.class); + + assertThat(paramQuery.query(), nullValue()); + assertThat(filter.condition().toString(), containsString("LENGTH")); + } + + /** + * WHERE clause with both pushable and non-pushable right-only filters. + * The pushable part goes to ParameterizedQueryExec.query(), the non-pushable stays as FilterExec. + */ + public void testMixedPushableAndNonPushableFilters() { + PhysicalPlan plan = optimizeLookupPlan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name == "English" AND LENGTH(language_name) > 3 + """); + + ProjectExec project = as(plan, ProjectExec.class); + FilterExec filter = as(project.child(), FilterExec.class); + FieldExtractExec fieldExtract = as(filter.child(), FieldExtractExec.class); + ParameterizedQueryExec paramQuery = as(fieldExtract.child(), ParameterizedQueryExec.class); + + assertNotNull("Expected pushable filter on ParameterizedQueryExec", paramQuery.query()); + assertThat(paramQuery.query().toString(), containsString("language_name")); + assertThat(filter.condition().toString(), containsString("LENGTH")); + } + + /** + * ON expression with a pushable right-only filter combined with a non-pushable WHERE clause. + * The ON equality filter is pushed to ParameterizedQueryExec.query(), while LENGTH stays as FilterExec. + */ + public void testOnExpressionFilterWithWhereClause() { + assumeTrue("Requires LOOKUP JOIN on expression", EsqlCapabilities.Cap.LOOKUP_JOIN_WITH_FULL_TEXT_FUNCTION.isEnabled()); + + PhysicalPlan plan = optimizeLookupPlan(""" + FROM test + | LOOKUP JOIN languages_lookup ON languages == language_code AND language_name == "English" + | WHERE LENGTH(language_name) > 3 + """, ESQL_LOOKUP_JOIN_FULL_TEXT_FUNCTION); + + ProjectExec project = as(plan, ProjectExec.class); + FieldExtractExec extractForJoinKey = as(project.child(), FieldExtractExec.class); + + // LENGTH is not pushable, stays as FilterExec + FilterExec filter = as(extractForJoinKey.child(), FilterExec.class); + assertThat(filter.condition().toString(), containsString("LENGTH")); + + FieldExtractExec extractForFilter = as(filter.child(), FieldExtractExec.class); + ParameterizedQueryExec paramQuery = as(extractForFilter.child(), ParameterizedQueryExec.class); + + // language_name == "English" (pushable ON right-only filter) is pushed to query + assertNotNull("Expected pushable ON filter on ParameterizedQueryExec", paramQuery.query()); + assertThat(paramQuery.query().toString(), containsString("language_name")); + + // joinOnConditions is the left-right join key comparison + assertNotNull("Expected join on conditions", paramQuery.joinOnConditions()); + assertThat(paramQuery.joinOnConditions().toString(), containsString("languages")); + } + + /** + * Filter through an EvalExec that aliases a field. The filter references the alias, + * which should be resolved and pushed to ParameterizedQueryExec.query(). + * This tests the FilterExec -> EvalExec -> ParameterizedQueryExec path in PushFiltersToSource. + */ + public void testPushFilterThroughEvalExec() { + Source src = Source.EMPTY; + + FieldAttribute langName = new FieldAttribute( + src, + "language_name", + new EsField("language_name", DataType.KEYWORD, Map.of(), true, EsField.TimeSeriesFieldType.NONE) + ); + List matchFields = List.of(new MatchConfig("language_code", 0, DataType.KEYWORD)); + FieldAttribute docAttr = new FieldAttribute(src, null, null, EsQueryExec.DOC_ID_FIELD.getName(), EsQueryExec.DOC_ID_FIELD); + List pqOutput = List.of(docAttr, AbstractLookupService.LOOKUP_POSITIONS_FIELD, langName); + + ParameterizedQuery pq = new ParameterizedQuery(src, pqOutput, matchFields, null); + Alias alias = new Alias(src, "ln", langName); + Eval eval = new Eval(src, pq, List.of(alias)); + Equals condition = new Equals(src, alias.toAttribute(), Literal.keyword(src, "English"), null); + LogicalPlan filter = new org.elasticsearch.xpack.esql.plan.logical.Filter(src, eval, condition); + LogicalPlan project = new Project( + src, + filter, + List.of(AbstractLookupService.LOOKUP_POSITIONS_FIELD, langName, alias.toAttribute()) + ); + + PhysicalPlan plan = LookupFromIndexService.createLookupPhysicalPlan( + project, + TEST_CFG, + PlannerSettings.DEFAULTS, + FoldContext.small(), + TEST_SEARCH_STATS, + new EsqlFlags(true) + ); + + // The filter on 'ln' (alias for language_name) should be resolved and pushed to ParameterizedQueryExec. + ProjectExec resultProject = as(plan, ProjectExec.class); + EvalExec evalExec = as(resultProject.child(), EvalExec.class); + FieldExtractExec extract = as(evalExec.child(), FieldExtractExec.class); + ParameterizedQueryExec paramQuery = as(extract.child(), ParameterizedQueryExec.class); + + assertNotNull("Expected filter pushed through EvalExec to ParameterizedQueryExec", paramQuery.query()); + assertThat(paramQuery.query().toString(), containsString("language_name")); + } + + /** + * Filter that becomes always-true due to missing field stats should be pruned during logical optimization. + * "language_name IS NULL" with language_name missing → "null IS NULL" → true → filter removed. + * The null Eval from ReplaceFieldWithConstantOrNull should be pruned by the physical optimizer since + * the field is not needed for extraction. + */ + public void testFilterOnMissingFieldFoldedToTrue() { + EsqlTestUtils.TestConfigurableSearchStats stats = new EsqlTestUtils.TestConfigurableSearchStats().exclude( + EsqlTestUtils.TestConfigurableSearchStats.Config.EXISTS, + "language_name" + ); + + PhysicalPlan plan = optimizeLookupPlan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name IS NULL + """, stats); + + ProjectExec project = as(plan, ProjectExec.class); + EvalExec eval = as(project.child(), EvalExec.class); + ParameterizedQueryExec paramQuery = as(eval.child(), ParameterizedQueryExec.class); + assertThat("Filter should have been pruned, no query on ParameterizedQueryExec", paramQuery.query(), nullValue()); + } + + /** + * Filter on a missing field with equality (e.g. {@code language_name == "English"}) folds to {@code null == "English"} → null, + * which marks the {@link ParameterizedQueryExec} as {@code emptyResult=true} instead of collapsing the plan. + */ + public void testFilterOnMissingFieldFoldedToEmpty() { + EsqlTestUtils.TestConfigurableSearchStats stats = new EsqlTestUtils.TestConfigurableSearchStats().exclude( + EsqlTestUtils.TestConfigurableSearchStats.Config.EXISTS, + "language_name" + ); + + PhysicalPlan plan = optimizeLookupPlan(""" + FROM test + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + | WHERE language_name == "English" + """, stats); + + ProjectExec project = as(plan, ProjectExec.class); + EvalExec eval = as(project.child(), EvalExec.class); + ParameterizedQueryExec paramQuery = as(eval.child(), ParameterizedQueryExec.class); + assertThat(paramQuery.emptyResult(), is(true)); + } + + /** + * When a missing field is dropped from the output, it never appears in the lookup plan's addedFields, + * so no null Eval is needed. Using expression-based join so that language_code remains as an + * extractable added field after dropping language_name. + * Expects: ProjectExec -> FieldExtractExec -> ParameterizedQueryExec + */ + public void testDropMissingFieldPrunesEval() { + assumeTrue("Requires LOOKUP JOIN on expression", EsqlCapabilities.Cap.LOOKUP_JOIN_WITH_FULL_TEXT_FUNCTION.isEnabled()); + + EsqlTestUtils.TestConfigurableSearchStats stats = new EsqlTestUtils.TestConfigurableSearchStats().exclude( + EsqlTestUtils.TestConfigurableSearchStats.Config.EXISTS, + "language_name" + ); + + PhysicalPlan plan = optimizeLookupPlan(""" + FROM test + | LOOKUP JOIN languages_lookup ON languages == language_code + | DROP language_name + """, stats, ESQL_LOOKUP_JOIN_FULL_TEXT_FUNCTION); + + ProjectExec project = as(plan, ProjectExec.class); + FieldExtractExec fieldExtract = as(project.child(), FieldExtractExec.class); + ParameterizedQueryExec paramQuery = as(fieldExtract.child(), ParameterizedQueryExec.class); + assertThat(paramQuery.query(), nullValue()); + } + + /** + * Two consecutive LOOKUP JOINs: first on test_lookup (by emp_no), then on languages_lookup (by language_code). + * Each join's right side is independently planned on its respective lookup node. + */ + public void testTwoLookupJoins() { + List plans = optimizeAllLookupPlans(""" + FROM test + | LOOKUP JOIN test_lookup ON emp_no + | RENAME languages AS language_code + | LOOKUP JOIN languages_lookup ON language_code + """, null, TEST_SEARCH_STATS); + + assertThat(plans, hasSize(2)); + + // Outermost join (languages_lookup) is found first in tree traversal + PhysicalPlan languagesPlan = plans.get(0); + ProjectExec project0 = as(languagesPlan, ProjectExec.class); + FieldExtractExec extract0 = as(project0.child(), FieldExtractExec.class); + ParameterizedQueryExec paramQuery0 = as(extract0.child(), ParameterizedQueryExec.class); + assertThat(paramQuery0.query(), nullValue()); + + // Inner join (test_lookup) is found second + PhysicalPlan testPlan = plans.get(1); + ProjectExec project1 = as(testPlan, ProjectExec.class); + FieldExtractExec extract1 = as(project1.child(), FieldExtractExec.class); + ParameterizedQueryExec paramQuery1 = as(extract1.child(), ParameterizedQueryExec.class); + assertThat(paramQuery1.query(), nullValue()); + } + + private PhysicalPlan optimizeLookupPlan(String esql) { + return optimizeLookupPlan(esql, TEST_SEARCH_STATS); + } + + private PhysicalPlan optimizeLookupPlan(String esql, SearchStats searchStats) { + List plans = optimizeAllLookupPlans(esql, null, searchStats); + assertThat("Expected exactly one LOOKUP JOIN", plans, hasSize(1)); + return plans.getFirst(); + } + + private PhysicalPlan optimizeLookupPlan(String esql, TransportVersion minVersion) { + return optimizeLookupPlan(esql, TEST_SEARCH_STATS, minVersion); + } + + private PhysicalPlan optimizeLookupPlan(String esql, SearchStats searchStats, TransportVersion minVersion) { + List plans = optimizeAllLookupPlans(esql, minVersion, searchStats); + assertThat("Expected exactly one LOOKUP JOIN", plans, hasSize(1)); + return plans.getFirst(); + } + + /** + * Runs the full planning pipeline and returns a lookup-node physical plan for each LookupJoinExec + * found in the data-node plan. The plans are returned in tree traversal order (outermost join first). + */ + private List optimizeAllLookupPlans(String esql, TransportVersion minVersion, SearchStats searchStats) { + PhysicalPlan dataNodePlan; + if (minVersion != null) { + MutableAnalyzerContext mutableContext = (MutableAnalyzerContext) analyzer.context(); + try ( + MutableAnalyzerContext.RestoreTransportVersion restore = mutableContext.setTemporaryTransportVersionOnOrAfter(minVersion) + ) { + dataNodePlan = plannerOptimizer.plan(esql); + } + } else { + dataNodePlan = plannerOptimizer.plan(esql); + } + + List joins = findAllLookupJoins(dataNodePlan); + assertThat("Expected at least one LookupJoinExec in the plan", joins.isEmpty(), is(false)); + + List lookupPlans = new ArrayList<>(joins.size()); + for (LookupJoinExec join : joins) { + lookupPlans.add(buildLookupPlan(join, searchStats)); + } + return lookupPlans; + } + + private static PhysicalPlan buildLookupPlan(LookupJoinExec join, SearchStats searchStats) { + List matchFields = new ArrayList<>(join.leftFields().size()); + for (int i = 0; i < join.leftFields().size(); i++) { + FieldAttribute right = (FieldAttribute) join.rightFields().get(i); + String fieldName = right.exactAttribute().fieldName().string(); + if (join.isOnJoinExpression()) { + fieldName = join.leftFields().get(i).name(); + } + matchFields.add(new MatchConfig(fieldName, i, join.leftFields().get(i).dataType())); + } + + LogicalPlan logicalPlan = LookupFromIndexService.buildLocalLogicalPlan( + join.source(), + matchFields, + join.joinOnConditions(), + join.right(), + join.addedFields().stream().map(f -> (NamedExpression) f).toList() + ); + return LookupFromIndexService.createLookupPhysicalPlan( + logicalPlan, + TEST_CFG, + PlannerSettings.DEFAULTS, + FoldContext.small(), + searchStats, + new EsqlFlags(true) + ); + } + + private static List findAllLookupJoins(PhysicalPlan plan) { + List joins = new ArrayList<>(); + collectLookupJoins(plan, joins); + return joins; + } + + private static void collectLookupJoins(PhysicalPlan plan, List joins) { + if (plan instanceof LookupJoinExec join) { + joins.add(join); + } + for (PhysicalPlan child : plan.children()) { + collectLookupJoins(child, joins); + } + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/ParameterizedQuerySerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/ParameterizedQuerySerializationTests.java new file mode 100644 index 0000000000000..c0f6dbad1dc04 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/ParameterizedQuerySerializationTests.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical; + +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.enrich.MatchConfig; +import org.elasticsearch.xpack.esql.expression.function.FieldAttributeTests; + +import java.io.IOException; +import java.util.List; + +public class ParameterizedQuerySerializationTests extends AbstractLogicalPlanSerializationTests { + + public static ParameterizedQuery randomParameterizedQuery() { + List output = randomFieldAttributes(1, 5, false); + List matchFields = randomList(1, 3, ParameterizedQuerySerializationTests::randomMatchConfig); + Expression joinOnConditions = randomBoolean() ? null : FieldAttributeTests.createFieldAttribute(3, false); + return new ParameterizedQuery(randomSource(), output, matchFields, joinOnConditions); + } + + private static MatchConfig randomMatchConfig() { + return new MatchConfig(randomAlphaOfLength(5), between(0, 10), randomFrom(DataType.KEYWORD, DataType.INTEGER, DataType.LONG)); + } + + @Override + protected ParameterizedQuery createTestInstance() { + return randomParameterizedQuery(); + } + + @Override + protected ParameterizedQuery mutateInstance(ParameterizedQuery instance) throws IOException { + List output = instance.output(); + List matchFields = instance.matchFields(); + Expression joinOnConditions = instance.joinOnConditions(); + switch (between(0, 2)) { + case 0 -> output = randomValueOtherThan(output, () -> randomFieldAttributes(1, 5, false)); + case 1 -> matchFields = randomValueOtherThan( + matchFields, + () -> randomList(1, 3, ParameterizedQuerySerializationTests::randomMatchConfig) + ); + case 2 -> joinOnConditions = randomValueOtherThan( + joinOnConditions, + () -> randomBoolean() ? null : FieldAttributeTests.createFieldAttribute(3, false) + ); + default -> throw new IllegalArgumentException(); + } + return new ParameterizedQuery(instance.source(), output, matchFields, joinOnConditions); + } + + @Override + protected boolean alwaysEmptySource() { + return true; + } +} diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java index 77dc8cd6d8a11..a642bc52ec5ef 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/tree/EsqlNodeSubclassTests.java @@ -38,6 +38,7 @@ import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.type.EsField; import org.elasticsearch.xpack.esql.datasources.FileSet; +import org.elasticsearch.xpack.esql.enrich.MatchConfig; import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.expression.UnresolvedAttributeTests; import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction; @@ -445,6 +446,9 @@ public void accept(Page page) { } else if (argClass == FileSet.class) { // FileSet is final, cannot be mocked return FileSet.UNRESOLVED; + } else if (argClass == MatchConfig.class) { + // MatchConfig is final, cannot be mocked + return new MatchConfig(randomAlphaOfLength(5), randomInt(10), randomFrom(DataType.types())); } else if (argClass == EsQueryExec.FieldSort.class) { // TODO: It appears neither FieldSort nor GeoDistanceSort are ever actually tested return randomFieldSort(); diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml index 052f854def2ba..92a579c9cb39e 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/190_lookup_join.yml @@ -95,6 +95,22 @@ setup: type: keyword shade: type: keyword + - do: + indices.create: + index: test-lookup-const + body: + settings: + index: + mode: lookup + mappings: + properties: + key: + type: long + color: + type: keyword + const_tag: + type: constant_keyword + value: "alpha" - do: indices.create: index: test2 @@ -167,6 +183,15 @@ setup: - { "key": 20, "color": "blue" , shade: "dark" } - { "index": { } } - { "key": 30, "color": "pink" , shade: "hot" } + - do: + bulk: + index: "test-lookup-const" + refresh: true + body: + - { "index": { } } + - { "key": 1, "color": "cyan" } + - { "index": { } } + - { "key": 2, "color": "yellow" } - do: bulk: index: "test2" @@ -344,3 +369,48 @@ basic join on two fields: #for 20 and yellow, no rows match, but we keep the row as it is a lookup join - match: { values.2: [ 20, "yellow", null ] } +--- +constant field filter matches: + - requires: + capabilities: + - method: POST + path: /_query + parameters: [ ] + capabilities: [ join_lookup_v12 ] + reason: "uses LOOKUP JOIN" + - do: + esql.query: + body: + query: 'FROM test | LOOKUP JOIN test-lookup-const ON key | WHERE const_tag == "alpha" | SORT key | LIMIT 10' + + - match: { columns.0.name: "key" } + - match: { columns.0.type: "long" } + - match: { columns.1.name: "color" } + - match: { columns.1.type: "keyword" } + - match: { columns.2.name: "const_tag" } + - match: { columns.2.type: "keyword" } + - match: { values.0: [ 1, "cyan", "alpha" ] } + - match: { values.1: [ 2, "yellow", "alpha" ] } + +--- +constant field filter does not match: + - requires: + capabilities: + - method: POST + path: /_query + parameters: [ ] + capabilities: [ join_lookup_v12 ] + reason: "uses LOOKUP JOIN" + - do: + esql.query: + body: + query: 'FROM test | LOOKUP JOIN test-lookup-const ON key | WHERE const_tag == "beta" | SORT key | LIMIT 10' + + - match: { columns.0.name: "key" } + - match: { columns.0.type: "long" } + - match: { columns.1.name: "color" } + - match: { columns.1.type: "keyword" } + - match: { columns.2.name: "const_tag" } + - match: { columns.2.type: "keyword" } + - length: { values: 0 } +