Add FuseCrossJoinedGlobalAggregations rule #14271

Closed
lukasz-stec wants to merge 1 commit into trinodb:master from starburstdata:ls/043-fuse-simple-aggregation-join

@lukasz-stec
Member

Description

Add a simplified version of the JoinOnKeys rule from the "Computation Reuse via Fusion in Amazon Athena" paper.
It acts only on a cross join over global aggregations over table scans, each with an optional filter. This can transform:

select * from (select count(*) c1 from table where a = 1),
  (select count(*) c2 from table where b = 1)

into

select count(*) filter (where a = 1),
 count(*) filter (where b = 1)
from table where a = 1 or b = 1

making the query read the table only once.
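
The equivalence behind this rewrite can be illustrated outside the planner. The sketch below is not Trino code: `Row`, `countSeparately`, and `countFused` are hypothetical names, and an in-memory list stands in for the table. It compares two filtered counts computed in two passes with the two `count(*) filter (...)` values computed in a single pass over the rows matching `a = 1 or b = 1`.

```java
import java.util.List;

final class FusedCountSketch {
    // Hypothetical stand-in for one table row with integer columns a and b.
    static final class Row {
        final int a;
        final int b;

        Row(int a, int b) {
            this.a = a;
            this.b = b;
        }
    }

    // Original shape: two independent scans, one per global aggregation.
    static long[] countSeparately(List<Row> table) {
        long c1 = table.stream().filter(r -> r.a == 1).count();
        long c2 = table.stream().filter(r -> r.b == 1).count();
        return new long[] {c1, c2};
    }

    // Fused shape: one scan filtered by (a = 1 OR b = 1), with each count
    // guarded by its own original predicate, like count(*) FILTER (WHERE ...).
    static long[] countFused(List<Row> table) {
        long c1 = 0;
        long c2 = 0;
        for (Row r : table) {
            if (!(r.a == 1 || r.b == 1)) {
                continue; // row removed by the combined WHERE clause
            }
            if (r.a == 1) {
                c1++;
            }
            if (r.b == 1) {
                c2++;
            }
        }
        return new long[] {c1, c2};
    }

    public static void main(String[] args) {
        List<Row> table = List.of(new Row(1, 0), new Row(1, 1), new Row(0, 1), new Row(2, 2));
        long[] separate = countSeparately(table);
        long[] fused = countFused(table);
        System.out.println(separate[0] + "," + separate[1] + " vs " + fused[0] + "," + fused[1]);
    }
}
```

The combined `or` filter only discards rows that neither aggregation would have counted, which is why the per-aggregation guards still produce the original results.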

Non-technical explanation

Speed up queries that contain a cross join over global aggregations on the same table by reading that table only once.

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( X) Release notes are required, with the following suggested text:

# General
* Speed up cross self-joins over global aggregations by reading the table only once.

@cla-bot cla-bot bot added the cla-signed label Sep 23, 2022
@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch 2 times, most recently from 0bd1ca9 to f20aee4 Compare September 26, 2022 12:09
@lukasz-stec
Member Author

Improvement for a basic query that matches this rule:
before

explain SELECT * from (SELECT count(*) c_1 FROM lineitem WHERE suppkey = 3332274), (SELECT count(*) c_2 FROM lineitem WHERE suppkey = 1296500);

trino:tpch_sf1000_dec_orc_part> SELECT * from (SELECT count(*) c_1 FROM lineitem WHERE suppkey = 3332274), (SELECT count(*) c_2 FROM lineitem WHERE suppkey = 1296500);
 c_1 | c_2 
-----+-----
 783 | 485 
(1 row)

Query 20220926_104253_00011_p5eka, FINISHED, 6 nodes
http://localhost:8081/ui/query.html?20220926_104253_00011_p5eka
Splits: 5,378 total, 5,378 done (100.00%)
CPU Time: 473.1s total, 25.4M rows/s, 86.3MB/s, 19% active
Per Node: 10.1 parallelism,  255M rows/s,  867MB/s
Parallelism: 60.3
Peak Memory: 1.75KB
7.84 [12B rows, 39.9GB] [1.53B rows/s, 5.08GB/s]

after

trino:tpch_sf1000_dec_orc_part> set session fuse_sub_plan=true;
SET SESSION
trino:tpch_sf1000_dec_orc_part> SELECT * from (SELECT count(*) c_1 FROM lineitem WHERE suppkey = 3332274), (SELECT count(*) c_2 FROM lineitem WHERE suppkey = 1296500);
 c_1 | c_2 
-----+-----
 783 | 485 
(1 row)

Query 20220926_104318_00013_p5eka, FINISHED, 6 nodes
http://localhost:8081/ui/query.html?20220926_104318_00013_p5eka
Splits: 2,674 total, 2,674 done (100.00%)
CPU Time: 255.4s total, 23.5M rows/s, 79.8MB/s, 23% active
Per Node: 9.5 parallelism,  224M rows/s,  760MB/s
Parallelism: 57.1
Peak Memory: 6.75KB
4.47 [6B rows, 19.9GB] [1.34B rows/s, 4.45GB/s]
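
For a rough sense of scale, the ratios between the two runs can be computed directly from the figures above. This is only arithmetic on the reported numbers (one run each, so not a rigorous benchmark), and `ratio` is a throwaway helper introduced for illustration.

```java
final class ScanReductionSketch {
    // Ratio of a "before" measurement to the matching "after" measurement.
    static double ratio(double before, double after) {
        return before / after;
    }

    public static void main(String[] args) {
        // Figures copied from the two query summaries above.
        System.out.printf("splits:    %.2fx fewer%n", ratio(5378, 2674));
        System.out.printf("data read: %.2fx less%n", ratio(39.9, 19.9));
        System.out.printf("CPU time:  %.2fx less%n", ratio(473.1, 255.4));
    }
}
```

Splits and bytes read drop by about half, which is consistent with scanning `lineitem` once instead of twice.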

Member

@gaurav8297 gaurav8297 left a comment

Some initial comments

@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch 2 times, most recently from f4acdd9 to 5523436 Compare October 5, 2022 09:50
Member Author

@lukasz-stec lukasz-stec left a comment

most comments addressed, the documentation of the fuse operation is still pending

@lukasz-stec lukasz-stec requested a review from gaurav8297 October 5, 2022 14:15
Member

@gaurav8297 gaurav8297 left a comment

Please rebase

Member

Can we somehow fix this? It seems like a pretty important test.

Member Author

Fixing the entire test infra is hard, and I don't know if we want to do it just for this case, because the current assumption makes a lot of things easier.
Also, I have this case tested in AbstractTestJoinQueries.testFuseCrossJoinOnGlobalAggregationWithDuplicatedAggregation.

Member

There's no point in adding a disabled test. I didn't get what's the issue from "test infra assumption that projection always has unique target expression"

Member Author

> There's no point in adding a disabled test.

I added it for two reasons. First, to avoid questions, now and in the future, about why this case is not tested here. Second, if the testing infra ever allows non-unique symbol targets, we could enable this test and drop AbstractTestJoinQueries.testFuseCrossJoinOnGlobalAggregationWithDuplicatedAggregation.

> I didn't get what's the issue from "test infra assumption that projection always has unique target expression"

The issue is that AliasMatcher assumes the plan has unique symbol-to-expression assignments, so, for example, a plan like

    project(ImmutableMap.of(
            "sumLeft", PlanMatchPattern.expression("sumLeft"),
            "sumRight", PlanMatchPattern.expression("sumLeft")),

is not allowed because two different symbols are mapped to the same expression (symbol sumLeft).

In this case you get

java.lang.IllegalStateException: Ambiguous expression "sumLeft" matches multiple assignments ["sumLeft", "sumLeft"]

	at com.google.common.base.Preconditions.checkState(Preconditions.java:821)
	at io.trino.sql.planner.assertions.ExpressionMatcher.getAssignedSymbol(ExpressionMatcher.java:88)
	at io.trino.sql.planner.assertions.AliasMatcher.detailMatches(AliasMatcher.java:57)

I don't think it can be easily fixed, at least I don't know how.
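
The ambiguity can be reproduced in isolation. This is only a sketch of the failure mode, not the actual `ExpressionMatcher` code: `findAssignedSymbol` is a hypothetical helper that resolves an expression back to the single symbol assigned to it, and it fails in the same way once two symbols share one expression.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class AmbiguousAssignmentSketch {
    // Hypothetical reverse lookup: find the one symbol whose assignment equals the expression.
    static String findAssignedSymbol(Map<String, String> assignments, String expression) {
        List<String> matches = assignments.entrySet().stream()
                .filter(entry -> entry.getValue().equals(expression))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        if (matches.size() > 1) {
            throw new IllegalStateException(
                    "Ambiguous expression \"" + expression + "\" matches multiple assignments " + matches);
        }
        if (matches.isEmpty()) {
            throw new IllegalStateException("No assignment for expression \"" + expression + "\"");
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        Map<String, String> assignments = new LinkedHashMap<>();
        assignments.put("sumLeft", "sumLeft");
        assignments.put("sumRight", "sumLeft"); // second symbol assigned the same expression
        try {
            findAssignedSymbol(assignments, "sumLeft");
        }
        catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Any matcher that goes from expression to symbol has to pick one of the candidates or fail, which is why a plan with duplicated assignments cannot be asserted this way.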

@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch from 5523436 to bb48f0c Compare October 7, 2022 10:30
Member Author

@lukasz-stec lukasz-stec left a comment

Comments addressed

@lukasz-stec lukasz-stec requested a review from gaurav8297 October 7, 2022 10:30
@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch from bb48f0c to 079b03f Compare October 7, 2022 11:43
Member

@raunaqmorarka raunaqmorarka left a comment

preliminary comments, will add more

@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch 3 times, most recently from 5bffedb to 5265360 Compare October 11, 2022 06:55
@lukasz-stec
Member Author

Added a preparatory PR, #14559, for AggregationNode.isSingleGlobalAggregation. Only the last commit matters here.
Also, addressed all comments except for the docs.

@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch 3 times, most recently from da6dec8 to adc2eef Compare October 12, 2022 15:57
@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch from adc2eef to 2ea95bc Compare October 20, 2022 14:34
Member Author

@lukasz-stec lukasz-stec left a comment

Comments addressed

@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch from 82399c1 to aeb511f Compare November 4, 2022 13:15
Comment on lines 504 to 520
Member

Returning the symbol when a mapping doesn't exist is a bit risky; it assumes that all the unmapped symbols come from the left-side output symbols.
What if we add identity symbol mappings for the symbols from the left node output? Then we can always require that a non-null mapping exists.

Member Author

The logic is that we want to use the mapped (left) symbol if the mapping exists and the right symbol if it does not. This stems directly from the fuse operation definition (see below): either the right symbol is unique (not on the left) or it should be mapped. Put differently, if there is no mapping, then the right symbol has to be in the fused plan outputs.

If Fuse(P1, P2) = (P, M, L, R), then:
- P is the fused resulting plan. The schema of P includes all output columns in P1 and, optionally, additional output columns from P2.
- M is a mapping from the output columns of P2 to output columns of P.
- L and R are two filter conditions defined over the output columns of P to restore P1 and P2, respectively.

> What if we add identity symbol mappings for the symbols from left node output

That wouldn't work. We could add identity mappings from the right node, but that would be a deviation from the paper (the paper describes in detail how the mapping is constructed for each plan node), and IMO keeping the implementation close to the paper makes it more straightforward and less error-prone.
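
The lookup rule described in this reply can be sketched as a small class. The names here are illustrative: `SymbolMappingSketch` is hypothetical, `map` and `requiredMap` borrow the method names mentioned later in this thread, and symbols are plain strings, so this is an assumption about the shape rather than the PR's actual code.

```java
import java.util.HashMap;
import java.util.Map;

final class SymbolMappingSketch {
    // M from the Fuse definition: right-side (P2) output symbol -> fused plan (P) symbol.
    private final Map<String, String> rightToFused;

    SymbolMappingSketch(Map<String, String> rightToFused) {
        this.rightToFused = new HashMap<>(rightToFused);
    }

    // Use the mapped symbol when a mapping exists; otherwise the right symbol
    // is assumed to be unique and carried into the fused plan outputs as-is.
    String map(String rightSymbol) {
        return rightToFused.getOrDefault(rightSymbol, rightSymbol);
    }

    // Strict variant for call sites where a missing mapping would be a bug.
    String requiredMap(String rightSymbol) {
        String mapped = rightToFused.get(rightSymbol);
        if (mapped == null) {
            throw new IllegalArgumentException("No mapping for symbol: " + rightSymbol);
        }
        return mapped;
    }

    public static void main(String[] args) {
        SymbolMappingSketch mapping = new SymbolMappingSketch(Map.of("count_right", "count_left"));
        System.out.println(mapping.map("count_right")); // mapped to the left symbol
        System.out.println(mapping.map("sum_right"));   // unmapped, returned unchanged
    }
}
```

Keeping both variants lets a caller choose between the paper's "mapped or unique" fallback and an explicit failure where a mapping is expected to exist.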

@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch from aeb511f to 776196a Compare November 9, 2022 08:52
Member Author

@lukasz-stec lukasz-stec left a comment

Comments addressed

Member Author

it's an effect of adding another RemoveRedundantIdentityProjections rule. I don't know the exact details.

Member Author

We could, but I think it's useful. First, it directly matches the paper definition; second, it allows for convenience methods like map (with a default return) and requiredMap.

@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch from 776196a to cfb5f9a Compare November 10, 2022 09:37
Member Author

@lukasz-stec lukasz-stec left a comment

Comments addressed

Comment on lines 289 to 294
Member Author

I don't think so, AggregationNode.Aggregation#equals uses mask for comparison.

Member

Instead of exposing FusedPlanNode in the public API, I think this logic can move to PlanNodeFuser, and we can have that return an Optional<PlanNode> instead.

Member Author

IMO FusedPlanNode is the public API for fusion.
Now, for this specific case, I could move the check, but in the more general version of this rule that is coming (JoinOnKeys), and in the other rules, the filters can be present.

Member

Okay, are the filters going to be handled differently based on what the parent node is?
Can we move the part about adding projections into PlanNodeFuser by adding Set<Symbol> resultOutputSymbols to the API, or is this part also specific to this rule?

Member Author

> are the filters going to be handled differently based on what the parent node is?

Yes, at least this is what I see in the paper (the implementation can vary).

> Can we move the part about adding projections into PlanNodeFuser by adding Set<Symbol> resultOutputSymbols to the API, or is this part also specific to this rule?

I don't think it's specific to this rule, but we also don't have to add it to the fusion API.
I refactored this and moved this logic to the static method io.trino.sql.planner.iterative.rule.fuse.PlanNodeFuser#fuse(io.trino.sql.planner.iterative.Rule.Context, io.trino.sql.planner.plan.PlanNode, io.trino.sql.planner.plan.PlanNode)

Member

Why are we using rightAggregation.getMask() instead of mappedMask?

Member Author

Good catch, I think mappedMask should be used. I need to figure out a test case for this (in progress)
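
To make the distinction concrete: when a right-side aggregation is copied into the fused plan, its mask symbol still refers to the right plan, so it needs to go through the same symbol mapping as the aggregation's other inputs. The sketch below uses a hypothetical `Aggregation` class with string symbols; it does not mirror `AggregationNode.Aggregation`, only the idea that the mask is part of an aggregation's identity and must be remapped.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

final class MaskMappingSketch {
    // Hypothetical aggregation: a function name plus an optional mask symbol.
    static final class Aggregation {
        final String function;
        final Optional<String> mask;

        Aggregation(String function, Optional<String> mask) {
            this.function = function;
            this.mask = mask;
        }

        // The mask takes part in equality, so the same function under
        // different masks counts as a different aggregation.
        @Override
        public boolean equals(Object other) {
            if (!(other instanceof Aggregation)) {
                return false;
            }
            Aggregation that = (Aggregation) other;
            return function.equals(that.function) && mask.equals(that.mask);
        }

        @Override
        public int hashCode() {
            return Objects.hash(function, mask);
        }
    }

    // Rewrites the right-side mask into the fused plan's symbol space.
    static Aggregation remapMask(Aggregation right, Map<String, String> rightToFused) {
        Optional<String> mappedMask = right.mask.map(symbol -> rightToFused.getOrDefault(symbol, symbol));
        return new Aggregation(right.function, mappedMask);
    }

    public static void main(String[] args) {
        Aggregation right = new Aggregation("count", Optional.of("mask_right"));
        Aggregation fused = remapMask(right, Map.of("mask_right", "mask_fused"));
        System.out.println(fused.mask.get());
    }
}
```

Using the unmapped mask would leave the fused aggregation pointing at a symbol that may not exist in the fused plan, which is the kind of case the pending test is meant to cover.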

@raunaqmorarka
Member

Please rebase to fix conflicts

@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch from cfb5f9a to 5460671 Compare November 14, 2022 10:33
Member Author

@lukasz-stec lukasz-stec left a comment

comments addressed, test pending for the aggregation with mask case

Member

We can probably use node.getId() here as the original JoinNode should get discarded

Member Author

I moved this to PlanNodeFuser so it's not easy to reuse the id.

@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch from 5460671 to 7da1f63 Compare November 14, 2022 15:07
Member

Maybe the inverted form would read better: !(a && b) == (!a || !b)

Member Author

Honestly, I'm conflicted. Usually a logical AND is easier to read, but in this case the logic is: if any of the corresponding values differ, bail out. Here, an OR over negations seems more readable to me.
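
Both spellings are equivalent by De Morgan's law, so the choice is purely about readability. A small sketch that writes out both forms of the bail-out condition side by side; the method names and the compared values are hypothetical and stand in for whatever pairs the rule actually compares.

```java
import java.util.Objects;

final class BailoutConditionSketch {
    // "OR over NOT": bail out if any pair of corresponding values differs.
    static boolean bailOutOrForm(Object leftA, Object rightA, Object leftB, Object rightB) {
        return !Objects.equals(leftA, rightA) || !Objects.equals(leftB, rightB);
    }

    // Inverted form: bail out unless every pair is equal.
    static boolean bailOutAndForm(Object leftA, Object rightA, Object leftB, Object rightB) {
        return !(Objects.equals(leftA, rightA) && Objects.equals(leftB, rightB));
    }

    public static void main(String[] args) {
        String[] values = {"x", "y"};
        boolean formsAgree = true;
        // Enumerate every combination of two-valued inputs and compare the two forms.
        for (String leftA : values) {
            for (String rightA : values) {
                for (String leftB : values) {
                    for (String rightB : values) {
                        formsAgree &= bailOutOrForm(leftA, rightA, leftB, rightB)
                                == bailOutAndForm(leftA, rightA, leftB, rightB);
                    }
                }
            }
        }
        System.out.println("forms agree: " + formsAgree);
    }
}
```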

@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch from 7da1f63 to 05e42f6 Compare November 15, 2022 08:40
Member Author

@lukasz-stec lukasz-stec left a comment

Comments addressed

Add simplified version of the JoinOnKeys rule
from "Computation Reuse via Fusion in Amazon Athena"
paper that acts only on cross join over global
aggregations over table scans with possible filter.
This can transform:
select * from (select count(*) c1 from table where a = 1),
  (select count(*) c2 from table where b = 1)
into
select count(*) filter (where a = 1),
 count(*) filter (where b = 1)
from table where a = 1 or b = 1
making the query read the table only once.
@lukasz-stec lukasz-stec force-pushed the ls/043-fuse-simple-aggregation-join branch from 05e42f6 to 0caf276 Compare November 15, 2022 21:21
4 participants