-
Notifications
You must be signed in to change notification settings - Fork 25.7k
ESQL: Allow pruning columns added by InlineJoin #131204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
7867a30
64ecf69
d3d1d24
62ad977
13338c3
4f004e6
e824c30
458b94a
c458cc3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 131204 | ||
| summary: Allow pruning columns added by `InlineJoin` | ||
| area: ES|QL | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,6 +14,7 @@ | |
| import org.elasticsearch.xpack.esql.core.expression.Attribute; | ||
| import org.elasticsearch.xpack.esql.core.expression.AttributeSet; | ||
| import org.elasticsearch.xpack.esql.core.expression.Expressions; | ||
| import org.elasticsearch.xpack.esql.core.expression.FieldAttribute; | ||
| import org.elasticsearch.xpack.esql.core.expression.NamedExpression; | ||
| import org.elasticsearch.xpack.esql.core.util.Holder; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Aggregate; | ||
|
|
@@ -22,7 +23,11 @@ | |
| import org.elasticsearch.xpack.esql.plan.logical.Fork; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Limit; | ||
| import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Project; | ||
| import org.elasticsearch.xpack.esql.plan.logical.Sample; | ||
| import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin; | ||
| import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation; | ||
| import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier; | ||
| import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; | ||
| import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier; | ||
| import org.elasticsearch.xpack.esql.planner.PlannerUtils; | ||
|
|
@@ -38,11 +43,10 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> { | |
|
|
||
| @Override | ||
| public LogicalPlan apply(LogicalPlan plan) { | ||
| // track used references | ||
| var used = plan.outputSet().asBuilder(); | ||
| // track inlinestats' own aggregation output (right-hand side of the join) so that any other plan on the left-hand side of the | ||
| // inline join won't have its columns pruned due to the lack of "visibility" into the right hand side output/Attributes | ||
| var inlineJoinRightOutput = new ArrayList<Attribute>(); | ||
| return pruneColumns(plan, plan.outputSet().asBuilder(), false); | ||
| } | ||
|
|
||
| private static LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder used, boolean inlineJoin) { | ||
| Holder<Boolean> forkPresent = new Holder<>(false); | ||
|
|
||
| // while going top-to-bottom (upstream) | ||
|
|
@@ -54,8 +58,9 @@ public LogicalPlan apply(LogicalPlan plan) { | |
| // same index fields will have different name ids in the left and right hand sides - as in the extreme example | ||
| // `FROM lookup_idx | LOOKUP JOIN lookup_idx ON key_field`. | ||
|
|
||
| // skip nodes that simply pass the input through | ||
| if (p instanceof Limit) { | ||
| // TODO: revisit with every new command | ||
| // skip nodes that simply pass the input through and use no references | ||
| if (p instanceof Limit || p instanceof Sample) { | ||
| return p; | ||
| } | ||
|
|
||
|
|
@@ -67,77 +72,20 @@ public LogicalPlan apply(LogicalPlan plan) { | |
| return p; | ||
| } | ||
|
|
||
| // TODO: INLINESTATS unit testing for tracking this set | ||
| if (p instanceof InlineJoin ij) { | ||
| inlineJoinRightOutput.addAll(ij.right().outputSet()); | ||
| } | ||
|
|
||
| // remember used | ||
| boolean recheck; | ||
| var recheck = new Holder<Boolean>(); | ||
| // analyze the unused items against dedicated 'producer' nodes such as Eval and Aggregate | ||
| // perform a loop to retry checking if the current node is completely eliminated | ||
| do { | ||
| recheck = false; | ||
| if (p instanceof Aggregate aggregate) { | ||
| // TODO: INLINESTATS https://github.com/elastic/elasticsearch/pull/128917#discussion_r2175162099 | ||
| var remaining = removeUnused(aggregate.aggregates(), used, inlineJoinRightOutput); | ||
|
|
||
| if (remaining != null) { | ||
| if (remaining.isEmpty()) { | ||
| // We still need to have a plan that produces 1 row per group. | ||
| if (aggregate.groupings().isEmpty()) { | ||
| p = new LocalRelation( | ||
| aggregate.source(), | ||
| List.of(Expressions.attribute(aggregate.aggregates().getFirst())), | ||
| LocalSupplier.of( | ||
| new Block[] { BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, null, 1) } | ||
| ) | ||
| ); | ||
| } else { | ||
| // Aggs cannot produce pages with 0 columns, so retain one grouping. | ||
| Attribute attribute = Expressions.attribute(aggregate.groupings().getFirst()); | ||
| NamedExpression firstAggregate = aggregate.aggregates().getFirst(); | ||
| remaining = List.of( | ||
| new Alias(firstAggregate.source(), firstAggregate.name(), attribute, firstAggregate.id()) | ||
| ); | ||
| p = aggregate.with(aggregate.groupings(), remaining); | ||
| } | ||
| } else { | ||
| p = aggregate.with(aggregate.groupings(), remaining); | ||
| } | ||
| } | ||
| } else if (p instanceof InlineJoin ij) {// TODO: InlineStats - add unit tests for this IJ removal | ||
| var remaining = removeUnused(ij.right().output(), used, inlineJoinRightOutput); | ||
| if (remaining != null) { | ||
| if (remaining.isEmpty()) { | ||
| // remove the InlineJoin altogether | ||
| p = ij.left(); | ||
| recheck = true; | ||
| } | ||
| // TODO: InlineStats - prune ONLY the unused output columns from it? In other words, don't perform more aggs | ||
| // if they will not be used anyway | ||
| } | ||
| } else if (p instanceof Eval eval) { | ||
| var remaining = removeUnused(eval.fields(), used, inlineJoinRightOutput); | ||
| // no fields, no eval | ||
| if (remaining != null) { | ||
| if (remaining.isEmpty()) { | ||
| p = eval.child(); | ||
| recheck = true; | ||
| } else { | ||
| p = new Eval(eval.source(), eval.child(), remaining); | ||
| } | ||
| } | ||
| } else if (p instanceof EsRelation esr && esr.indexMode() == IndexMode.LOOKUP) { | ||
| // Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway. | ||
| // However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index, | ||
| // it works differently as we extract all fields (other than the join key) that the EsRelation has. | ||
| var remaining = removeUnused(esr.output(), used, inlineJoinRightOutput); | ||
| if (remaining != null) { | ||
| p = new EsRelation(esr.source(), esr.indexPattern(), esr.indexMode(), esr.indexNameWithModes(), remaining); | ||
| } | ||
| } | ||
| } while (recheck); | ||
| recheck.set(false); | ||
| p = switch (p) { | ||
| case Aggregate agg -> pruneColumns(agg, used, inlineJoin); | ||
| case InlineJoin inj -> pruneColumns(inj, used, recheck); | ||
| case Eval eval -> pruneColumns(eval, used, recheck); | ||
| case Project project -> inlineJoin ? pruneColumns(project, used) : p; | ||
| case EsRelation esr -> pruneColumns(esr, used); | ||
| default -> p; | ||
| }; | ||
| } while (recheck.get()); | ||
|
|
||
| used.addAll(p.references()); | ||
|
|
||
|
|
@@ -148,24 +96,143 @@ public LogicalPlan apply(LogicalPlan plan) { | |
| return pl; | ||
| } | ||
|
|
||
| private static LogicalPlan pruneColumns(Aggregate aggregate, AttributeSet.Builder used, boolean inlineJoin) { | ||
| LogicalPlan p = aggregate; | ||
|
|
||
| var remaining = pruneUnusedAndAddReferences(aggregate.aggregates(), used); | ||
|
|
||
| if (remaining != null) { | ||
|
||
| if (remaining.isEmpty()) { | ||
| if (inlineJoin) { | ||
| p = emptyLocalRelation(aggregate); | ||
| } else if (aggregate.groupings().isEmpty()) { | ||
| // We still need to have a plan that produces 1 row per group. | ||
| p = new LocalRelation( | ||
| aggregate.source(), | ||
| List.of(Expressions.attribute(aggregate.aggregates().getFirst())), | ||
| LocalSupplier.of(new Block[] { BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, null, 1) }) | ||
| ); | ||
| } else { | ||
| // Aggs cannot produce pages with 0 columns, so retain one grouping. | ||
| Attribute attribute = Expressions.attribute(aggregate.groupings().getFirst()); | ||
| NamedExpression firstAggregate = aggregate.aggregates().getFirst(); | ||
| remaining = List.of(new Alias(firstAggregate.source(), firstAggregate.name(), attribute, firstAggregate.id())); | ||
| p = aggregate.with(aggregate.groupings(), remaining); | ||
| } | ||
| } else { | ||
| if (inlineJoin && aggregate.groupings().containsAll(remaining)) { // not expecting high groups cardinality | ||
|
||
| // It's an INLINEJOIN and all remaining attributes are groupings, which are already part of the IJ output (from the | ||
| // left-hand side). | ||
| if (aggregate.child() instanceof StubRelation stub) { | ||
|
||
| var message = "Aggregate groups references [" | ||
| + remaining | ||
| + "] not in child's (StubRelation) output: [" | ||
| + stub.outputSet() | ||
| + "]"; | ||
| assert stub.outputSet().containsAll(Expressions.asAttributes(remaining)) : message; | ||
|
|
||
| p = emptyLocalRelation(aggregate); | ||
| } else { | ||
| // There are no aggregates to compute, just output the groupings; these are already in the IJ output, so only | ||
| // restrict the output to what remained. | ||
| p = new Project(aggregate.source(), aggregate.child(), remaining); | ||
| } | ||
| } else { // not an INLINEJOIN or there are actually aggregates to compute | ||
| p = aggregate.with(aggregate.groupings(), remaining); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return p; | ||
| } | ||
|
|
||
| private static LogicalPlan pruneColumns(InlineJoin ij, AttributeSet.Builder used, Holder<Boolean> recheck) { | ||
| LogicalPlan p = ij; | ||
|
|
||
| used.addAll(ij.references()); | ||
| var right = pruneColumns(ij.right(), used, true); | ||
| if (right.output().isEmpty()) { | ||
| p = ij.left(); | ||
| recheck.set(true); | ||
| } else if (right != ij.right()) { | ||
| // if the right side has been updated, replace it | ||
| p = ij.replaceRight(right); | ||
| } | ||
|
|
||
| return p; | ||
| } | ||
|
|
||
| private static LogicalPlan pruneColumns(Eval eval, AttributeSet.Builder used, Holder<Boolean> recheck) { | ||
| LogicalPlan p = eval; | ||
|
|
||
| var remaining = pruneUnusedAndAddReferences(eval.fields(), used); | ||
| // no fields, no eval | ||
| if (remaining != null) { | ||
| if (remaining.isEmpty()) { | ||
| p = eval.child(); | ||
| recheck.set(true); | ||
| } else { | ||
| p = new Eval(eval.source(), eval.child(), remaining); | ||
| } | ||
| } | ||
|
|
||
| return p; | ||
| } | ||
|
|
||
| private static LogicalPlan pruneColumns(Project project, AttributeSet.Builder used) { | ||
|
||
| LogicalPlan p = project; | ||
|
|
||
| var remaining = pruneUnusedAndAddReferences(project.projections(), used); | ||
| if (remaining != null) { | ||
| p = remaining.isEmpty() || remaining.stream().allMatch(FieldAttribute.class::isInstance) | ||
| ? emptyLocalRelation(project) | ||
| : new Project(project.source(), project.child(), remaining); | ||
| } else if (project.output().stream().allMatch(FieldAttribute.class::isInstance)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we interested in this aspect? (the fact that the projection is made of fields only)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The explanation is the one given in the previous comment -- I've added a code comment too. |
||
| p = emptyLocalRelation(project); | ||
| } | ||
|
|
||
| return p; | ||
| } | ||
|
|
||
| private static LogicalPlan pruneColumns(EsRelation esr, AttributeSet.Builder used) { | ||
| LogicalPlan p = esr; | ||
|
|
||
| if (esr.indexMode() == IndexMode.LOOKUP) { | ||
| // Normally, pruning EsRelation has no effect because InsertFieldExtraction only extracts the required fields, anyway. | ||
| // However, InsertFieldExtraction can't be currently used in LOOKUP JOIN right index, | ||
| // it works differently as we extract all fields (other than the join key) that the EsRelation has. | ||
| var remaining = pruneUnusedAndAddReferences(esr.output(), used); | ||
| if (remaining != null) { | ||
| p = new EsRelation(esr.source(), esr.indexPattern(), esr.indexMode(), esr.indexNameWithModes(), remaining); | ||
| } | ||
| } | ||
|
|
||
| return p; | ||
| } | ||
|
|
||
| private static LogicalPlan emptyLocalRelation(LogicalPlan plan) { | ||
| // create an empty local relation with no attributes | ||
| return new LocalRelation(plan.source(), List.of(), EmptyLocalSupplier.EMPTY); | ||
| } | ||
|
|
||
| /** | ||
| * Prunes attributes from the list not found in the given set. | ||
| * Returns null if no changed occurred. | ||
| * Prunes attributes from the `named` list that are not found in the given set (builder). | ||
| * Returns null if no pruning occurred. | ||
| * As a side effect, the references of the kept attributes are added to the input set (builder) -- irrespective of the return value. | ||
| */ | ||
| private static <N extends NamedExpression> List<N> removeUnused(List<N> named, AttributeSet.Builder used, List<Attribute> exceptions) { | ||
| private static <N extends NamedExpression> List<N> pruneUnusedAndAddReferences(List<N> named, AttributeSet.Builder used) { | ||
| var clone = new ArrayList<>(named); | ||
| var it = clone.listIterator(clone.size()); | ||
|
|
||
| // due to Eval, go in reverse | ||
| while (it.hasPrevious()) { | ||
| for (var it = clone.listIterator(clone.size()); it.hasPrevious();) { | ||
| N prev = it.previous(); | ||
| var attr = prev.toAttribute(); | ||
| if (used.contains(attr) == false && exceptions.contains(attr) == false) { | ||
| it.remove(); | ||
| } else { | ||
| if (used.contains(attr)) { | ||
| used.addAll(prev.references()); | ||
| } else { | ||
| it.remove(); | ||
| } | ||
| } | ||
|
|
||
| return clone.size() != named.size() ? clone : null; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a harder to follow code, but with some comments it can be made easier to digest.
For one, I would add two methods; one
LogicalPlan pruneColumns(LogicalPlan plan, AttributeSet.Builder used)and use it here. This first method would callpruneColumns(plan, plan.outputSet().asBuilder(), false).The second method would be
pruneInlineJoinColumns(LogicalPlan plan, AttributeSet.Builder used)and callpruneColumns(plan, plan.outputSet().asBuilder(), true).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I liked the uniformity of the previous namings, but I realise that it doesn't make the flow obvious at call sites, so I've renamed the methods to make it clearer what happens with
InlineJoininstances. But happy to iterate if still unclear.