Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/143030.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 143030
summary: "ESQL: Fix incorrectly optimized fork with nullify unmapped_fields"
area: ES|QL
type: bug
issues:
- 142762
Original file line number Diff line number Diff line change
Expand Up @@ -678,3 +678,88 @@ FROM languages
host.entity.id:null | entity.id:keyword
null | foo
;


forkWithRowAndUnmappedKeep
required_capability: fork_v9
required_capability: fix_fork_unmapped_nullify

SET unmapped_fields="nullify"\;
ROW a = 1
| FORK (where true)
| WHERE a == 1
| KEEP bar
;

bar:null
null
;


forkWithFromAndUnmappedCoalesce
required_capability: fork_v9
required_capability: fix_fork_unmapped_nullify

SET unmapped_fields="nullify"\;
FROM employees
| FORK (where foo != 84) (where true)
| WHERE _fork == "fork1"
| DROP _fork
| EVAL y = coalesce(bar, baz)
;

avg_worked_seconds:long | birth_date:date | emp_no:integer | first_name:keyword | gender:keyword | height:double | height.float:double | height.half_float:double | height.scaled_float:double | hire_date:date | is_rehired:boolean | job_positions:keyword | languages:integer | languages.byte:integer | languages.long:long | languages.short:integer | last_name:keyword | salary:integer | salary_change:double | salary_change.int:integer | salary_change.keyword:keyword | salary_change.long:long | still_hired:boolean | foo:null | bar:null | baz:null | y:null
;


forkWithUnmappedStatsEvalKeep
required_capability: fork_v9
required_capability: fix_fork_unmapped_nullify

SET unmapped_fields="nullify"\;
FROM alerts
| KEEP kibana.alert.risk_score
| FORK (WHERE true | MV_EXPAND kibana.alert.risk_score)
| STATS kibana.alert.risk_score = COUNT(*)
| EVAL x = LEAST(kibana.alert.risk_score, 52, 60)
| KEEP kibana.alert.risk_score
;

kibana.alert.risk_score:long
10
;


forkWithUnmappedStatsEvalKeepTwoBranches
required_capability: fork_v9
required_capability: fix_fork_unmapped_nullify
required_capability: sample_v3

SET unmapped_fields="nullify"\;
FROM alerts
| KEEP kibana.alert.risk_score
| FORK (WHERE true | MV_EXPAND kibana.alert.risk_score) (WHERE true | SAMPLE 0.5)
| STATS kibana.alert.risk_score = COUNT(*)
| EVAL x = LEAST(kibana.alert.risk_score, 52, 60)
| KEEP kibana.alert.risk_score
;

kibana.alert.risk_score:long
10..20
;


forkWithRowCoalesceAndDrop
required_capability: fork_v9
required_capability: fix_fork_unmapped_nullify

SET unmapped_fields="nullify"\;
ROW a = 12::long
| FORK (WHERE true)
| EVAL x = COALESCE(a, 5)
| DROP a
;

_fork:keyword | x:long
fork1 | 12
;
Original file line number Diff line number Diff line change
Expand Up @@ -1940,6 +1940,15 @@ public enum Cap {
*/
FIX_PASSTHROUGH_FIELD_CAPS_OBJECT_PARENT,

/**
* Fixes an analysis bug in {@code FORK} with {@code unmapped_fields="nullify"}.
* Existing attribute {@code NameId}s are now preserved so that references from upper plan nodes remain valid after
* sub-plans are updated; only genuinely new attributes get fresh {@code NameId}s.
* Caveat: keeping the same attributes can have unintended side effects when applying optimizations like constant folding.
* https://github.com/elastic/elasticsearch/issues/142762
*/
FIX_FORK_UNMAPPED_NULLIFY,

// Last capability should still have a comma for fewer merge conflicts when adding new ones :)
// This comment prevents the semicolon from being on the previous capability when Spotless formats the file.
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
import static org.elasticsearch.xpack.esql.capabilities.TranslationAware.translatable;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributes;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributesPreservingIds;
import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME;
Expand Down Expand Up @@ -971,7 +971,7 @@ private LogicalPlan resolveFork(Fork fork) {
return fork;
}

return fork.replaceSubPlansAndOutput(newSubPlans, toReferenceAttributes(outputUnion));
return fork.replaceSubPlansAndOutput(newSubPlans, toReferenceAttributesPreservingIds(outputUnion, fork.output()));
}

private LogicalPlan resolveRerank(Rerank rerank, List<Attribute> childrenOutput) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import static java.util.Collections.emptyList;
Expand All @@ -32,19 +34,27 @@ public static List<Attribute> asAttributes(List<? extends NamedExpression> named
}

/**
* @return a list of {@link ReferenceAttribute}s corresponding to the given named expressions.
* <p>
* The returned ReferenceAttributes will have new {@link NameId}s, also in the case the input contains ReferenceAttributes.
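* Converts named expressions to {@link ReferenceAttribute}s, preserving {@link NameId}s for attributes whose name
* matches one in {@code existingOutput}. Genuinely new attributes get fresh NameIds.
* If several attributes in {@code existingOutput} share a name, the id of the last one in the list is used.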
*/
public static List<Attribute> toReferenceAttributes(List<? extends NamedExpression> named) {
public static List<Attribute> toReferenceAttributesPreservingIds(
List<? extends NamedExpression> named,
List<Attribute> existingOutput
) {
if (named.isEmpty()) {
return emptyList();
}
Map<String, Attribute> existingByName = HashMap.newHashMap(existingOutput.size());
for (Attribute attr : existingOutput) {
existingByName.put(attr.name(), attr);
}
List<Attribute> list = new ArrayList<>(named.size());
for (NamedExpression exp : named) {
Attribute existing = existingByName.get(exp.name());
NameId id = existing != null ? existing.id() : new NameId();
ReferenceAttribute refAttr = exp instanceof ReferenceAttribute ra
? (ReferenceAttribute) ra.withId(new NameId())
: new ReferenceAttribute(exp.source(), null, exp.name(), exp.dataType(), exp.nullable(), null, exp.synthetic());
? (ReferenceAttribute) ra.withId(id)
: new ReferenceAttribute(exp.source(), null, exp.name(), exp.dataType(), exp.nullable(), id, exp.synthetic());
list.add(refAttr);
}
return list;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.esql.analysis.Analyzer.NO_FIELDS;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributes;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributesPreservingIds;

/**
* A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g.
Expand Down Expand Up @@ -105,13 +105,11 @@ public Fork replaceSubPlansAndOutput(List<LogicalPlan> subPlans, List<Attribute>
}

public Fork refreshOutput() {
// We don't want to keep the same attributes that are outputted by the FORK branches.
// Keeping the same attributes can have unintended side effects when applying optimizations like constant folding.
return new Fork(source(), children(), refreshedOutput());
}

protected List<Attribute> refreshedOutput() {
return toReferenceAttributes(outputUnion(children()));
return toReferenceAttributesPreservingIds(outputUnion(children()), this.output());
}

@Override
Expand Down
Loading
Loading