Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions datafusion/core/tests/physical_optimizer/partition_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,4 +758,202 @@ mod test {

Ok(())
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to see tests call validate_statistics_with_data() to verify real execution.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM after this is done.

#[tokio::test]
async fn test_hash_join_partition_statistics() -> Result<()> {
use datafusion_common::{JoinType, NullEquality};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

// Create left table scan and coalesce to 1 partition for CollectLeft mode
let left_scan = create_scan_exec_with_statistics(None, Some(2)).await;
let left_scan_coalesced = Arc::new(CoalescePartitionsExec::new(left_scan.clone()))
as Arc<dyn ExecutionPlan>;

// Create right table scan with different table name
let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL, date DATE) \
STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition'\
PARTITIONED BY (date) \
WITH ORDER (id ASC);";
let right_scan =
create_scan_exec_with_statistics(Some(right_create_table_sql), Some(2)).await;

// Create join condition: t1.id = t2.id
let on = vec![(
Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>,
Arc::new(Column::new("id", 0)) as Arc<dyn PhysicalExpr>,
)];

// Test CollectLeft mode - left child must have 1 partition
let collect_left_join = Arc::new(HashJoinExec::try_new(
left_scan_coalesced,
Arc::clone(&right_scan),
on.clone(),
None,
&JoinType::Inner,
None,
PartitionMode::CollectLeft,
NullEquality::NullEqualsNothing,
)?) as Arc<dyn ExecutionPlan>;

// Test partition statistics for CollectLeft mode
let statistics = (0..collect_left_join.output_partitioning().partition_count())
.map(|idx| collect_left_join.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;

// Check that we have the expected number of partitions
assert_eq!(statistics.len(), 2);

// For collect left mode, the min/max values are from the entire left table and the specific partition of the right table.
let expected_p0_statistics = Statistics {
num_rows: Precision::Inexact(2),
total_byte_size: Precision::Absent,
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
],
};
assert_eq!(statistics[0], expected_p0_statistics);

// Test Partitioned mode
let partitioned_join = Arc::new(HashJoinExec::try_new(
Arc::clone(&left_scan),
Arc::clone(&right_scan),
on.clone(),
None,
&JoinType::Inner,
None,
PartitionMode::Partitioned,
NullEquality::NullEqualsNothing,
)?) as Arc<dyn ExecutionPlan>;

// Test partition statistics for Partitioned mode
let statistics = (0..partitioned_join.output_partitioning().partition_count())
.map(|idx| partitioned_join.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;

// Check that we have the expected number of partitions
assert_eq!(statistics.len(), 2);

// For partitioned mode, the min/max values are from the specific partition for each side.
let expected_p0_statistics = Statistics {
num_rows: Precision::Inexact(2),
total_byte_size: Precision::Absent,
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
],
};
assert_eq!(statistics[0], expected_p0_statistics);

// Test Auto mode - should fall back to getting all partition statistics
let auto_join = Arc::new(HashJoinExec::try_new(
Arc::clone(&left_scan),
Arc::clone(&right_scan),
on,
None,
&JoinType::Inner,
None,
PartitionMode::Auto,
NullEquality::NullEqualsNothing,
)?) as Arc<dyn ExecutionPlan>;

// Test partition statistics for Auto mode
let statistics = (0..auto_join.output_partitioning().partition_count())
.map(|idx| auto_join.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;

// Check that we have the expected number of partitions
assert_eq!(statistics.len(), 2);

// For auto mode, the min/max values are from the entire left and right tables.
let expected_p0_statistics = Statistics {
num_rows: Precision::Inexact(4),
total_byte_size: Precision::Absent,
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
ColumnStatistics {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
},
],
};
assert_eq!(statistics[0], expected_p0_statistics);
Ok(())
}
}
63 changes: 49 additions & 14 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -953,21 +953,56 @@ impl ExecutionPlan for HashJoinExec {
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if partition.is_some() {
return Ok(Statistics::new_unknown(&self.schema()));
match (partition, self.mode) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
      let (left_stats, right_stats) = match (partition, self.mode) {
          (Some(p), PartitionMode::CollectLeft) => {
              (self.left.partition_statistics(None)?, self.right.partition_statistics(Some(p))?)
          }
          (Some(p), PartitionMode::Partitioned) => {
              (self.left.partition_statistics(Some(p))?, self.right.partition_statistics(Some(p))?)
          }
          (None, _) | (Some(_), PartitionMode::Auto) => {
              (self.left.partition_statistics(None)?, self.right.partition_statistics(None)?)
          }
      };
      let stats = estimate_join_statistics(left_stats, right_stats, &self.on, &self.join_type, &self.join_schema)?;
      Ok(stats.project(self.projection.as_ref()))
  }

// For CollectLeft mode, the left side is collected into a single partition,
// so all left partitions are available to each output partition.
// For the right side, we need the specific partition statistics.
(Some(partition), PartitionMode::CollectLeft) => {
let left_stats = self.left.partition_statistics(None)?;
let right_stats = self.right.partition_statistics(Some(partition))?;

let stats = estimate_join_statistics(
left_stats,
right_stats,
self.on.clone(),
&self.join_type,
&self.join_schema,
)?;
Ok(stats.project(self.projection.as_ref()))
}

// For Partitioned mode, both sides are partitioned, so each output partition
// only has access to the corresponding partition from both sides.
(Some(partition), PartitionMode::Partitioned) => {
let left_stats = self.left.partition_statistics(Some(partition))?;
let right_stats = self.right.partition_statistics(Some(partition))?;

let stats = estimate_join_statistics(
left_stats,
right_stats,
self.on.clone(),
&self.join_type,
&self.join_schema,
)?;
Ok(stats.project(self.projection.as_ref()))
}

// For Auto mode or when no specific partition is requested, fall back to
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For auto mode, as the comment says:

/// DataFusion optimizer decides which PartitionMode
    /// mode(Partitioned/CollectLeft) is optimal based on statistics. It will
    /// also consider swapping the left and right inputs for the Join
    Auto,

So if the method with partition is called after JoinSelection rule, it's impossible to see Auto mode, however, the method may be called first, I suggest evaluating the cost of related code about choosing partition mode in JoinSelection, then decide if we can decide the PartitionMode here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. However, it seems challenging to get the threshold here for conducting the same evaluation. Do you have any other ideas on how to share the same logic from the JoinSelection to determine the PartitionMode? I'm uncertain how to obtain the optimizer config for accessing the threshold settings in the current API design.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll check later

Copy link
Copy Markdown
Contributor Author

@0xPoe 0xPoe Nov 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xudong963 Do you have time to look at the question I asked above? Thank you. Also, could you please help me reopen the PR?
For this question, I've thought about it a little more.

however, the method may be called first,

I searched the code base, no optimizer rules currently call partition_statistics(Some(partition))) before JoinSelection.

