-
Notifications
You must be signed in to change notification settings - Fork 25.7k
ESQL: Push filters past inline stats #137572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…push_filters_past_inlinestats
…push_filters_past_inlinestats
13d73b6 to
cbd91b4
Compare
…push_filters_past_inlinestats
|
Hi @astefan, I've created a changelog YAML for you. |
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
…push_filters_past_inlinestats
…tefan/elasticsearch into push_filters_past_inlinestats
| c:long | ||
| 1 | ||
| ; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are mostly mirroring the unit tests in PushDownAndCombineFiltersTests
| })); | ||
| } | ||
|
|
||
| public static LogicalPlan newMainPlan(LogicalPlan optimizedPlan, InlineJoin.LogicalPlanTuple subPlans, LocalRelation resultWrapper) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extracted this as a method and made public to be used in the unit tests. I needed a way to use the mechanism EsqlSession uses to simulate part of the flow inline stats goes through.
| ); | ||
| assertEquals(expectedPushedFilters, actualPushedFilters); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are convoluted (apologies for that) but they are a must. The nature of this optimization (where the results of queries do not change and the PR impact is not easily visible) requires a deep analysis of what happens so that the filters go to the right positions in the logical plan.
If you have further ideas for what to test, please let me know. I could have came up with more complex queries, but the amount of cognitive load on each test writing is high enough that I wanted to speed up the process a bit :-).
My advice: read the method javadoc with the modified logical plan and then try to match what you see there with the actual Java code, otherwise you'd spend a lot of minutes (tens of) for the test code alone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add these into a new file?
PushDownAndCombineFiltersInlineJoinTests.java or something.
bpintea
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First pass. Looks good, only left cosmetics remarks so far.
| return new ScopedFilter(rest, leftFilters, rightFilters); | ||
| } | ||
|
|
||
| // split the filter condition in 2 parts: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // split the filter condition in 2 parts: | |
| // split the filter condition in 3 parts: |
:)
| List<Expression> leftFilters = new ArrayList<>(filters); | ||
|
|
||
| AttributeSet leftOutput = ij.left().outputSet(); | ||
| AttributeSet rightOutput = AttributeSet.of(ij.config().rightFields()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| AttributeSet rightOutput = AttributeSet.of(ij.config().rightFields()); | |
| AttributeSet rightJoinSet = AttributeSet.of(ij.config().rightFields()); |
As it's not really the output of the right (i.e. aggs + groups), but just the groups.
| List<Expression> bothSides = new ArrayList<>(); | ||
| List<Expression> leftFilters = new ArrayList<>(filters); | ||
|
|
||
| AttributeSet leftOutput = ij.left().outputSet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| AttributeSet leftOutput = ij.left().outputSet(); | |
| AttributeSet leftOutputSet = ij.left().outputSet(); |
Just to match the below.
| // 1. filters that can be applied only to the right | ||
| // 2. filters that can be applied to both sides | ||
| // 3. filters that can be applied only to the left |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd replace the "can be applied" with "reference" and then at the end "attributes": "1. filters that reference only to the RHS attributes" (or "..the right attributes"), since we're not applying those as such.
| ); | ||
| assertEquals(expectedPushedFilters, actualPushedFilters); | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add these into a new file?
PushDownAndCombineFiltersInlineJoinTests.java or something.
| join = (Join) join.replaceLeft(left); | ||
| // we completely applied the left filters, so we can remove them from the scoped filters | ||
| scoped = new ScopedFilter(scoped.commonFilters(), List.of(), scoped.rightFilters); | ||
| scoped = new ScopedFilter(commonFilters, List.of(), scoped.rightFilters); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| scoped = new ScopedFilter(commonFilters, List.of(), scoped.rightFilters); | |
| scoped = new ScopedFilter(commonFilters, List.of(), pushableToRightSide); |
Here and below: it's hard to track which filters come from where. The suggestion above is one way to go.
Another would be to rewrite scoped above instead of extracting the three variables (or return them "ready to be applied" from scopeInlineStatsFilter() already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tried multiple approaches here, and none of them is ideal. If I do it like you say (and I believe I did try this approach) another decision based on the join type must be taken somewhere further below. Because the logic is common in many places for inlinejoin and other types of join, I really wanted to not duplicate the logic. Initially I also used a single method for scoping, but it was too messy. Imho, the code I came up with is the least intrusive one and somewhat clean.
Likely, I didn't find the best approach; please, if you have the time, try to make it more easily understandable by refactoring the way you think best. From my point of view (and ease of grasping the logic), what is in this PR is the best version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have refactored this after I looked into pushing down filters on the right-hand side as well. Not possible yet #138257
| var pushableToLeftSide = join instanceof InlineJoin ? scoped.commonFilters() : scoped.leftFilters(); | ||
| var pushableToRightSide = scoped.rightFilters(); | ||
| var commonFilters = join instanceof InlineJoin ? scoped.leftFilters() : scoped.commonFilters(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Related to the previous comment above]
| var pushableToLeftSide = join instanceof InlineJoin ? scoped.commonFilters() : scoped.leftFilters(); | |
| var pushableToRightSide = scoped.rightFilters(); | |
| var commonFilters = join instanceof InlineJoin ? scoped.leftFilters() : scoped.commonFilters(); | |
| scoped = join instanceof InlineJoin ? new ScopedFilter(scoped.leftFilters, scoped.commonFilters, scoped.rightFilters) : scoped; |
..and then keep the rest of the code below unchanged.
Alternatively, do this shuffling in scopeInlineStatsFilter() already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the naming here is a bit tricky since, as done now, the filters pushable on the LHS are considered to be the ones taken from the RHS join keys. However, these are the same as the LHS join keys (for InlineJoin).
Basically, we want to push on the LHS of the (inline) join the filters defined on the groupings (which are the join keys). Given that, I think I'd update scopeInlineStatsFilter() code to start with the bothSides var be initialised with the filters param (and not leftFilters as is now) and then extract to the leftFilters those predicates/filters that are part of the join keys.
This would then allow initialising resulting ScopedFilter instance inline with the usage at this code location where this comment is left.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored
| assertEquals(expectedPushedFilters, actualPushedFilters); | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice set of tests.
Could we also add some with filters on the aggs and maybe on the groups and see how that mixes?
FROM employees
| INLINE STATS avgByL = AVG(salary) BY languages
| INLINE STATS avgByG = AVG(salary) BY gender
| WHERE languages > 2 AND gender IS NOT NULL AND avgByL > .... AND avgByG < ...
| KEEP avg*, languages, gender, emp_no
FROM employees
| INLINE STATS avg = AVG(salary) WHERE ... BY languages
| WHERE languages > 2
| KEEP avg, languages, salary, emp_no
with possibly some combination of the filter(s) in the INLINE STATS WHERE clause (on the group or other field) and the filter(s) in the WHERE command.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added one unit test and two csv tests as the AggregateFunction filters handling is not done on the logical plan side of things from what I could tell.
| // - filters scoped to the right | ||
| // - filter that requires both sides to be evaluated | ||
| ScopedFilter scoped = join instanceof InlineJoin ij | ||
| ? scopeInlineStatsFilter(Predicates.splitAnd(filter.condition()), ij) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional: Predicates.splitAnd(filter.condition() could be taken out in a var, to make the ternary expression "lighter".
bpintea
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
I've only left one more comment on making the namings easier to understand.
But I believe the logic is correct. So LGTM.
| if (f.references().subsetOf(leftOutput)) { | ||
| bothSides.add(f); | ||
| } else { | ||
| rightFilters.add(f); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This case should never happen, no?
There shouldn't be an attribute that is part of the join key, but not part of the left, as otherwise the join couldn't happen. Might be easier to replace this with an assertion and have ScopedFilter instance created with a Set.of()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, that's a very good point. There is an use case I didn't cover (the original issue focused on the groupings pushing down) and that is the one where the calculated values are filtered further in the query. In this case those filters could be pushed down on the right hand side of the InlineJoin and the filtering done before the actual HashJoin and, I think, it should be more performant (hashing happening on fewer rows/values).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is yet not possible. See #138257
| // 3. filter that requires both sides to be evaluated | ||
| ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right); | ||
| // Split the filter condition in 3 parts. | ||
| // For InlineJoin we use a scoping that allows pushing down filters either to right side only or to both sides. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // For InlineJoin we use a scoping that allows pushing down filters either to right side only or to both sides. | |
| // For InlineJoin we use a scoping that allows pushing down filters either to left side only or to both sides. |
| var pushableToLeftSide = join instanceof InlineJoin ? scoped.commonFilters() : scoped.leftFilters(); | ||
| var pushableToRightSide = scoped.rightFilters(); | ||
| var commonFilters = join instanceof InlineJoin ? scoped.leftFilters() : scoped.commonFilters(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess the naming here is a bit tricky since, as done now, the filters pushable on the LHS are considered to be the ones taken from the RHS join keys. However, these are the same as the LHS join keys (for InlineJoin).
Basically, we want to push on the LHS of the (inline) join the filters defined on the groupings (which are the join keys). Given that, I think I'd update scopeInlineStatsFilter() code to start with the bothSides var be initialised with the filters param (and not leftFilters as is now) and then extract to the leftFilters those predicates/filters that are part of the join keys.
This would then allow initialising resulting ScopedFilter instance inline with the usage at this code location where this comment is left.
…push_filters_past_inlinestats
…push_filters_past_inlinestats
bpintea
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Andrei, looks great now!
💔 Backport failedThe backport operation could not be completed due to the following error: You can use sqren/backport to manually backport by running |

Addresses #127497