Skip to content

Commit

Permalink
Enabling LogicalProject pushdown optimizations to eliminate exchange …
Browse files Browse the repository at this point in the history
…of unused columns (apache#14198)
  • Loading branch information
shauryachats authored Oct 22, 2024
1 parent 8774d32 commit 281478e
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ private PinotQueryRuleSets() {
CoreRules.FILTER_PROJECT_TRANSPOSE
);

// Project pushdown rules: push a Project past a Filter (PROJECT_FILTER_TRANSPOSE) and past a Join
// (PROJECT_JOIN_TRANSPOSE), and merge adjacent Projects into one (PROJECT_MERGE). The intent is to narrow the set of
// projected columns as early as possible so that unused columns are not carried through exchanges.
// These rules run using a RuleCollection since we want to push down a project as much as possible in a single
// HepInstruction.
public static final List<RelOptRule> PROJECT_PUSHDOWN_RULES = List.of(
CoreRules.PROJECT_FILTER_TRANSPOSE,
CoreRules.PROJECT_JOIN_TRANSPOSE,
CoreRules.PROJECT_MERGE
);

// The pruner rules run top-down to ensure Calcite restarts from root node after applying a transformation.
public static final List<RelOptRule> PRUNE_RULES = List.of(
CoreRules.AGGREGATE_PROJECT_MERGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,12 @@ private static HepProgram getOptProgram() {
// Pushdown filters using a single HepInstruction.
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES);

// Push down projects after the first filter pushdown to minimize the projected columns.
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.PROJECT_PUSHDOWN_RULES);

// Push down filters again after the project pushdown, since filters should end up at the lowest possible level.
hepProgramBuilder.addRuleCollection(PinotQueryRuleSets.FILTER_PUSHDOWN_RULES);

// ----
// Prune duplicate/unnecessary nodes using a single HepInstruction.
// TODO: We can consider using HepMatchOrder.TOP_DOWN if we find cases where it would help.
Expand Down
98 changes: 78 additions & 20 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,14 @@
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
"\n LogicalFilter(condition=[IS NOT TRUE($8)])",
"\n LogicalJoin(condition=[=($6, $7)], joinType=[left])",
"\n PinotLogicalExchange(distribution=[hash[6]])",
"\n LogicalProject(col1=[$0], col2=[$1], col30=[$3], $f1=[$4], col32=[$5], $f10=[$7], col34=[$2])",
"\n LogicalFilter(condition=[IS NOT TRUE($7)])",
"\n LogicalJoin(condition=[=($5, $6)], joinType=[left])",
"\n PinotLogicalExchange(distribution=[hash[5]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2], col30=[$3], $f1=[$5], col32=[$2])",
"\n LogicalFilter(condition=[IS NOT TRUE($4)])",
"\n LogicalJoin(condition=[=($2, $3)], joinType=[left])",
"\n PinotLogicalExchange(distribution=[hash[2]])",
"\n LogicalProject(col1=[$0], col2=[$1], col34=[$2])",
"\n LogicalFilter(condition=[IS NOT TRUE($5)])",
"\n LogicalJoin(condition=[=($3, $4)], joinType=[left])",
"\n PinotLogicalExchange(distribution=[hash[3]])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2], col32=[$2])",
"\n LogicalFilter(condition=[IS NOT TRUE($5)])",
"\n LogicalJoin(condition=[=($3, $4)], joinType=[left])",
"\n PinotLogicalExchange(distribution=[hash[3]])",
Expand All @@ -294,19 +294,21 @@
"\n LogicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(col3=[$2], $f1=[true])",
"\n LogicalFilter(condition=[=($0, _UTF-8'bar')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n LogicalProject(col3=[$0], $f1=[$1])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(col3=[$2], $f1=[true])",
"\n LogicalFilter(condition=[=($0, _UTF-8'bar')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(col3=[$2], $f1=[true])",
"\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n LogicalProject(col3=[$0], $f1=[$1])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(col3=[$2], $f1=[true])",
"\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
},
Expand Down Expand Up @@ -517,6 +519,62 @@
"\n LogicalTableScan(table=[[default, a]])",
"\n"
]
},
{
"description": "Multiple IN and NOT IN joins while selecting count at top",
"sql": "EXPLAIN PLAN FOR SELECT count(*) FROM a WHERE a.col1 = 'foo' AND col2 = 'xylo' AND a.col4 = 12 AND a.col5 = false AND col3 NOT IN (SELECT col3 FROM b WHERE col1='foo') AND col3 NOT IN (SELECT col3 FROM b WHERE col1='bar') AND col3 NOT IN (SELECT col3 FROM b WHERE col1='foobar') AND col3 IN (SELECT col3 FROM b WHERE col1 = 'fork')",
"output": [
"Execution Plan",
"\nPinotLogicalAggregate(group=[{}], agg#0=[COUNT($0)])",
"\n PinotLogicalExchange(distribution=[hash])",
"\n PinotLogicalAggregate(group=[{}], agg#0=[COUNT()])",
"\n LogicalJoin(condition=[=($0, $1)], joinType=[semi])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$0])",
"\n LogicalFilter(condition=[IS NOT TRUE($3)])",
"\n LogicalJoin(condition=[=($1, $2)], joinType=[left])",
"\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col3=[$0], col34=[$0])",
"\n LogicalFilter(condition=[IS NOT TRUE($3)])",
"\n LogicalJoin(condition=[=($1, $2)], joinType=[left])",
"\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col3=[$0], col32=[$0])",
"\n LogicalFilter(condition=[IS NOT TRUE($3)])",
"\n LogicalJoin(condition=[=($1, $2)], joinType=[left])",
"\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalProject(col3=[$2], col30=[$2])",
"\n LogicalFilter(condition=[AND(=($0, _UTF-8'foo'), =($1, _UTF-8'xylo'), =($3, 12), NOT($4))])",
"\n LogicalTableScan(table=[[default, a]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$0], $f1=[$1])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(col3=[$2], $f1=[true])",
"\n LogicalFilter(condition=[=($0, _UTF-8'foo')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$0], $f1=[$1])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(col3=[$2], $f1=[true])",
"\n LogicalFilter(condition=[=($0, _UTF-8'bar')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$0], $f1=[$1])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n PinotLogicalAggregate(group=[{0}], agg#0=[MIN($1)])",
"\n LogicalProject(col3=[$2], $f1=[true])",
"\n LogicalFilter(condition=[=($0, _UTF-8'foobar')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n PinotLogicalExchange(distribution=[hash[0]])",
"\n LogicalProject(col3=[$2])",
"\n LogicalFilter(condition=[=($0, _UTF-8'fork')])",
"\n LogicalTableScan(table=[[default, b]])",
"\n"
]
}
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3404,7 +3404,7 @@
"sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rownum from a) SELECT a.col1, a.rownum FROM windowfunc AS a where a.rownum < 5",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$3])",
"\nLogicalProject(col1=[$0], w0$o0=[$3])",
"\n LogicalFilter(condition=[<($3, 5)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
"\n PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
Expand All @@ -3418,7 +3418,7 @@
"sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, RANK() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rank, DENSE_RANK() OVER(PARTITION BY a.col2 ORDER BY a.col3) as dense_rank from a) SELECT a.col1, a.rank, a.dense_rank FROM windowfunc AS a where a.dense_rank < 5",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$3], $2=[$4])",
"\nLogicalProject(col1=[$0], w0$o0=[$3], w0$o1=[$4])",
"\n LogicalFilter(condition=[<($4, 5)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [2] aggs [RANK(), DENSE_RANK()])])",
"\n PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
Expand Down

0 comments on commit 281478e

Please sign in to comment.