diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index 468d25e0e57d0..3f079b88d563f 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -30,6 +30,7 @@ mod test { use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; use datafusion_execution::TaskContext; use datafusion_execution::config::SessionConfig; + use datafusion_expr::{WindowFrame, WindowFunctionDefinition}; use datafusion_expr_common::operator::Operator; use datafusion_functions_aggregate::count::count_udaf; use datafusion_physical_expr::Partitioning; @@ -52,6 +53,7 @@ mod test { use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::union::{InterleaveExec, UnionExec}; + use datafusion_physical_plan::windows::{WindowAggExec, create_window_expr}; use datafusion_physical_plan::{ ExecutionPlan, ExecutionPlanProperties, execute_stream_partitioned, get_plan_string, @@ -1154,4 +1156,105 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_statistic_by_partition_of_window_agg() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + + let window_expr = create_window_expr( + &WindowFunctionDefinition::AggregateUDF(count_udaf()), + "count".to_owned(), + &[col("id", &scan.schema())?], + &[], // no partition by + &[PhysicalSortExpr::new( + col("id", &scan.schema())?, + SortOptions::default(), + )], + Arc::new(WindowFrame::new(Some(false))), + scan.schema(), + false, + false, + None, + )?; + + let window_agg: Arc = + Arc::new(WindowAggExec::try_new(vec![window_expr], scan, true)?); + + // Verify partition statistics are properly propagated (not unknown) + let statistics = (0..window_agg.output_partitioning().partition_count()) + .map(|idx| window_agg.partition_statistics(Some(idx))) + .collect::>>()?; + + assert_eq!(statistics.len(), 2); + + // Window functions preserve input row counts and column statistics + // but add unknown statistics for the new window column + let expected_statistic_partition_1 = Statistics { + num_rows: Precision::Exact(2), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(4))), + min_value: Precision::Exact(ScalarValue::Int32(Some(3))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(8), + }, + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Date32(Some( + DATE_2025_03_02, + ))), + min_value: Precision::Exact(ScalarValue::Date32(Some( + DATE_2025_03_01, + ))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(8), + }, + ColumnStatistics::new_unknown(), // window column + ], + }; + + let expected_statistic_partition_2 = Statistics { + num_rows: Precision::Exact(2), + total_byte_size: Precision::Absent, + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Int32(Some(2))), + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(8), + }, + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Exact(ScalarValue::Date32(Some( + DATE_2025_03_04, + ))), + min_value: Precision::Exact(ScalarValue::Date32(Some( + DATE_2025_03_03, + ))), + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Exact(8), + }, + ColumnStatistics::new_unknown(), // window column + ], + }; + + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + + // Verify the statistics match actual execution results + let expected_stats = vec![ + ExpectedStatistics::NonEmpty(3, 4, 2), + ExpectedStatistics::NonEmpty(1, 2, 2), + ]; + validate_statistics_with_data(window_agg, expected_stats, 0).await?; + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index d6d5f4fdd2a67..aa99f4f49885a 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -158,24 +158,6 @@ impl WindowAggExec { .unwrap_or_else(Vec::new) } } - - fn statistics_inner(&self) -> Result { - let input_stat = self.input.partition_statistics(None)?; - let win_cols = self.window_expr.len(); - let input_cols = self.input.schema().fields().len(); - // TODO stats: some windowing function will maintain invariants such as min, max... - let mut column_statistics = Vec::with_capacity(win_cols + input_cols); - // copy stats of the input to the beginning of the schema. - column_statistics.extend(input_stat.column_statistics); - for _ in 0..win_cols { - column_statistics.push(ColumnStatistics::new_unknown()) - } - Ok(Statistics { - num_rows: input_stat.num_rows, - column_statistics, - total_byte_size: Precision::Absent, - }) - } } impl DisplayAs for WindowAggExec { @@ -291,15 +273,25 @@ impl ExecutionPlan for WindowAggExec { } fn statistics(&self) -> Result { - self.statistics_inner() + self.partition_statistics(None) } fn partition_statistics(&self, partition: Option) -> Result { - if partition.is_none() { - self.statistics_inner() - } else { - Ok(Statistics::new_unknown(&self.schema())) + let input_stat = self.input.partition_statistics(partition)?; + let win_cols = self.window_expr.len(); + let input_cols = self.input.schema().fields().len(); + // TODO stats: some windowing function will maintain invariants such as min, max... + let mut column_statistics = Vec::with_capacity(win_cols + input_cols); + // copy stats of the input to the beginning of the schema. + column_statistics.extend(input_stat.column_statistics); + for _ in 0..win_cols { + column_statistics.push(ColumnStatistics::new_unknown()) } + Ok(Statistics { + num_rows: input_stat.num_rows, + column_statistics, + total_byte_size: Precision::Absent, + }) } }