-
Notifications
You must be signed in to change notification settings - Fork 25.9k
ESQL: Fix injected attributes's IDs in UnionAll branches #141262
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c435d5d
fd346a1
6453bfc
c213602
0aae586
6b19262
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 141262 | ||
| summary: Fix injected attributes's IDs in `UnionAll` branches | ||
| area: ES|QL | ||
| type: bug | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -181,6 +181,7 @@ | |
| import static java.util.Collections.singletonList; | ||
| import static org.elasticsearch.xpack.core.enrich.EnrichPolicy.GEO_MATCH_TYPE; | ||
| import static org.elasticsearch.xpack.esql.capabilities.TranslationAware.translatable; | ||
| import static org.elasticsearch.xpack.esql.core.expression.Expressions.toReferenceAttributes; | ||
| import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE; | ||
| import static org.elasticsearch.xpack.esql.core.type.DataType.BOOLEAN; | ||
| import static org.elasticsearch.xpack.esql.core.type.DataType.DATETIME; | ||
|
|
@@ -1025,17 +1026,7 @@ private LogicalPlan resolveFork(Fork fork) { | |
| return fork; | ||
| } | ||
|
|
||
| List<Attribute> newOutput = new ArrayList<>(); | ||
|
|
||
| // We don't want to keep the same attributes that are outputted by the FORK branches. | ||
| // Keeping the same attributes can have unintended side effects when applying optimizations like constant folding. | ||
| for (Attribute attr : outputUnion) { | ||
| newOutput.add( | ||
| new ReferenceAttribute(attr.source(), null, attr.name(), attr.dataType(), Nullability.FALSE, null, attr.synthetic()) | ||
| ); | ||
| } | ||
|
|
||
| return fork.replaceSubPlansAndOutput(newSubPlans, newOutput); | ||
| return fork.replaceSubPlansAndOutput(newSubPlans, toReferenceAttributes(outputUnion)); | ||
| } | ||
|
|
||
| private LogicalPlan resolveRerank(Rerank rerank, List<Attribute> childrenOutput, AnalyzerContext context) { | ||
|
|
@@ -2741,16 +2732,18 @@ private static LogicalPlan maybePushDownConvertFunctions( | |
| List<Attribute> newChildOutput = new ArrayList<>(childOutput.size()); | ||
| for (Attribute oldAttr : childOutput) { | ||
| newChildOutput.add(oldAttr); | ||
| if (oldOutputToConvertFunctions.containsKey(oldAttr.name())) { | ||
| Set<AbstractConvertFunction> converts = oldOutputToConvertFunctions.get(oldAttr.name()); | ||
| Set<AbstractConvertFunction> converts = oldOutputToConvertFunctions.get(oldAttr.name()); | ||
| if (converts != null) { | ||
| // create a new alias for each conversion function and add it to the new aliases list | ||
| for (AbstractConvertFunction convert : converts) { | ||
| // create a new alias for the conversion function | ||
| String newAliasName = Attribute.rawTemporaryName(oldAttr.name(), "converted_to", convert.dataType().typeName()); | ||
| Alias newAlias = new Alias( | ||
| oldAttr.source(), | ||
| newAliasName, // oldAttrName$$converted_to$$targetType | ||
| convert.replaceChildren(Collections.singletonList(oldAttr)) | ||
| convert.replaceChildren(Collections.singletonList(oldAttr)), | ||
| null, // generate a new id | ||
| true // this'll be used to Project the synthetic attributes out when finishing analysis | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| ); | ||
| newAliases.add(newAlias); | ||
| newChildOutput.add(newAlias.toAttribute()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
| import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; | ||
| import org.elasticsearch.xpack.esql.core.expression.Literal; | ||
| import org.elasticsearch.xpack.esql.core.expression.NameId; | ||
| import org.elasticsearch.xpack.esql.core.expression.NamedExpression; | ||
| import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; | ||
| import org.elasticsearch.xpack.esql.core.expression.UnresolvedPattern; | ||
| import org.elasticsearch.xpack.esql.core.expression.UnresolvedTimestamp; | ||
|
|
@@ -28,7 +29,6 @@ | |
| import org.elasticsearch.xpack.esql.plan.logical.Eval; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Fork; | ||
| import org.elasticsearch.xpack.esql.plan.logical.LeafPlan; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Limit; | ||
| import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Project; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Row; | ||
|
|
@@ -44,6 +44,7 @@ | |
|
|
||
| import static org.elasticsearch.xpack.esql.analysis.Analyzer.ResolveRefs.insistKeyword; | ||
| import static org.elasticsearch.xpack.esql.core.util.CollectionUtils.combine; | ||
| import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes; | ||
|
|
||
| /** | ||
| * The rule handles fields that don't show up in the index mapping, but are used within the query. These fields can either be missing | ||
|
|
@@ -81,8 +82,9 @@ private static LogicalPlan resolve(LogicalPlan plan, boolean load) { | |
| if (unresolved.isEmpty()) { | ||
| return plan; | ||
| } | ||
| var unresolvedLinkedSet = unresolvedLinkedSet(unresolved); | ||
|
|
||
| var transformed = load ? load(plan, unresolved) : nullify(plan, unresolved); | ||
| var transformed = load ? load(plan, unresolvedLinkedSet) : nullify(plan, unresolvedLinkedSet); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the unresolveds being in a linked (thus order-preserving) set is important, should the signature of |
||
|
|
||
| return transformed.equals(plan) ? plan : refreshPlan(transformed, unresolved); | ||
| } | ||
|
|
@@ -91,21 +93,17 @@ private static LogicalPlan resolve(LogicalPlan plan, boolean load) { | |
| * The method introduces {@code EVAL missing_field = NULL}-equivalent into the plan, on top of the source, for every attribute in | ||
| * {@code unresolved}. It also "patches" the introduced attributes through the plan, where needed (like through Fork/UntionAll). | ||
| */ | ||
| private static LogicalPlan nullify(LogicalPlan plan, List<UnresolvedAttribute> unresolved) { | ||
| var nullAliases = nullAliases(unresolved); | ||
|
|
||
| private static LogicalPlan nullify(LogicalPlan plan, Set<UnresolvedAttribute> unresolved) { | ||
| // insert an Eval on top of every LeafPlan, if there's a UnaryPlan atop it | ||
| var transformed = plan.transformUp( | ||
| n -> n instanceof UnaryPlan unary && unary.child() instanceof LeafPlan, | ||
| p -> evalUnresolvedUnary((UnaryPlan) p, nullAliases) | ||
| p -> evalUnresolvedAtopUnary((UnaryPlan) p, nullAliases(unresolved)) | ||
| ); | ||
| // insert an Eval on top of those LeafPlan that are children of n-ary plans (could happen with UnionAll) | ||
| transformed = transformed.transformUp( | ||
| return transformed.transformUp( | ||
| n -> n instanceof UnaryPlan == false && n instanceof LeafPlan == false, | ||
| nAry -> evalUnresolvedNary(nAry, nullAliases) | ||
| nAry -> evalUnresolvedAtopNary(nAry, nullAliases(unresolved)) | ||
| ); | ||
|
|
||
| return transformed.transformUp(Fork.class, f -> patchFork(f, Expressions.asAttributes(nullAliases))); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -115,95 +113,73 @@ private static LogicalPlan nullify(LogicalPlan plan, List<UnresolvedAttribute> u | |
| * <p> | ||
| * It also "patches" the introduced attributes through the plan, where needed (like through Fork/UntionAll). | ||
| */ | ||
| private static LogicalPlan load(LogicalPlan plan, List<UnresolvedAttribute> unresolved) { | ||
| private static LogicalPlan load(LogicalPlan plan, Set<UnresolvedAttribute> unresolved) { | ||
| // TODO: this will need to be revisited for non-lookup joining or scenarios where we won't want extraction from specific sources | ||
| var transformed = plan.transformUp(EsRelation.class, esr -> { | ||
| return plan.transformUp(EsRelation.class, esr -> { | ||
| if (esr.indexMode() == IndexMode.LOOKUP) { | ||
| return esr; | ||
| } | ||
| List<FieldAttribute> fieldsToLoad = fieldsToLoad(unresolved, esr.outputSet().names()); | ||
| List<FieldAttribute> fieldsToLoad = fieldsToLoad(unresolved, Expressions.names(esr.output())); | ||
| // there shouldn't be any duplicates, we can just merge the two lists | ||
| return fieldsToLoad.isEmpty() ? esr : esr.withAttributes(combine(esr.output(), fieldsToLoad)); | ||
| }); | ||
|
|
||
| return transformed.transformUp(Fork.class, f -> patchFork(f, Expressions.asAttributes(fieldsToLoad(unresolved, Set.of())))); | ||
| } | ||
|
|
||
| private static List<FieldAttribute> fieldsToLoad(List<UnresolvedAttribute> unresolved, Set<String> exclude) { | ||
| private static List<FieldAttribute> fieldsToLoad(Set<UnresolvedAttribute> unresolved, List<String> exclude) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't this a mistake? I would have expected the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a Set because the initial collection of UnresolvedAttributes is dedup'd -- this is what What we want here is to exclude those attributes produces by the EsRelation itself into which we would then later inject/ Not sure if it's worth instantiating new collection types to wrap the existing ones. |
||
| List<FieldAttribute> insisted = new ArrayList<>(unresolved.size()); | ||
| Set<String> names = new LinkedHashSet<>(unresolved.size()); | ||
| for (var ua : unresolved) { | ||
| // some plans may reference the same UA multiple times (Aggregate groupings in aggregates, Eval) | ||
| if (names.contains(ua.name()) == false && exclude.contains(ua.name()) == false) { | ||
| if (exclude.contains(ua.name()) == false) { | ||
| insisted.add(insistKeyword(ua)); | ||
| names.add(ua.name()); | ||
| } | ||
| } | ||
| return insisted; | ||
| } | ||
|
|
||
| // TODO: would an alternative to this be to drop the current Fork and have ResolveRefs#resolveFork re-resolve it. We might need | ||
| // some plan delimiters/markers to make it unequivocal which nodes belong to "make Fork work" - like (Limit-Project[-Eval])s - and | ||
| // which don't. | ||
| private static Fork patchFork(Fork fork, List<Attribute> aliasAttributes) { | ||
| // if no child outputs the attribute, don't patch it through at all. | ||
| aliasAttributes.removeIf(a -> fork.children().stream().anyMatch(f -> descendantOutputsAttribute(f, a)) == false); | ||
| if (aliasAttributes.isEmpty()) { | ||
| return fork; | ||
| } | ||
|
|
||
| // TODO: would an alternative to this be to have ResolveRefs#resolveFork re-resolve the Fork? | ||
| // We might need some plan delimiters/markers to make it unequivocal which nodes belong to | ||
| // "make Fork work" - like ([Limit -] Project [- Eval])s - and which don't. | ||
| // PruneColumns does the same dance. There's some fragility w.r.t. assuming there to be a top Project and danger of the outputs not | ||
| // being aligned after applying the changes. | ||
| /** | ||
| * Update the Fork's top Projects in the subplans, and correspondingly, its output, to account for newly introduced aliases. | ||
| */ | ||
| private static Fork patchFork(Fork fork) { | ||
| List<LogicalPlan> newChildren = new ArrayList<>(fork.children().size()); | ||
| boolean childrenChanged = false; | ||
| for (var child : fork.children()) { | ||
| Holder<Boolean> patched = new Holder<>(false); | ||
| child = child.transformDown( | ||
| var transformed = child.transformDown( | ||
| // TODO add a suitable forEachDownMayReturnEarly equivalent | ||
| n -> patched.get() == false && n instanceof Project, // process top Project only (Fork-injected) | ||
| n -> { | ||
| patched.set(true); | ||
| return patchForkProject((Project) n, aliasAttributes); | ||
| return patchForkProject((Project) n); | ||
| } | ||
| ); | ||
| if (patched.get() == false) { // assert | ||
| throw new EsqlIllegalArgumentException("Fork child misses a top projection"); | ||
| } | ||
| newChildren.add(child); | ||
| } | ||
|
|
||
| return fork.replaceSubPlansAndOutput(newChildren, combine(fork.output(), aliasAttributes)); | ||
| } | ||
|
|
||
| private static Project patchForkProject(Project project, List<Attribute> aliasAttributes) { | ||
| // refresh the IDs for each UnionAll child (needed for correct resolution of convert functions; see collectConvertFunctions()) | ||
| aliasAttributes = aliasAttributes.stream().map(a -> a.withId(new NameId())).toList(); | ||
|
|
||
| project = project.withProjections(combine(project.projections(), aliasAttributes)); | ||
|
|
||
| // If Project's child doesn't output the attribute, introduce a null-Eval'ing. This is similar to what Fork-resolution does. | ||
| List<Alias> nullAliases = new ArrayList<>(aliasAttributes.size()); | ||
| for (var attribute : aliasAttributes) { | ||
| if (descendantOutputsAttribute(project, attribute) == false) { | ||
| nullAliases.add(nullAlias(attribute)); | ||
| } | ||
| childrenChanged |= transformed != child; | ||
| newChildren.add(transformed); | ||
| } | ||
| return nullAliases.isEmpty() ? project : project.replaceChild(new Eval(project.source(), project.child(), nullAliases)); | ||
| return childrenChanged ? fork.withSubPlans(newChildren) : fork; | ||
| } | ||
|
|
||
| /** | ||
| * Fork injects a {@code Limit - Project (- Eval)} top structure into its subtrees. Skip the top Limit (if present) and Project in | ||
| * the {@code plan} and look at the output of the remaining fragment. | ||
| * @return {@code true} if this fragment's output contains the {@code attribute}. | ||
| * Add any missing attributes that are found in the child's output but not in the Project's output. These have been injected before | ||
| * by the evalUnresolvedAtopXXX methods and need to be "let through" the Project. | ||
| */ | ||
| private static boolean descendantOutputsAttribute(LogicalPlan plan, Attribute attribute) { | ||
| plan = plan instanceof Limit limit ? limit.child() : plan; | ||
| if (plan instanceof Project project) { | ||
| return project.child().outputSet().names().contains(attribute.name()); | ||
| private static Project patchForkProject(Project project) { | ||
| var projectOutput = project.output(); | ||
| var childOutput = project.child().output(); | ||
| if (projectOutput.equals(childOutput) == false) { | ||
|
Comment on lines
+170
to
+172
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Since these changes aren't functionally impacting, I'd apply them to a follow-up PR (unless other changes will be required), if ok with you?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's ok. Thanks. |
||
| List<Attribute> delta = new ArrayList<>(childOutput); | ||
| delta.removeAll(projectOutput); | ||
| project = project.withProjections(mergeOutputAttributes(delta, projectOutput)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we please avoid this pattern of renaming the input parameter and then returning it outside the block? Just use an early exit above.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a pre-existing pattern. Some folks find it easier to read code with fewer returns. (Myself, I don't necessarily, but I don't mind this style either).
...reason being: if the control hasn't visited the block, the input is simply returned with no change.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I personally find it unfathomable that it's harder to read code with more early exits than it is to read code with more changes if the input variable, but to each their own I guess 🥲. |
||
| } | ||
| throw new EsqlIllegalArgumentException("unexpected node type [{}]", plan); // assert | ||
| return project; | ||
| } | ||
|
|
||
| private static LogicalPlan refreshPlan(LogicalPlan plan, List<UnresolvedAttribute> unresolved) { | ||
| var refreshed = refreshUnresolved(plan, unresolved); | ||
| return refreshChildren(refreshed); | ||
| return refreshed.transformDown(Fork.class, ResolveUnmapped::patchFork); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -222,25 +198,10 @@ private static LogicalPlan refreshUnresolved(LogicalPlan plan, List<UnresolvedAt | |
| }); | ||
| } | ||
|
|
||
| /** | ||
| * @return A plan having all nodes recreated (no properties changed, otherwise). This is needed to clear internal, lazy-eval'd and | ||
| * cached state, such as the output. The rule inserts new attributes in the plan, so the output of all the nodes downstream these | ||
| * insertions need be recomputed. | ||
| */ | ||
| private static LogicalPlan refreshChildren(LogicalPlan plan) { | ||
| var planChildren = plan.children(); | ||
| if (planChildren.isEmpty()) { | ||
| return plan; | ||
| } | ||
| List<LogicalPlan> newChildren = new ArrayList<>(planChildren.size()); | ||
| planChildren.forEach(child -> newChildren.add(refreshChildren(child))); | ||
| return plan.replaceChildren(newChildren); | ||
| } | ||
|
|
||
| /** | ||
| * Inserts an Eval atop each child of the given {@code nAry}, if the child is a LeafPlan. | ||
| */ | ||
| private static LogicalPlan evalUnresolvedNary(LogicalPlan nAry, List<Alias> nullAliases) { | ||
| private static LogicalPlan evalUnresolvedAtopNary(LogicalPlan nAry, List<Alias> nullAliases) { | ||
| List<LogicalPlan> newChildren = new ArrayList<>(nAry.children().size()); | ||
| boolean changed = false; | ||
| for (var child : nAry.children()) { | ||
|
|
@@ -257,7 +218,7 @@ private static LogicalPlan evalUnresolvedNary(LogicalPlan nAry, List<Alias> null | |
| /** | ||
| * Inserts an Eval atop the given {@code unaryAtopSource}, if this isn't an Eval already. Otherwise it merges the nullAliases into it. | ||
| */ | ||
| private static LogicalPlan evalUnresolvedUnary(UnaryPlan unaryAtopSource, List<Alias> nullAliases) { | ||
| private static LogicalPlan evalUnresolvedAtopUnary(UnaryPlan unaryAtopSource, List<Alias> nullAliases) { | ||
| assertSourceType(unaryAtopSource.child()); | ||
| if (unaryAtopSource instanceof Eval eval && eval.resolved()) { // if this Eval isn't resolved, insert a new (resolved) one | ||
| List<Alias> pre = new ArrayList<>(nullAliases.size()); | ||
|
|
@@ -291,16 +252,23 @@ private static void assertSourceType(LogicalPlan source) { | |
| } | ||
| } | ||
|
|
||
| private static List<Alias> nullAliases(List<UnresolvedAttribute> unresolved) { | ||
| Map<String, Alias> aliasesMap = new LinkedHashMap<>(unresolved.size()); | ||
| unresolved.forEach(u -> aliasesMap.computeIfAbsent(u.name(), k -> nullAlias(u))); | ||
| return new ArrayList<>(aliasesMap.values()); | ||
| private static List<Alias> nullAliases(Set<UnresolvedAttribute> unresolved) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here: If the output list is supposed to be stable, I think we should explicitly require a |
||
| List<Alias> aliases = new ArrayList<>(unresolved.size()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I understand how would |
||
| unresolved.forEach(u -> aliases.add(nullAlias(u))); | ||
| return aliases; | ||
| } | ||
|
|
||
| private static Alias nullAlias(Attribute attribute) { | ||
| private static Alias nullAlias(NamedExpression attribute) { | ||
| return new Alias(attribute.source(), attribute.name(), NULLIFIED); | ||
| } | ||
|
|
||
| // Some plans may reference the same UA multiple times (Aggregate groupings in aggregates, Eval): dedupe | ||
| private static LinkedHashSet<UnresolvedAttribute> unresolvedLinkedSet(List<UnresolvedAttribute> unresolved) { | ||
| Map<String, UnresolvedAttribute> aliasesMap = new LinkedHashMap<>(unresolved.size()); | ||
| unresolved.forEach(u -> aliasesMap.putIfAbsent(u.name(), u)); | ||
| return new LinkedHashSet<>(aliasesMap.values()); | ||
| } | ||
|
|
||
| /** | ||
| * @return all the {@link UnresolvedAttribute}s in the given node / {@code plan}, but excluding the {@link UnresolvedPattern} and | ||
| * {@link UnresolvedTimestamp} subtypes. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not the
containsand.getapproach? I see the code that reaches this part is fairly safe to assume that there won't be anynullsets returned for a key, but we are not sure how this code will evolve in the future.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't strictly related, but I thought I might need an update of this code and spotted the pattern.
There's nothing wrong from the functional PoV, but the code, as it was, checks if the key is in the map, then does it again, but fetching the corresponding value. The code as is in the proposed change only does the latter. If the result/value is
null, the key isn't in there. (Well, I guess it could [have] be[en] anullvalue, to be exact, but that would have resulted in a NPE by now(?))