Skip to content

Commit

Permalink
Wip
Browse files (browse the repository at this point in the history)
  • Loading branch information
Dandandan committed Sep 7, 2024
1 parent ba73e8a commit df9e3db
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 24 deletions.
4 changes: 3 additions & 1 deletion datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ impl OptimizerRule for FilterNullJoinKeys {
return Ok(Transformed::no(plan));
}
match plan {
- LogicalPlan::Join(mut join) if !join.on.is_empty() && !join.null_equals_null => {
+ LogicalPlan::Join(mut join)
+ if !join.on.is_empty() && !join.null_equals_null =>
+ {
let (left_preserved, right_preserved) =
on_lr_is_preserved(join.join_type);

Expand Down
36 changes: 20 additions & 16 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2009,23 +2009,27 @@ logical_plan
03)----Aggregate: groupBy=[[l.col0, l.col1, l.col2]], aggr=[[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]]]
04)------Inner Join: l.col0 = r.col0
05)--------SubqueryAlias: l
06)----------TableScan: tab0 projection=[col0, col1, col2]
07)--------SubqueryAlias: r
08)----------TableScan: tab0 projection=[col0, col1]
06)----------Filter: tab0.col0 IS NOT NULL
07)------------TableScan: tab0 projection=[col0, col1, col2]
08)--------SubqueryAlias: r
09)----------Filter: tab0.col0 IS NOT NULL
10)------------TableScan: tab0 projection=[col0, col1]
physical_plan
01)SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
02)--SortExec: expr=[col0@0 ASC NULLS LAST], preserve_partitioning=[true]
03)----ProjectionExec: expr=[col0@0 as col0, last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]@3 as last_col1]
04)------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
12)----------------------MemoryExec: partitions=1, partition_sizes=[3]
13)----------------------MemoryExec: partitions=1, partition_sizes=[3]
08)--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
11)--------------------CoalesceBatchesExec: target_batch_size=8192
12)----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=4
13)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
14)--------------------------CoalesceBatchesExec: target_batch_size=8192
15)----------------------------FilterExec: col0@0 IS NOT NULL
16)------------------------------MemoryExec: partitions=1, partition_sizes=[3]
17)--------------------CoalesceBatchesExec: target_batch_size=8192
18)----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=4
19)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
20)--------------------------CoalesceBatchesExec: target_batch_size=8192
21)----------------------------FilterExec: col0@0 IS NOT NULL
22)------------------------------MemoryExec: partitions=1, partition_sizes=[3]

# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
Expand Down
18 changes: 11 additions & 7 deletions datafusion/sqllogictest/test_files/join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -750,14 +750,18 @@ WHERE t1.a=t2.a;
----
logical_plan
01)Inner Join: t1.a = t2.a
02)--TableScan: t1 projection=[a, b]
03)--SubqueryAlias: t2
04)----TableScan: t1 projection=[a, b]
02)--Filter: t1.a IS NOT NULL
03)----TableScan: t1 projection=[a, b]
04)--SubqueryAlias: t2
05)----Filter: t1.a IS NOT NULL
06)------TableScan: t1 projection=[a, b]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0)]
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----MemoryExec: partitions=1, partition_sizes=[1]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------FilterExec: a@0 IS NOT NULL
05)--------MemoryExec: partitions=1, partition_sizes=[1]
06)----CoalesceBatchesExec: target_batch_size=8192
07)------FilterExec: a@0 IS NOT NULL
08)--------MemoryExec: partitions=1, partition_sizes=[1]

# Reset the configs to old values
statement ok
Expand Down

0 comments on commit df9e3db

Please sign in to comment.