I’m also wondering — based on this, should we assert that the auto mode will never happen in this code path?

Or do we still want to determine the PartitionMode here? If so, that would mean we need to store collect_threshold_byte_size and collect_threshold_num_rows in HashJoinExec. I’m not sure if it’s a good design to expose these optimizer-related configs to the executor layer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I searched the code base, no optimizer rules currently call partition_statistics(Some(partition))) before JoinSelection.

The only rule before JoinSelection, called partition_statistics, is AggregateStatistics. However, it only calls partial_agg_exec.input().partition_statistics(None).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good for me to keep the old way!

// the current behavior of getting all partition statistics.
(None, _) | (Some(_), PartitionMode::Auto) => {
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
let stats = estimate_join_statistics(
self.left.partition_statistics(None)?,
self.right.partition_statistics(None)?,
self.on.clone(),
&self.join_type,
&self.join_schema,
)?;
Ok(stats.project(self.projection.as_ref()))
}
}
// TODO stats: it is not possible in general to know the output size of joins
// There are some special cases though, for example:
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
let stats = estimate_join_statistics(
self.left.partition_statistics(None)?,
self.right.partition_statistics(None)?,
self.on.clone(),
&self.join_type,
&self.join_schema,
)?;
// Project statistics if there is a projection
Ok(stats.project(self.projection.as_ref()))
}

/// Tries to push `projection` down through `hash_join`. If possible, performs the
Expand Down