Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/143030.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 143030
summary: "ESQL: Fix incorrectly optimized fork with nullify unmapped_fields"
area: ES|QL
type: bug
issues:
- 142762
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,6 @@ tests:
- class: org.elasticsearch.xpack.sql.qa.security.CliApiKeyIT
method: testCliConnectionWithInvalidApiKey
issue: https://github.com/elastic/elasticsearch/issues/143646
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
method: test {csv-spec:unmapped-nullify.TsWithAggsByMissing}
issue: https://github.com/elastic/elasticsearch/issues/142762
- class: org.elasticsearch.compute.lucene.query.LuceneTopNSourceOperatorScoringTests
method: testAccumulateSearchLoad
issue: https://github.com/elastic/elasticsearch/issues/143750
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ public abstract class GenerativeRestTest extends ESRestTestCase implements Query
"found value \\[.*\\] type \\[unsupported\\]", // https://github.com/elastic/elasticsearch/issues/142761
"change point value \\[.*\\] must be numeric", // https://github.com/elastic/elasticsearch/issues/142858
"illegal query_string option \\[boost\\]", // https://github.com/elastic/elasticsearch/issues/142758
// https://github.com/elastic/elasticsearch/issues/142543
"Column \\[.*\\] has conflicting data types in FORK branches: \\[NULL\\] and \\[.*\\]",
"Column \\[.*\\] has conflicting data types in FORK branches: \\[.*\\] and \\[NULL\\]",
"Field \\[.*\\] of type \\[.*\\] does not support match.* queries",
"JOIN left field \\[.*\\] of type \\[NULL\\] is incompatible with right", // https://github.com/elastic/elasticsearch/issues/141827
// https://github.com/elastic/elasticsearch/issues/141827
Expand Down Expand Up @@ -312,7 +309,6 @@ private record FailureContext(
},
ctx -> isUnmappedFieldError(ctx.errorMessage, ctx.query),
ctx -> isScalarTypeMismatchError(ctx.errorMessage),
ctx -> isForkOptimizationBugWithUnmappedFields(ctx.errorMessage, ctx.query),
ctx -> isFieldFullTextError(ctx.errorMessage, ctx.query, ctx.previousCommands, ctx.currentSchema),
ctx -> isFullTextAfterSampleBug(ctx.errorMessage, ctx.query),
ctx -> isFullTextAfterWhereBugs(ctx.errorMessage),
Expand Down Expand Up @@ -447,20 +443,6 @@ private static boolean isScalarTypeMismatchError(String errorMessage) {
return SCALAR_TYPE_MISMATCH_PATTERN.matcher(errorWithoutLineBreaks).matches();
}

private static final Pattern FORK_OPTIMIZED_INCORRECTLY_PATTERN = Pattern.compile(
    ".*Plan \\[.*\\] optimized incorrectly due to missing references \\[_fork.*",
    Pattern.DOTALL
);

/**
 * Detects the known optimizer bug where the {@code _fork} reference goes missing during plan
 * optimization when the query starts with {@code SET unmapped_fields="nullify"}.
 * https://github.com/elastic/elasticsearch/issues/142762
 */
static boolean isForkOptimizationBugWithUnmappedFields(String errorMessage, String query) {
    // Normalize first (same as the original evaluation order), then require both the
    // query prefix and the characteristic "optimized incorrectly" error shape.
    String normalized = normalizeErrorMessage(errorMessage);
    boolean usesNullifyPrefix = query.startsWith(SET_UNMAPPED_FIELDS_PREFIX);
    return usesNullifyPrefix && FORK_OPTIMIZED_INCORRECTLY_PATTERN.matcher(normalized).matches();
}

