From 2fcb6bace0bdd379fdd816d6903d67415ae381e2 Mon Sep 17 00:00:00 2001 From: 0xPoe Date: Mon, 28 Jul 2025 22:49:34 +0200 Subject: [PATCH 1/2] feat: implement partition_statistics for HashJoinExec Signed-off-by: 0xPoe --- .../partition_statistics.rs | 198 ++++++++++++++++++ .../physical-plan/src/joins/hash_join.rs | 155 ++++++++++++-- 2 files changed, 339 insertions(+), 14 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 4b39e37f94e82..0a56be1c96b96 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -758,4 +758,202 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_hash_join_partition_statistics() -> Result<()> { + use datafusion_common::{JoinType, NullEquality}; + use datafusion_physical_expr::expressions::Column; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create left table scan and coalesce to 1 partition for CollectLeft mode + let left_scan = create_scan_exec_with_statistics(None, Some(2)).await; + let left_scan_coalesced = Arc::new(CoalescePartitionsExec::new(left_scan.clone())) + as Arc; + + // Create right table scan with different table name + let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL, date DATE) \ + STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\ + PARTITIONED BY (date) \ + WITH ORDER (id ASC);"; + let right_scan = + create_scan_exec_with_statistics(Some(right_create_table_sql), Some(2)).await; + + // Create join condition: t1.id = t2.id + let on = vec![( + Arc::new(Column::new("id", 0)) as Arc, + Arc::new(Column::new("id", 0)) as Arc, + )]; + + // Test CollectLeft mode - left child must have 1 partition + let collect_left_join = Arc::new(HashJoinExec::try_new( + left_scan_coalesced, + Arc::clone(&right_scan), + on.clone(), + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + )?) as Arc; + + // Test partition statistics for CollectLeft mode + let statistics = (0..collect_left_join.output_partitioning().partition_count()) + .map(|idx| collect_left_join.partition_statistics(Some(idx))) + .collect::>>()?; + + // Check that we have the expected number of partitions + assert_eq!(statistics.len(), 2); + + // For collect left mode, the min/max values are from the entire left table and the specific partition of the right table. + let expected_p0_statistics = Statistics { + num_rows: Precision::Inexact(2), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(4))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(4))), + min_value: Precision::Exact(ScalarValue::Int32(Some(3))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ], + }; + assert_eq!(statistics[0], expected_p0_statistics); + + // Test Partitioned mode + let partitioned_join = Arc::new(HashJoinExec::try_new( + Arc::clone(&left_scan), + Arc::clone(&right_scan), + on.clone(), + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNothing, + )?) as Arc; + + // Test partition statistics for Partitioned mode + let statistics = (0..partitioned_join.output_partitioning().partition_count()) + .map(|idx| partitioned_join.partition_statistics(Some(idx))) + .collect::>>()?; + + // Check that we have the expected number of partitions + assert_eq!(statistics.len(), 2); + + // For partitioned mode, the min/max values are from the specific partition for each side. + let expected_p0_statistics = Statistics { + num_rows: Precision::Inexact(2), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(4))), + min_value: Precision::Exact(ScalarValue::Int32(Some(3))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(4))), + min_value: Precision::Exact(ScalarValue::Int32(Some(3))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ], + }; + assert_eq!(statistics[0], expected_p0_statistics); + + // Test Auto mode - should fall back to getting all partition statistics + let auto_join = Arc::new(HashJoinExec::try_new( + Arc::clone(&left_scan), + Arc::clone(&right_scan), + on, + None, + &JoinType::Inner, + None, + PartitionMode::Auto, + NullEquality::NullEqualsNothing, + )?) as Arc; + + // Test partition statistics for Auto mode + let statistics = (0..auto_join.output_partitioning().partition_count()) + .map(|idx| auto_join.partition_statistics(Some(idx))) + .collect::>>()?; + + // Check that we have the expected number of partitions + assert_eq!(statistics.len(), 2); + + // For auto mode, the min/max values are from the entire left and right tables. + let expected_p0_statistics = Statistics { + num_rows: Precision::Inexact(4), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(4))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(4))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + }, + ], + }; + assert_eq!(statistics[0], expected_p0_statistics); + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index d3999c6cd8247..2b1cfc344c7a5 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -953,21 +953,56 @@ impl ExecutionPlan for HashJoinExec { } fn partition_statistics(&self, partition: Option) -> Result { - if partition.is_some() { - return Ok(Statistics::new_unknown(&self.schema())); + match (partition, self.mode) { + // For CollectLeft mode, the left side is collected into a single partition, + // so all left partitions are available to each output partition. + // For the right side, we need the specific partition statistics. + (Some(partition), PartitionMode::CollectLeft) => { + let left_stats = self.left.partition_statistics(None)?; + let right_stats = self.right.partition_statistics(Some(partition))?; + + let stats = estimate_join_statistics( + left_stats, + right_stats, + self.on.clone(), + &self.join_type, + &self.join_schema, + )?; + Ok(stats.project(self.projection.as_ref())) + } + + // For Partitioned mode, both sides are partitioned, so each output partition + // only has access to the corresponding partition from both sides. + (Some(partition), PartitionMode::Partitioned) => { + let left_stats = self.left.partition_statistics(Some(partition))?; + let right_stats = self.right.partition_statistics(Some(partition))?; + + let stats = estimate_join_statistics( + left_stats, + right_stats, + self.on.clone(), + &self.join_type, + &self.join_schema, + )?; + Ok(stats.project(self.projection.as_ref())) + } + + // For Auto mode or when no specific partition is requested, fall back to + // the current behavior of getting all partition statistics. + (None, _) | (Some(_), PartitionMode::Auto) => { + // TODO stats: it is not possible in general to know the output size of joins + // There are some special cases though, for example: + // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` + let stats = estimate_join_statistics( + self.left.partition_statistics(None)?, + self.right.partition_statistics(None)?, + self.on.clone(), + &self.join_type, + &self.join_schema, + )?; + Ok(stats.project(self.projection.as_ref())) + } } - // TODO stats: it is not possible in general to know the output size of joins - // There are some special cases though, for example: - // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)` - let stats = estimate_join_statistics( - self.left.partition_statistics(None)?, - self.right.partition_statistics(None)?, - self.on.clone(), - &self.join_type, - &self.join_schema, - )?; - // Project statistics if there is a projection - Ok(stats.project(self.projection.as_ref())) } /// Tries to push `projection` down through `hash_join`. If possible, performs the @@ -4901,4 +4936,96 @@ mod tests { fn columns(schema: &Schema) -> Vec { schema.fields().iter().map(|f| f.name().clone()).collect() } + + #[test] + fn test_partition_statistics() -> Result<()> { + use crate::test; + use crate::test::exec::StatisticsExec; + use datafusion_common::Statistics; + use datafusion_physical_expr::expressions::Column; + + let schema = test::aggr_test_schema(); + let left = Arc::new(StatisticsExec::new( + Statistics::new_unknown(&schema), + schema.as_ref().clone(), + )) as Arc; + let right = Arc::new(StatisticsExec::new( + Statistics::new_unknown(&schema), + schema.as_ref().clone(), + )) as Arc; + + let on = vec![( + Arc::new(Column::new("c1", 0)) as Arc, + Arc::new(Column::new("c1", 0)) as Arc, + )]; + + // Test CollectLeft mode + let collect_left_join = HashJoinExec::try_new( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + None, + &JoinType::Inner, + None, + PartitionMode::CollectLeft, + NullEquality::NullEqualsNothing, + )?; + + // When partition is specified for CollectLeft mode, it should get + // all partitions from left and specific partition from right + let stats = collect_left_join.partition_statistics(Some(0))?; + assert!(matches!( + stats.num_rows, + datafusion_common::stats::Precision::Absent + )); + + // Test Partitioned mode + let partitioned_join = HashJoinExec::try_new( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + NullEquality::NullEqualsNothing, + )?; + + // When partition is specified for Partitioned mode, it should get + // specific partition from both left and right + let stats = partitioned_join.partition_statistics(Some(0))?; + assert!(matches!( + stats.num_rows, + datafusion_common::stats::Precision::Absent + )); + + // Test Auto mode - should fall back to getting all partition statistics + let auto_join = HashJoinExec::try_new( + Arc::clone(&left), + Arc::clone(&right), + on, + None, + &JoinType::Inner, + None, + PartitionMode::Auto, + NullEquality::NullEqualsNothing, + )?; + + // When partition is specified for Auto mode, it should fall back to + // getting all partition statistics (unknown) + let stats = auto_join.partition_statistics(Some(0))?; + assert!(matches!( + stats.num_rows, + datafusion_common::stats::Precision::Absent + )); + + // When no partition is specified, should get all partition statistics + let stats = auto_join.partition_statistics(None)?; + assert!(matches!( + stats.num_rows, + datafusion_common::stats::Precision::Absent + )); + + Ok(()) + } } From 3dcc212eff3d50840341c121558faf629ed66d85 Mon Sep 17 00:00:00 2001 From: 0xPoe Date: Mon, 11 Aug 2025 20:41:04 +0200 Subject: [PATCH 2/2] chore: remove the useless test case --- .../physical-plan/src/joins/hash_join.rs | 92 ------------------- 1 file changed, 92 deletions(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 2b1cfc344c7a5..16e0d508a9076 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -4936,96 +4936,4 @@ mod tests { fn columns(schema: &Schema) -> Vec { schema.fields().iter().map(|f| f.name().clone()).collect() } - - #[test] - fn test_partition_statistics() -> Result<()> { - use crate::test; - use crate::test::exec::StatisticsExec; - use datafusion_common::Statistics; - use datafusion_physical_expr::expressions::Column; - - let schema = test::aggr_test_schema(); - let left = Arc::new(StatisticsExec::new( - Statistics::new_unknown(&schema), - schema.as_ref().clone(), - )) as Arc; - let right = Arc::new(StatisticsExec::new( - Statistics::new_unknown(&schema), - schema.as_ref().clone(), - )) as Arc; - - let on = vec![( - Arc::new(Column::new("c1", 0)) as Arc, - Arc::new(Column::new("c1", 0)) as Arc, - )]; - - // Test CollectLeft mode - let collect_left_join = HashJoinExec::try_new( - Arc::clone(&left), - Arc::clone(&right), - on.clone(), - None, - &JoinType::Inner, - None, - PartitionMode::CollectLeft, - NullEquality::NullEqualsNothing, - )?; - - // When partition is specified for CollectLeft mode, it should get - // all partitions from left and specific partition from right - let stats = collect_left_join.partition_statistics(Some(0))?; - assert!(matches!( - stats.num_rows, - datafusion_common::stats::Precision::Absent - )); - - // Test Partitioned mode - let partitioned_join = HashJoinExec::try_new( - Arc::clone(&left), - Arc::clone(&right), - on.clone(), - None, - &JoinType::Inner, - None, - PartitionMode::Partitioned, - NullEquality::NullEqualsNothing, - )?; - - // When partition is specified for Partitioned mode, it should get - // specific partition from both left and right - let stats = partitioned_join.partition_statistics(Some(0))?; - assert!(matches!( - stats.num_rows, - datafusion_common::stats::Precision::Absent - )); - - // Test Auto mode - should fall back to getting all partition statistics - let auto_join = HashJoinExec::try_new( - Arc::clone(&left), - Arc::clone(&right), - on, - None, - &JoinType::Inner, - None, - PartitionMode::Auto, - NullEquality::NullEqualsNothing, - )?; - - // When partition is specified for Auto mode, it should fall back to - // getting all partition statistics (unknown) - let stats = auto_join.partition_statistics(Some(0))?; - assert!(matches!( - stats.num_rows, - datafusion_common::stats::Precision::Absent - )); - - // When no partition is specified, should get all partition statistics - let stats = auto_join.partition_statistics(None)?; - assert!(matches!( - stats.num_rows, - datafusion_common::stats::Precision::Absent - )); - - Ok(()) - } }