diff --git a/docs/changelog/143030.yaml b/docs/changelog/143030.yaml new file mode 100644 index 0000000000000..5f10a00ac92a7 --- /dev/null +++ b/docs/changelog/143030.yaml @@ -0,0 +1,6 @@ +pr: 143030 +summary: "ESQL: Fix incorrectly optimized fork with nullify unmapped_fields" +area: ES|QL +type: bug +issues: + - 142762 diff --git a/muted-tests.yml b/muted-tests.yml index 2651530cb521d..41a4731618891 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -321,9 +321,6 @@ tests: - class: org.elasticsearch.xpack.esql.session.EsqlResolvedIndexExpressionIT method: testLocalDateMathExpression issue: https://github.com/elastic/elasticsearch/issues/140106 -- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT - method: test {csv-spec:unmapped-nullify.TsWithAggsByMissing} - issue: https://github.com/elastic/elasticsearch/issues/142762 - class: org.elasticsearch.xpack.inference.integration.AuthorizationTaskExecutorIT method: testCreatesEisChatCompletionEndpoint issue: https://github.com/elastic/elasticsearch/issues/140849 diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/unmapped-nullify.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/unmapped-nullify.csv-spec index 54bf63844670d..bda3cbe4fb258 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/unmapped-nullify.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/unmapped-nullify.csv-spec @@ -600,3 +600,88 @@ TS "k8s" | STATS max_bytes=max(to_long(network.total_bytes_in)) BY cluster = doe max_bytes:long | cluster:null 10797 | null ; + + +forkWithRowAndUnmappedKeep +required_capability: fork_v9 +required_capability: fix_fork_unmapped_nullify + +SET unmapped_fields="nullify"\; +ROW a = 1 +| FORK (where true) +| WHERE a == 1 +| KEEP bar +; + +bar:null +null +; + + +forkWithFromAndUnmappedCoalesce +required_capability: fork_v9 +required_capability: fix_fork_unmapped_nullify + +SET unmapped_fields="nullify"\; +FROM employees +| FORK (where foo != 
84) (where true) +| WHERE _fork == "fork1" +| DROP _fork +| EVAL y = coalesce(bar, baz) +; + +avg_worked_seconds:long | birth_date:date | emp_no:integer | first_name:keyword | gender:keyword | height:double | height.float:double | height.half_float:double | height.scaled_float:double | hire_date:date | is_rehired:boolean | job_positions:keyword | languages:integer | languages.byte:integer | languages.long:long | languages.short:integer | last_name:keyword | salary:integer | salary_change:double | salary_change.int:integer | salary_change.keyword:keyword | salary_change.long:long | still_hired:boolean | foo:null | bar:null | baz:null | y:null +; + + +forkWithUnmappedStatsEvalKeep +required_capability: fork_v9 +required_capability: fix_fork_unmapped_nullify + +SET unmapped_fields="nullify"\; +FROM alerts +| KEEP kibana.alert.risk_score +| FORK (WHERE true | MV_EXPAND kibana.alert.risk_score) +| STATS kibana.alert.risk_score = COUNT(*) +| EVAL x = LEAST(kibana.alert.risk_score, 52, 60) +| KEEP kibana.alert.risk_score +; + +kibana.alert.risk_score:long +10 +; + + +forkWithUnmappedStatsEvalKeepTwoBranches +required_capability: fork_v9 +required_capability: fix_fork_unmapped_nullify +required_capability: sample_v3 + +SET unmapped_fields="nullify"\; +FROM alerts +| KEEP kibana.alert.risk_score +| FORK (WHERE true | MV_EXPAND kibana.alert.risk_score) (WHERE true | SAMPLE 0.5) +| STATS kibana.alert.risk_score = COUNT(*) +| EVAL x = LEAST(kibana.alert.risk_score, 52, 60) +| KEEP kibana.alert.risk_score +; + +kibana.alert.risk_score:long +10..20 +; + + +forkWithRowCoalesceAndDrop +required_capability: fork_v9 +required_capability: fix_fork_unmapped_nullify + +SET unmapped_fields="nullify"\; +ROW a = 12::long +| FORK (WHERE true) +| EVAL x = COALESCE(a, 5) +| DROP a +; + +_fork:keyword | x:long +fork1 | 12 +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java 
b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index d90dc642d6cd7..530203556211a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1913,6 +1913,15 @@ public enum Cap { */ MATCH_FUNCTION_ZERO_TERMS_QUERY, + /** + * Fixes an analysis bug in {@code FORK} with {@code unmapped_fields="nullify"}. + * Preserves existing FORK output {@code NameId}s so that references from upper plan nodes remain valid after + * sub-plans are updated. Only genuinely new attributes get fresh NameIds. + * Reusing the branches' own attributes can have unintended side effects in optimizations like constant folding. + * https://github.com/elastic/elasticsearch/issues/142762 + */ + FIX_FORK_UNMAPPED_NULLIFY, + // Last capability should still have a comma for fewer merge conflicts when adding new ones :) // This comment prevents the semicolon from being on the previous capability when Spotless formats the file. 
; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 91913d079fa17..f894716dd41e9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -179,7 +179,7 @@ import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE; import static org.elasticsearch.xpack.esql.capabilities.TranslationAware.translatable; -import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributes; +import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributesPreservingIds; import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE; import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN; import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME; @@ -971,7 +971,7 @@ private LogicalPlan resolveFork(Fork fork) { return fork; } - return fork.replaceSubPlansAndOutput(newSubPlans, toReferenceAttributes(outputUnion)); + return fork.replaceSubPlansAndOutput(newSubPlans, toReferenceAttributesPreservingIds(outputUnion, fork.output())); } private LogicalPlan resolveRerank(Rerank rerank, List childrenOutput) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java index eb86ba0e2f972..4b7f9208c320d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/core/expression/Expressions.java @@ -11,7 +11,9 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import 
java.util.List; +import java.util.Map; import java.util.function.Predicate; import static java.util.Collections.emptyList; @@ -32,19 +34,27 @@ public static List asAttributes(List named } /** - * @return a list of {@link ReferenceAttribute}s corresponding to the given named expressions. - *

- * The returned ReferenceAttributes will have new {@link NameId}s, also in the case the input contains ReferenceAttributes. + * Converts named expressions to {@link ReferenceAttribute}s, preserving {@link NameId}s for attributes whose name + * matches one in {@code existingOutput}. Genuinely new attributes get fresh NameIds. */ - public static List toReferenceAttributes(List named) { + public static List toReferenceAttributesPreservingIds( + List named, + List existingOutput + ) { if (named.isEmpty()) { return emptyList(); } + Map existingByName = HashMap.newHashMap(existingOutput.size()); + for (Attribute attr : existingOutput) { + existingByName.put(attr.name(), attr); + } List list = new ArrayList<>(named.size()); for (NamedExpression exp : named) { + Attribute existing = existingByName.get(exp.name()); + NameId id = existing != null ? existing.id() : new NameId(); ReferenceAttribute refAttr = exp instanceof ReferenceAttribute ra - ? (ReferenceAttribute) ra.withId(new NameId()) - : new ReferenceAttribute(exp.source(), null, exp.name(), exp.dataType(), exp.nullable(), null, exp.synthetic()); + ? 
(ReferenceAttribute) ra.withId(id) + : new ReferenceAttribute(exp.source(), null, exp.name(), exp.dataType(), exp.nullable(), id, exp.synthetic()); list.add(refAttr); } return list; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java index 3b82d8a74d8f2..cce5454e8b9cc 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java @@ -30,7 +30,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.xpack.esql.analysis.Analyzer.NO_FIELDS; -import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributes; +import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributesPreservingIds; /** * A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g. @@ -105,13 +105,11 @@ public Fork replaceSubPlansAndOutput(List subPlans, List } public Fork refreshOutput() { - // We don't want to keep the same attributes that are outputted by the FORK branches. - // Keeping the same attributes can have unintended side effects when applying optimizations like constant folding. 
return new Fork(source(), children(), refreshedOutput()); } protected List refreshedOutput() { - return toReferenceAttributes(outputUnion(children())); + return toReferenceAttributesPreservingIds(outputUnion(children()), this.output()); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerUnmappedTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerUnmappedTests.java index c3e69e341fd98..af261c104a4db 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerUnmappedTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerUnmappedTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToInteger; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToLong; import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToString; +import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce; import org.elasticsearch.xpack.esql.expression.predicate.logical.And; import org.elasticsearch.xpack.esql.expression.predicate.logical.Or; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Add; @@ -48,6 +49,7 @@ import org.elasticsearch.xpack.esql.plan.logical.OrderBy; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.Row; +import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.Subquery; import org.elasticsearch.xpack.esql.plan.logical.UnionAll; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; @@ -3641,6 +3643,390 @@ public void testChangedTimestmapFieldWithRate() { """), "3:13: [rate(network.total_cost)] " + UnresolvedTimestamp.UNRESOLVED_SUFFIX); } + /** + * Expects + *

<pre>{@code
+     * Limit[1000[INTEGER],false,false]
+     * \_Project[[bar{r}#14]]
+     *   \_Filter[a{r}#9 == 1[INTEGER]]
+     *     \_Fork[[a{r}#9, _fork{r}#10, bar{r}#14]]
+     *       \_Limit[1000[INTEGER],false,false]
+     *         \_Project[[a{r}#4, _fork{r}#5, bar{r}#11]]
+     *           \_Eval[[fork1[KEYWORD] AS _fork#5]]
+     *             \_Filter[true[BOOLEAN]]
+     *               \_Eval[[null[NULL] AS bar#11]]
+     *                 \_Row[[1[INTEGER] AS a#4]]
+     * }</pre>
+ */ + public void testForkWithRow() { + var plan = analyzeStatement(setUnmappedNullify(""" + ROW a = 1 + | FORK (where true) + | WHERE a == 1 + | KEEP bar + """)); + + // Top implicit limit + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(FoldContext.small()), is(1000)); + + // KEEP bar + var project = as(limit.child(), Project.class); + assertThat(Expressions.names(project.projections()), is(List.of("bar"))); + + // WHERE a == 1 + var filter = as(project.child(), Filter.class); + + // Fork node with one branch + var fork = as(filter.child(), Fork.class); + assertThat(fork.children(), hasSize(1)); + + // Branch 0: Limit -> Project -> Eval(_fork) -> Filter(true) -> Eval(bar=null) -> Row + var b0Limit = as(fork.children().getFirst(), Limit.class); + var b0Project = as(b0Limit.child(), Project.class); + assertThat(b0Project.projections(), hasSize(3)); + assertThat(Expressions.names(b0Project.projections()), hasItems("a", "_fork", "bar")); + + var b0EvalFork = as(b0Project.child(), Eval.class); + var b0ForkAlias = as(b0EvalFork.fields().getFirst(), Alias.class); + assertThat(b0ForkAlias.name(), is("_fork")); + + var b0FilterTrue = as(b0EvalFork.child(), Filter.class); + + var b0EvalBar = as(b0FilterTrue.child(), Eval.class); + assertThat(b0EvalBar.fields(), hasSize(1)); + var barAlias = as(b0EvalBar.fields().getFirst(), Alias.class); + assertThat(barAlias.name(), is("bar")); + assertThat(as(barAlias.child(), Literal.class).dataType(), is(DataType.NULL)); + + as(b0EvalBar.child(), Row.class); + } + + /** + * Expects + *
<pre>{@code
+     * Limit[1000[INTEGER],false,false]
+     * \_Eval[[COALESCE(bar{r}#59,baz{r}#60) AS y#11]]
+     *   \_Project[[_meta_field{r}#38, emp_no{r}#39, first_name{r}#40, gender{r}#41, hire_date{r}#42, job{r}#43, job.raw{r}#44, l
+     * anguages{r}#45, last_name{r}#46, long_noidx{r}#47, salary{r}#48, foo{r}#49, bar{r}#59, baz{r}#60]]
+     *     \_Filter[_fork{r}#50 == fork1[KEYWORD]]
+     *       \_Fork[[_meta_field{r}#38, emp_no{r}#39, first_name{r}#40, gender{r}#41, hire_date{r}#42, job{r}#43, job.raw{r}#44, l
+     * anguages{r}#45, last_name{r}#46, long_noidx{r}#47, salary{r}#48, foo{r}#49, _fork{r}#50, bar{r}#59, baz{r}#60]]
+     *         |_Limit[1000[INTEGER],false,false]
+     *         | \_Project[[_meta_field{f}#19, emp_no{f}#13, first_name{f}#14, gender{f}#15, hire_date{f}#20, job{f}#21, job.raw{f}#22, l
+     * anguages{f}#16, last_name{f}#17, long_noidx{f}#23, salary{f}#18, foo{f}#35, _fork{r}#4, bar{f}#51, baz{f}#52]]
+     *         |   \_Eval[[fork1[KEYWORD] AS _fork#4]]
+     *         |     \_Filter[NOT(foo{f}#35 == 84[INTEGER])]
+     *         |       \_EsRelation[test][_meta_field{f}#19, emp_no{f}#13, first_name{f}#14, .., foo{f}#35, bar{f}#51, baz{f}#52]
+     *         \_Limit[1000[INTEGER],false,false]
+     *           \_Project[[_meta_field{f}#30, emp_no{f}#24, first_name{f}#25, gender{f}#26, hire_date{f}#31, job{f}#32, job.raw{f}#33, l
+     * anguages{f}#27, last_name{f}#28, long_noidx{f}#34, salary{f}#29, foo{r}#37, _fork{r}#5, bar{f}#53, baz{f}#54]]
+     *             \_Eval[[null[NULL] AS foo#37]]
+     *               \_Eval[[fork2[KEYWORD] AS _fork#5]]
+     *                 \_Filter[true[BOOLEAN]]
+     *                   \_EsRelation[test][_meta_field{f}#30, emp_no{f}#24, first_name{f}#25, .., bar{f}#53, baz{f}#54]
+     * }</pre>
+ */ + public void testForkWithFrom() { + var plan = analyzeStatement(setUnmappedNullify(""" + from test + | FORK (where foo != 84) (where true) + | WHERE _fork == "fork1" + | DROP _fork + | eval y = coalesce(bar, baz) + """)); + + // Top implicit limit + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(FoldContext.small()), is(1000)); + + // EVAL y = coalesce(bar, baz) + var evalY = as(limit.child(), Eval.class); + assertThat(evalY.fields(), hasSize(1)); + assertThat(evalY.fields().getFirst().name(), is("y")); + assertThat(as(evalY.fields().getFirst(), Alias.class).dataType(), is(DataType.NULL)); + + // DROP _fork -> Project without _fork + var dropProject = as(evalY.child(), Project.class); + assertThat(Expressions.names(dropProject.projections()), not(hasItems("_fork"))); + assertThat(Expressions.names(dropProject.projections()), hasItems("foo", "bar", "baz")); + + // WHERE _fork == "fork1" + var filterFork = as(dropProject.child(), Filter.class); + + // Fork node with two branches + var fork = as(filterFork.child(), Fork.class); + assertThat(fork.children(), hasSize(2)); + + // Branch 0: (where foo != 84) -> unmapped foo/bar/baz added as MissingEsField in EsRelation + var b0Limit = as(fork.children().getFirst(), Limit.class); + var b0Project = as(b0Limit.child(), Project.class); + assertThat(Expressions.names(b0Project.projections()), hasItems("foo", "_fork", "bar", "baz")); + + var b0EvalFork = as(b0Project.child(), Eval.class); + var b0ForkAlias = as(b0EvalFork.fields().getFirst(), Alias.class); + assertThat(b0ForkAlias.name(), is("_fork")); + + // Filter foo != 84 + var b0Filter = as(b0EvalFork.child(), Filter.class); + + // foo, bar, baz are FieldAttributes with MissingEsField/DataType.NULL directly in EsRelation + var b0EsRelation = as(b0Filter.child(), EsRelation.class); + assertThat(Expressions.names(b0EsRelation.output()), hasItems("foo", "bar", "baz")); + + // Branch 1: (where true) -> bar/baz added as MissingEsField in EsRelation, 
foo null-aliased via Eval + var b1Limit = as(fork.children().get(1), Limit.class); + var b1Project = as(b1Limit.child(), Project.class); + assertThat(Expressions.names(b1Project.projections()), hasItems("foo", "_fork", "bar", "baz")); + + // foo is not referenced in this branch's filter, so it's introduced as null-Eval by the Fork patching mechanism + var b1EvalFoo = as(b1Project.child(), Eval.class); + assertThat(Expressions.names(b1EvalFoo.fields()), hasItems("foo")); + + var b1EvalFork = as(b1EvalFoo.child(), Eval.class); + var b1ForkAlias = as(b1EvalFork.fields().getFirst(), Alias.class); + assertThat(b1ForkAlias.name(), is("_fork")); + + var b1FilterTrue = as(b1EvalFork.child(), Filter.class); + + // bar, baz are FieldAttributes with MissingEsField/DataType.NULL directly in EsRelation + var b1EsRelation = as(b1FilterTrue.child(), EsRelation.class); + assertThat(Expressions.names(b1EsRelation.output()), hasItems("bar", "baz")); + } + + /** + * Expects + *
<pre>{@code
+     * Limit[1000[INTEGER],false,false]
+     * \_Project[[emp_no{r}#8]]
+     *   \_Eval[[LEAST(emp_no{r}#8,TOLONG(52[INTEGER]),TOLONG(60[INTEGER])) AS x#11]]
+     *     \_Aggregate[[],[COUNT(*[KEYWORD],true[BOOLEAN],PT0S[TIME_DURATION]) AS emp_no#8]]
+     *       \_Fork[[emp_no{r}#26, _fork{r}#27]]
+     *         \_Limit[1000[INTEGER],false,false]
+     *           \_Project[[emp_no{r}#25, _fork{r}#5]]
+     *             \_Eval[[fork1[KEYWORD] AS _fork#5]]
+     *               \_MvExpand[emp_no{f}#14,emp_no{r}#25]
+     *                 \_Filter[true[BOOLEAN]]
+     *                   \_Project[[emp_no{f}#14]]
+     *                     \_EsRelation[test][_meta_field{f}#20, emp_no{f}#14, first_name{f}#15, ..]
+     * }</pre>
+ */ + public void testForkWithUnmappedStatsEvalKeep() { + var plan = analyzeStatement(setUnmappedNullify(""" + from test + | keep emp_no + | FORK (where true | mv_expand emp_no) + | stats emp_no = count(*) + | eval x = least(emp_no, 52, 60) + | keep emp_no + """)); + + // Limit[1000] + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(FoldContext.small()), is(1000)); + + // KEEP emp_no → Project[[emp_no]] + var project = as(limit.child(), Project.class); + assertThat(Expressions.names(project.projections()), is(List.of("emp_no"))); + + // EVAL x = LEAST(emp_no, 52, 60) + var evalX = as(project.child(), Eval.class); + assertThat(evalX.fields(), hasSize(1)); + assertThat(evalX.fields().getFirst().name(), is("x")); + + // STATS emp_no = COUNT(*) + var agg = as(evalX.child(), Aggregate.class); + assertThat(agg.groupings(), hasSize(0)); + assertThat(agg.aggregates(), hasSize(1)); + assertThat(agg.aggregates().getFirst().name(), is("emp_no")); + as(as(agg.aggregates().getFirst(), Alias.class).child(), Count.class); + + // Fork with one branch + var fork = as(agg.child(), Fork.class); + assertThat(fork.children(), hasSize(1)); + assertThat(Expressions.names(fork.output()), hasItems("emp_no", "_fork")); + + // Branch 0: Limit → Project → Eval[_fork] → MvExpand[emp_no] → Filter[true] → Project[[emp_no]] → EsRelation + var b0Limit = as(fork.children().getFirst(), Limit.class); + var b0Project = as(b0Limit.child(), Project.class); + assertThat(Expressions.names(b0Project.projections()), hasItems("emp_no", "_fork")); + + var b0EvalFork = as(b0Project.child(), Eval.class); + assertThat(b0EvalFork.fields().getFirst().name(), is("_fork")); + + var b0MvExpand = as(b0EvalFork.child(), MvExpand.class); + assertThat(b0MvExpand.target().name(), is("emp_no")); + + var b0Filter = as(b0MvExpand.child(), Filter.class); + assertThat(as(b0Filter.condition(), Literal.class).value(), is(true)); + + // Inner Project narrowing to emp_no (emp_no is mapped, so no null-Eval) + 
var b0InnerProject = as(b0Filter.child(), Project.class); + assertThat(Expressions.names(b0InnerProject.projections()), is(List.of("emp_no"))); + + as(b0InnerProject.child(), EsRelation.class); + } + + /** + * Expects + *
<pre>{@code
+     * Limit[1000[INTEGER],false,false]
+     * \_Project[[emp_no{r}#8]]
+     *   \_Eval[[LEAST(emp_no{r}#8,TOLONG(52[INTEGER]),TOLONG(60[INTEGER])) AS x#11]]
+     *     \_Aggregate[[],[COUNT(*[KEYWORD],true[BOOLEAN],PT0S[TIME_DURATION]) AS emp_no#8]]
+     *       \_Fork[[emp_no{r}#37, _fork{r}#38]]
+     *         |_Limit[1000[INTEGER],false,false]
+     *         | \_Project[[emp_no{r}#36, _fork{r}#5]]
+     *         |   \_Eval[[fork1[KEYWORD] AS _fork#5]]
+     *         |     \_MvExpand[emp_no{f}#14,emp_no{r}#36]
+     *         |       \_Filter[true[BOOLEAN]]
+     *         |         \_Project[[emp_no{f}#14]]
+     *         |           \_EsRelation[test][_meta_field{f}#20, emp_no{f}#14, first_name{f}#15, ..]
+     *         \_Limit[1000[INTEGER],false,false]
+     *           \_Project[[emp_no{f}#25, _fork{r}#5]]
+     *             \_Eval[[fork2[KEYWORD] AS _fork#5]]
+     *               \_Sample[0.5[DOUBLE]]
+     *                 \_Filter[true[BOOLEAN]]
+     *                   \_Project[[emp_no{f}#25]]
+     *                     \_EsRelation[test][_meta_field{f}#31, emp_no{f}#25, first_name{f}#26, ..]
+     * }</pre>
+ */ + public void testForkWithUnmappedStatsEvalKeepTwoBranches() { + assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE_V3.isEnabled()); + + var plan = analyzeStatement(setUnmappedNullify(""" + from test + | keep emp_no + | FORK (where true | mv_expand emp_no) (where true | SAMPLE 0.5) + | stats emp_no = count(*) + | eval x = least(emp_no, 52, 60) + | keep emp_no + """)); + + // Limit[1000] + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(FoldContext.small()), is(1000)); + + // KEEP emp_no → Project[[emp_no]] + var project = as(limit.child(), Project.class); + assertThat(Expressions.names(project.projections()), is(List.of("emp_no"))); + + // EVAL x = LEAST(emp_no, 52, 60) + var evalX = as(project.child(), Eval.class); + assertThat(evalX.fields(), hasSize(1)); + assertThat(evalX.fields().getFirst().name(), is("x")); + + // STATS emp_no = COUNT(*) — no groupings, single aggregate + var agg = as(evalX.child(), Aggregate.class); + assertThat(agg.groupings(), hasSize(0)); + assertThat(agg.aggregates(), hasSize(1)); + assertThat(agg.aggregates().getFirst().name(), is("emp_no")); + as(as(agg.aggregates().getFirst(), Alias.class).child(), Count.class); + + // Fork with two branches; output carries emp_no and _fork + var fork = as(agg.child(), Fork.class); + assertThat(fork.children(), hasSize(2)); + assertThat(Expressions.names(fork.output()), hasItems("emp_no", "_fork")); + + // Branch 0: Limit → Project → Eval[_fork] → MvExpand[emp_no] → Filter[true] → Project[[emp_no]] → EsRelation + // emp_no is a mapped field, so there is no null-Eval before EsRelation + var b0Limit = as(fork.children().get(0), Limit.class); + var b0Project = as(b0Limit.child(), Project.class); + assertThat(Expressions.names(b0Project.projections()), hasItems("emp_no", "_fork")); + + var b0EvalFork = as(b0Project.child(), Eval.class); + assertThat(b0EvalFork.fields().getFirst().name(), is("_fork")); + + var b0MvExpand = as(b0EvalFork.child(), MvExpand.class); + 
assertThat(b0MvExpand.target().name(), is("emp_no")); + + var b0Filter = as(b0MvExpand.child(), Filter.class); + assertThat(as(b0Filter.condition(), Literal.class).value(), is(true)); + + var b0InnerProject = as(b0Filter.child(), Project.class); + assertThat(Expressions.names(b0InnerProject.projections()), is(List.of("emp_no"))); + as(b0InnerProject.child(), EsRelation.class); + + // Branch 1: Limit → Project → Eval[_fork] → Sample[0.5] → Filter[true] → Project[[emp_no]] → EsRelation + // emp_no is a mapped field, so there is no null-Eval before EsRelation + var b1Limit = as(fork.children().get(1), Limit.class); + var b1Project = as(b1Limit.child(), Project.class); + assertThat(Expressions.names(b1Project.projections()), hasItems("emp_no", "_fork")); + + var b1EvalFork = as(b1Project.child(), Eval.class); + assertThat(b1EvalFork.fields().getFirst().name(), is("_fork")); + + var b1Sample = as(b1EvalFork.child(), Sample.class); + assertThat(as(b1Sample.probability(), Literal.class).value(), is(0.5)); + + var b1Filter = as(b1Sample.child(), Filter.class); + assertThat(as(b1Filter.condition(), Literal.class).value(), is(true)); + + var b1InnerProject = as(b1Filter.child(), Project.class); + assertThat(Expressions.names(b1InnerProject.projections()), is(List.of("emp_no"))); + as(b1InnerProject.child(), EsRelation.class); + } + + /** + * Expects + *
<pre>{@code
+     * Limit[1000[INTEGER],false,false]
+     * \_Project[[_fork{r}#12, x{r}#8]]
+     *   \_Eval[[COALESCE(a{r}#11,TOLONG(5[INTEGER])) AS x#8]]
+     *     \_Fork[[a{r}#11, _fork{r}#12]]
+     *       \_Limit[1000[INTEGER],false,false]
+     *         \_Project[[a{r}#4, _fork{r}#5]]
+     *           \_Eval[[fork1[KEYWORD] AS _fork#5]]
+     *             \_Filter[true[BOOLEAN]]
+     *               \_Row[[TOLONG(12[INTEGER]) AS a#4]]
+     * }</pre>
+ */ + public void testForkWithRowCoalesceAndDrop() { + var plan = analyzeStatement(setUnmappedNullify(""" + ROW a = 12::long + | fork (where true) + | eval x = Coalesce(a, 5) + | drop a + """)); + + // Limit[1000] + var limit = as(plan, Limit.class); + assertThat(limit.limit().fold(FoldContext.small()), is(1000)); + + // DROP a → Project[[_fork, x]] (a is excluded) + var project = as(limit.child(), Project.class); + assertThat(Expressions.names(project.projections()), is(List.of("_fork", "x"))); + assertThat(Expressions.names(project.projections()), not(hasItems("a"))); + + // EVAL x = COALESCE(a, TOLONG(5)) + var evalX = as(project.child(), Eval.class); + assertThat(evalX.fields(), hasSize(1)); + var xAlias = as(evalX.fields().getFirst(), Alias.class); + assertThat(xAlias.name(), is("x")); + as(xAlias.child(), Coalesce.class); + + // Fork with one branch + var fork = as(evalX.child(), Fork.class); + assertThat(fork.children(), hasSize(1)); + assertThat(Expressions.names(fork.output()), hasItems("a", "_fork")); + + // Branch 0: Limit → Project[a, _fork] → Eval[_fork] → Filter[true] → Row[a = TOLONG(12)] + var b0Limit = as(fork.children().getFirst(), Limit.class); + + var b0Project = as(b0Limit.child(), Project.class); + assertThat(Expressions.names(b0Project.projections()), hasItems("a", "_fork")); + + var b0EvalFork = as(b0Project.child(), Eval.class); + assertThat(b0EvalFork.fields().getFirst().name(), is("_fork")); + + var b0Filter = as(b0EvalFork.child(), Filter.class); + assertThat(as(b0Filter.condition(), Literal.class).value(), is(true)); + + var row = as(b0Filter.child(), Row.class); + assertThat(row.fields(), hasSize(1)); + assertThat(row.fields().getFirst().name(), is("a")); + } + private void verificationFailure(String statement, String expectedFailure) { var e = expectThrows(VerificationException.class, () -> analyzeStatement(statement)); assertThat(e.getMessage(), containsString(expectedFailure));