private static final Pattern NOT_A_FIELD_FROM_INDEX_PATTERN = Pattern.compile(
".*cannot operate on \\[([^]]+)\\], which is not a field from an index mapping.*",
Pattern.DOTALL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,3 +698,88 @@ FROM languages
host.entity.id:null | entity.id:keyword
null | foo
;


forkWithRowAndUnmappedKeep
required_capability: fork_v9
required_capability: fix_fork_unmapped_nullify

SET unmapped_fields="nullify"\;
ROW a = 1
| FORK (where true)
| WHERE a == 1
| KEEP bar
;

bar:null
null
;


forkWithFromAndUnmappedCoalesce
required_capability: fork_v9
required_capability: fix_fork_unmapped_nullify

SET unmapped_fields="nullify"\;
FROM employees
| FORK (where foo != 84) (where true)
| WHERE _fork == "fork1"
| DROP _fork
| EVAL y = coalesce(bar, baz)
;

avg_worked_seconds:long | birth_date:date | emp_no:integer | first_name:keyword | gender:keyword | height:double | height.float:double | height.half_float:double | height.scaled_float:double | hire_date:date | is_rehired:boolean | job_positions:keyword | languages:integer | languages.byte:integer | languages.long:long | languages.short:integer | last_name:keyword | salary:integer | salary_change:double | salary_change.int:integer | salary_change.keyword:keyword | salary_change.long:long | still_hired:boolean | foo:null | bar:null | baz:null | y:null
;


forkWithUnmappedStatsEvalKeep
required_capability: fork_v9
required_capability: fix_fork_unmapped_nullify

SET unmapped_fields="nullify"\;
FROM alerts
| KEEP kibana.alert.risk_score
| FORK (WHERE true | MV_EXPAND kibana.alert.risk_score)
| STATS kibana.alert.risk_score = COUNT(*)
| EVAL x = LEAST(kibana.alert.risk_score, 52, 60)
| KEEP kibana.alert.risk_score
;

kibana.alert.risk_score:long
10
;


forkWithUnmappedStatsEvalKeepTwoBranches
required_capability: fork_v9
required_capability: fix_fork_unmapped_nullify
required_capability: sample_v3

SET unmapped_fields="nullify"\;
FROM alerts
| KEEP kibana.alert.risk_score
| FORK (WHERE true | MV_EXPAND kibana.alert.risk_score) (WHERE true | SAMPLE 0.5)
| STATS kibana.alert.risk_score = COUNT(*)
| EVAL x = LEAST(kibana.alert.risk_score, 52, 60)
| KEEP kibana.alert.risk_score
;

kibana.alert.risk_score:long
10..20
;


forkWithRowCoalesceAndDrop
required_capability: fork_v9
required_capability: fix_fork_unmapped_nullify

SET unmapped_fields="nullify"\;
ROW a = 12::long
| FORK (WHERE true)
| EVAL x = COALESCE(a, 5)
| DROP a
;

_fork:keyword | x:long
fork1 | 12
;
Original file line number Diff line number Diff line change
Expand Up @@ -2273,6 +2273,15 @@ public enum Cap {
*/
TOP_SNIPPETS_FOLDABLE_QUERY_CHECK,

/**
* Fixes an analysis bug in {@code FORK} with {@code unmapped_fields="nullify"}.
* Preserve existing attribute {@code NameId}s so that references from upper plan nodes remain valid after
* sub-plans are updated. Only genuinely new attributes get fresh NameIds.
 * Note: the attribute instances themselves are still replaced (reusing the exact same attributes can have
 * unintended side effects when applying optimizations like constant folding); only their NameIds are preserved.
* https://github.com/elastic/elasticsearch/issues/142762
*/
FIX_FORK_UNMAPPED_NULLIFY,

// Last capability should still have a comma for fewer merge conflicts when adding new ones :)
// This comment prevents the semicolon from being on the previous capability when Spotless formats the file.
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@
import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE;
import static org.elasticsearch.xpack.esql.capabilities.TranslationAware.translatable;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributes;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributesPreservingIds;
import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN;
import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME;
Expand Down Expand Up @@ -1096,7 +1096,7 @@ && subqueryReferencingIndexWithEmptyMapping(fork, logicalPlan, forkColumns) == f
return fork;
}

return fork.replaceSubPlansAndOutput(newSubPlans, toReferenceAttributes(outputUnion));
return fork.replaceSubPlansAndOutput(newSubPlans, toReferenceAttributesPreservingIds(outputUnion, fork.output()));
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ private static Alias nullAlias(NamedExpression attribute) {
* excluding the {@link UnresolvedPattern} and {@link UnresolvedTimestamp} subtypes.
*/
private static LinkedHashMap<String, List<UnresolvedAttribute>> collectUnresolved(LogicalPlan plan) {
Set<String> childOutputNames = new java.util.HashSet<>();
Set<String> childOutputNames = new HashSet<>();
for (LogicalPlan child : plan.children()) {
for (Attribute attr : child.output()) {
childOutputNames.add(attr.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import static java.util.Collections.emptyList;
Expand All @@ -32,19 +34,27 @@ public static List<Attribute> asAttributes(List<? extends NamedExpression> named
}

/**
* @return a list of {@link ReferenceAttribute}s corresponding to the given named expressions.
* <p>
* The returned ReferenceAttributes will have new {@link NameId}s, also in the case the input contains ReferenceAttributes.
* Converts named expressions to {@link ReferenceAttribute}s, preserving {@link NameId}s for attributes whose name
* matches one in {@code existingOutput}. Genuinely new attributes get fresh NameIds.
*/
public static List<Attribute> toReferenceAttributes(List<? extends NamedExpression> named) {
public static List<Attribute> toReferenceAttributesPreservingIds(
List<? extends NamedExpression> named,
List<Attribute> existingOutput
) {
if (named.isEmpty()) {
return emptyList();
}
Map<String, Attribute> existingByName = HashMap.newHashMap(existingOutput.size());
for (Attribute attr : existingOutput) {
existingByName.put(attr.name(), attr);
}
List<Attribute> list = new ArrayList<>(named.size());
for (NamedExpression exp : named) {
Attribute existing = existingByName.get(exp.name());
NameId id = existing != null ? existing.id() : new NameId();
ReferenceAttribute refAttr = exp instanceof ReferenceAttribute ra
? (ReferenceAttribute) ra.withId(new NameId())
: new ReferenceAttribute(exp.source(), null, exp.name(), exp.dataType(), exp.nullable(), null, exp.synthetic());
? (ReferenceAttribute) ra.withId(id)
: new ReferenceAttribute(exp.source(), null, exp.name(), exp.dataType(), exp.nullable(), id, exp.synthetic());
list.add(refAttr);
}
return list;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.esql.analysis.Analyzer.NO_FIELDS;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributes;
import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributesPreservingIds;

/**
* A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g.
Expand Down Expand Up @@ -105,13 +105,11 @@ public Fork replaceSubPlansAndOutput(List<LogicalPlan> subPlans, List<Attribute>
}

public Fork refreshOutput() {
// We don't want to keep the same attributes that are outputted by the FORK branches.
// Keeping the same attributes can have unintended side effects when applying optimizations like constant folding.
Comment on lines -108 to -109
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be removed. The attribute IDs from within the branches are not kept / reused atop Fork.

return new Fork(source(), children(), refreshedOutput());
}

protected List<Attribute> refreshedOutput() {
return toReferenceAttributes(outputUnion(children()));
return toReferenceAttributesPreservingIds(outputUnion(children()), this.output());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this no longer refreshes the IDs. Meaning that some assumptions are either broken, or they were incorrect to have, or they're no longer valid (in the meantime).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bpintea can you expand on this, please? What use cases would this impact/break/change? (are we missing tests?)

}

@Override
Expand Down
Loading
Loading