
Enable datafusion.optimizer.filter_null_join_keys by default #12369

Open · wants to merge 23 commits into base: main
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
@@ -514,7 +514,7 @@ config_namespace! {
/// a nullable and non-nullable column to filter out nulls on the nullable side. This
/// filter can add additional overhead when the file format does not fully support
/// predicate push down.
pub filter_null_join_keys: bool, default = false
pub filter_null_join_keys: bool, default = true

/// Should DataFusion repartition data using the aggregate keys to execute aggregates
/// in parallel using the provided `target_partitions` level
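For anyone who wants the old behaviour back, a minimal sketch (not part of this diff) of flipping the option off again through `SessionConfig`, assuming the usual `SessionContext` API; the same toggle is also available from SQL via `SET datafusion.optimizer.filter_null_join_keys = false;`:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Revert to the pre-change default: no IS NOT NULL filters on nullable join keys.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.filter_null_join_keys", false);
    let ctx = SessionContext::new_with_config(config);

    // Joins planned through `ctx` keep their nullable join keys unfiltered.
    let df = ctx.sql("SELECT 1").await?;
    df.show().await?;
    Ok(())
}
```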
4 changes: 3 additions & 1 deletion datafusion/optimizer/src/filter_null_join_keys.rs
@@ -50,7 +50,9 @@ impl OptimizerRule for FilterNullJoinKeys {
return Ok(Transformed::no(plan));
}
match plan {
LogicalPlan::Join(mut join) if !join.on.is_empty() => {
LogicalPlan::Join(mut join)
if !join.on.is_empty() && !join.null_equals_null =>
{
let (left_preserved, right_preserved) =
on_lr_is_preserved(join.join_type);

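For context on the new guard, a simplified sketch (not the actual optimizer code) of the rewrite this rule performs: it wraps a join input in a `Filter` that drops NULL keys, and it must skip joins with `null_equals_null` set, because in that mode NULL keys do match each other and the filter would remove valid rows. Assuming the `datafusion_expr` builder API:

```rust
use datafusion_common::Result;
use datafusion_expr::{col, LogicalPlan, LogicalPlanBuilder};

/// Illustration only: add `key IS NOT NULL` on top of one join input.
/// The real rule derives the key expressions from `join.on`, checks their
/// nullability, and only filters the preserved side(s).
fn filter_null_keys(input: LogicalPlan, key: &str) -> Result<LogicalPlan> {
    LogicalPlanBuilder::from(input)
        .filter(col(key).is_not_null())?
        .build()
}
```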
21 changes: 8 additions & 13 deletions datafusion/optimizer/tests/optimizer_integration.rs
@@ -177,15 +177,12 @@ fn intersect() -> Result<()> {
let plan = test_sql(sql)?;
let expected =
"LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\
\n Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]\
\n LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\
\n Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]\
\n Filter: test.col_int32 IS NOT NULL AND test.col_utf8 IS NOT NULL\
\n TableScan: test projection=[col_int32, col_utf8]\
\n Filter: test.col_int32 IS NOT NULL AND test.col_utf8 IS NOT NULL\
\n TableScan: test projection=[col_int32, col_utf8]\
\n Filter: test.col_int32 IS NOT NULL AND test.col_utf8 IS NOT NULL\
\n TableScan: test projection=[col_int32, col_utf8]";
\n Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]\
\n LeftSemi Join: test.col_int32 = test.col_int32, test.col_utf8 = test.col_utf8\
\n Aggregate: groupBy=[[test.col_int32, test.col_utf8]], aggr=[[]]\
\n TableScan: test projection=[col_int32, col_utf8]\
\n TableScan: test projection=[col_int32, col_utf8]\
\n TableScan: test projection=[col_int32, col_utf8]";
assert_eq!(expected, format!("{plan}"));
Ok(())
}
@@ -281,11 +278,9 @@ fn test_same_name_but_not_ambiguous() {
let expected = "LeftSemi Join: t1.col_int32 = t2.col_int32\
\n Aggregate: groupBy=[[t1.col_int32]], aggr=[[]]\
\n SubqueryAlias: t1\
\n Filter: test.col_int32 IS NOT NULL\

Contributor: the link seems to be incomplete

Contributor Author: This is #12348; I think I'll move the fix out of this PR so it can be reviewed separately.

\n TableScan: test projection=[col_int32]\
\n TableScan: test projection=[col_int32]\
\n SubqueryAlias: t2\
\n Filter: test.col_int32 IS NOT NULL\
\n TableScan: test projection=[col_int32]";
\n TableScan: test projection=[col_int32]";
assert_eq!(expected, format!("{plan}"));
}

93 changes: 61 additions & 32 deletions datafusion/sqllogictest/test_files/group_by.slt
@@ -2009,9 +2009,11 @@ logical_plan
03)----Aggregate: groupBy=[[l.col0, l.col1, l.col2]], aggr=[[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]]]
04)------Inner Join: l.col0 = r.col0
05)--------SubqueryAlias: l
06)----------TableScan: tab0 projection=[col0, col1, col2]
07)--------SubqueryAlias: r
08)----------TableScan: tab0 projection=[col0, col1]
06)----------Filter: tab0.col0 IS NOT NULL
07)------------TableScan: tab0 projection=[col0, col1, col2]
08)--------SubqueryAlias: r
09)----------Filter: tab0.col0 IS NOT NULL
10)------------TableScan: tab0 projection=[col0, col1]
physical_plan
01)SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
02)--SortExec: expr=[col0@0 ASC NULLS LAST], preserve_partitioning=[true]
@@ -2020,12 +2022,21 @@ physical_plan
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as col1, col2@2 as col2], aggr=[last_value(r.col1) ORDER BY [r.col0 ASC NULLS LAST]]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
10)------------------CoalesceBatchesExec: target_batch_size=8192
11)--------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
12)----------------------MemoryExec: partitions=1, partition_sizes=[3]
13)----------------------MemoryExec: partitions=1, partition_sizes=[3]
08)--------------ProjectionExec: expr=[col0@2 as col0, col1@3 as col1, col2@4 as col2, col0@0 as col0, col1@1 as col1]
09)----------------CoalesceBatchesExec: target_batch_size=8192
10)------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col0@0, col0@0)]
11)--------------------CoalesceBatchesExec: target_batch_size=8192
12)----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=4
13)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
14)--------------------------CoalesceBatchesExec: target_batch_size=8192
15)----------------------------FilterExec: col0@0 IS NOT NULL
16)------------------------------MemoryExec: partitions=1, partition_sizes=[3]
17)--------------------CoalesceBatchesExec: target_batch_size=8192
18)----------------------RepartitionExec: partitioning=Hash([col0@0], 4), input_partitions=4
19)------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
20)--------------------------CoalesceBatchesExec: target_batch_size=8192
21)----------------------------FilterExec: col0@0 IS NOT NULL
22)------------------------------MemoryExec: partitions=1, partition_sizes=[3]

# Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
# a,b,c column. Column a has cardinality 2, column b has cardinality 4.
@@ -2868,18 +2879,24 @@ logical_plan
04)------Projection: s.zip_code, s.country, s.sn, s.ts, s.currency, e.sn, e.amount
05)--------Inner Join: s.currency = e.currency Filter: s.ts >= e.ts
06)----------SubqueryAlias: s
07)------------TableScan: sales_global projection=[zip_code, country, sn, ts, currency]
08)----------SubqueryAlias: e
09)------------TableScan: sales_global projection=[sn, ts, currency, amount]
07)------------Filter: sales_global.currency IS NOT NULL

Contributor:

> Benefits: input to the join is smaller, so smaller input, faster build, no nulls need to be hashed

But in order to skip hashing nulls, the input array would have to be "filtered" (i.e. the matching rows copied).

> lower chance for data skew, other joins can be planned, downstream kernels are faster, can possibly be pushed down into the scan, etc. In a distributed setting it might save a lot of IO as well.

The argument about the distributed setting makes sense to me, but the others all seem to be of the class "faster in some cases but slower in others".

Contributor Author (Dandandan, Sep 9, 2024):

> But in order to skip hashing nulls, the input array would have to be "filtered" (i.e. the matching rows copied).

Correct, but you also save some copying in RepartitionExec and the build-side concatenate, as well as the copying and checking of key columns on the probe side. If there aren't any nulls (even when the column is nullable), no copying happens at all. Even with CSV / MemTable, in many cases the null filter can be combined with existing filter expressions, so no extra copying happens; in fact there is less copying, since fewer rows need to be copied. (See the sketch after the physical plan below for the filter-and-copy step under discussion.)

08)--------------TableScan: sales_global projection=[zip_code, country, sn, ts, currency]
09)----------SubqueryAlias: e
10)------------Filter: sales_global.currency IS NOT NULL
11)--------------TableScan: sales_global projection=[sn, ts, currency, amount]
physical_plan
01)SortExec: expr=[sn@2 ASC NULLS LAST], preserve_partitioning=[false]
02)--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as sn, ts@3 as ts, currency@4 as currency, last_value(e.amount) ORDER BY [e.sn ASC NULLS LAST]@5 as last_rate]
03)----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, country@1 as country, ts@3 as ts, currency@4 as currency], aggr=[last_value(e.amount) ORDER BY [e.sn ASC NULLS LAST]]
04)------ProjectionExec: expr=[zip_code@2 as zip_code, country@3 as country, sn@4 as sn, ts@5 as ts, currency@6 as currency, sn@0 as sn, amount@1 as amount]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@2, currency@4)], filter=ts@0 >= ts@1, projection=[sn@0, amount@3, zip_code@4, country@5, sn@6, ts@7, currency@8]
07)------------MemoryExec: partitions=1, partition_sizes=[1]
08)------------MemoryExec: partitions=1, partition_sizes=[1]
07)------------CoalesceBatchesExec: target_batch_size=8192
08)--------------FilterExec: currency@2 IS NOT NULL
09)----------------MemoryExec: partitions=1, partition_sizes=[1]
10)------------CoalesceBatchesExec: target_batch_size=8192
11)--------------FilterExec: currency@4 IS NOT NULL
12)----------------MemoryExec: partitions=1, partition_sizes=[1]
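
To make the copying cost discussed in the review thread above concrete, here is a rough sketch (an illustration, not DataFusion's implementation) of the filter-and-copy step, using the arrow-rs `is_not_null` and `filter_record_batch` kernels:

```rust
use arrow::compute::{filter_record_batch, is_not_null};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Drop rows whose join key is NULL before they reach the hash table.
fn drop_null_keys(batch: &RecordBatch, key_col: usize) -> Result<RecordBatch, ArrowError> {
    // Boolean mask: true where the key column is non-null.
    let mask = is_not_null(batch.column(key_col).as_ref())?;
    // Copy the surviving rows into a new batch. This is the extra work being debated:
    // it is paid once per batch so that hashing and repartitioning downstream see fewer rows.
    filter_record_batch(batch, &mask)
}
```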

query ITIPTR rowsort
SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate
@@ -3864,20 +3881,26 @@ logical_plan
05)--------Projection: l.a, l.d, row_n
06)----------Inner Join: l.d = r.d Filter: CAST(l.a AS Int64) >= CAST(r.a AS Int64) - Int64(10)
07)------------SubqueryAlias: l
08)--------------TableScan: multiple_ordered_table projection=[a, d]
09)------------Projection: r.a, r.d, row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS row_n
10)--------------WindowAggr: windowExpr=[[row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
11)----------------SubqueryAlias: r
12)------------------TableScan: multiple_ordered_table projection=[a, d]
08)--------------Filter: multiple_ordered_table.d IS NOT NULL
09)----------------TableScan: multiple_ordered_table projection=[a, d], partial_filters=[multiple_ordered_table.d IS NOT NULL]
10)------------Projection: r.a, r.d, row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS row_n
11)--------------Filter: r.d IS NOT NULL
12)----------------WindowAggr: windowExpr=[[row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]]
13)------------------SubqueryAlias: r
14)--------------------TableScan: multiple_ordered_table projection=[a, d]
physical_plan
01)ProjectionExec: expr=[last_value(l.d) ORDER BY [l.a ASC NULLS LAST]@1 as amount_usd]
02)--AggregateExec: mode=Single, gby=[row_n@2 as row_n], aggr=[last_value(l.d) ORDER BY [l.a ASC NULLS LAST]], ordering_mode=Sorted
03)----CoalesceBatchesExec: target_batch_size=2
04)------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(d@1, d@1)], filter=CAST(a@0 AS Int64) >= CAST(a@1 AS Int64) - 10, projection=[a@0, d@1, row_n@4]
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
06)--------ProjectionExec: expr=[a@0 as a, d@1 as d, row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as row_n]
07)----------BoundedWindowAggExec: wdw=[row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
08)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
05)--------CoalesceBatchesExec: target_batch_size=2
06)----------FilterExec: d@1 IS NOT NULL
07)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true
08)--------ProjectionExec: expr=[a@0 as a, d@1 as d, row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@2 as row_n]
09)----------CoalesceBatchesExec: target_batch_size=2
10)------------FilterExec: d@1 IS NOT NULL
11)--------------BoundedWindowAggExec: wdw=[row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "row_number() ORDER BY [r.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow, is_causal: false }], mode=[Sorted]
12)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true

# reset partition number to 8.
statement ok
@@ -4021,21 +4044,27 @@ logical_plan
03)----SubqueryAlias: lhs
04)------Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
05)--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
06)----------TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
07)----SubqueryAlias: rhs
08)------Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
09)--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
10)----------TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
06)----------Filter: multiple_ordered_table_with_pk.b IS NOT NULL
07)------------TableScan: multiple_ordered_table_with_pk projection=[b, c, d], partial_filters=[multiple_ordered_table_with_pk.b IS NOT NULL]
08)----SubqueryAlias: rhs
09)------Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
10)--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
11)----------Filter: multiple_ordered_table_with_pk.b IS NOT NULL
12)------------TableScan: multiple_ordered_table_with_pk projection=[b, c, d], partial_filters=[multiple_ordered_table_with_pk.b IS NOT NULL]
physical_plan
01)ProjectionExec: expr=[c@0 as c, c@2 as c, sum1@1 as sum1, sum1@3 as sum1]
02)--CoalesceBatchesExec: target_batch_size=2
03)----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, b@1)], projection=[c@0, sum1@2, c@3, sum1@5]
04)------ProjectionExec: expr=[c@0 as c, b@1 as b, sum(multiple_ordered_table_with_pk.d)@2 as sum1]
05)--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
07)------ProjectionExec: expr=[c@0 as c, b@1 as b, sum(multiple_ordered_table_with_pk.d)@2 as sum1]
08)--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
06)----------CoalesceBatchesExec: target_batch_size=2
07)------------FilterExec: b@0 IS NOT NULL
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
09)------ProjectionExec: expr=[c@0 as c, b@1 as b, sum(multiple_ordered_table_with_pk.d)@2 as sum1]
10)--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[sum(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
11)----------CoalesceBatchesExec: target_batch_size=2
12)------------FilterExec: b@0 IS NOT NULL
13)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true

query TT
EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -225,7 +225,7 @@ datafusion.optimizer.default_filter_selectivity 20
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
datafusion.optimizer.filter_null_join_keys false
datafusion.optimizer.filter_null_join_keys true
datafusion.optimizer.hash_join_single_partition_threshold 1048576
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072
datafusion.optimizer.max_passes 3
@@ -314,7 +314,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
datafusion.optimizer.filter_null_join_keys true When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.
datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.hash_join_single_partition_threshold_rows 131072 The maximum estimated size in rows for one input side of a HashJoin will be collected into a single partition
datafusion.optimizer.max_passes 3 Number of times that the optimizer will attempt to optimize the plan
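To confirm the new default from a running session, a quick sketch (assuming the information schema is enabled; not part of this diff):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);

    // With this change applied, the reported value should be `true`.
    let df = ctx
        .sql(
            "SELECT name, value FROM information_schema.df_settings \
             WHERE name = 'datafusion.optimizer.filter_null_join_keys'",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```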