diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 66d1380e09c38..ac1bef6b13d29 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -220,7 +220,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<DataSourceExec> {
     )
 }
 
-fn projection_exec_with_alias(
+pub(crate) fn projection_exec_with_alias(
     input: Arc<dyn ExecutionPlan>,
     alias_pairs: Vec<(String, String)>,
 ) -> Arc<dyn ExecutionPlan> {
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 26a00ef0f29ce..3223768acb74f 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -17,11 +17,16 @@
 
 use std::sync::Arc;
 
+use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
+use crate::physical_optimizer::sanity_checker::{
+    assert_sanity_check, assert_sanity_check_err,
+};
 use crate::physical_optimizer::test_utils::{
     aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
     coalesce_partitions_exec, create_test_schema, create_test_schema2,
     create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec,
-    local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr,
+    local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_stats,
+    repartition_exec, schema, single_partitioned_aggregate, sort_exec, sort_expr,
     sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
     sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
     union_exec, RequirementsTestExec,
@@ -3346,3 +3351,62 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_preserve_needed_coalesce() -> Result<()> {
+    // Input to EnforceSorting, from our test case.
+    let plan = projection_exec_with_alias(
+        union_exec(vec![parquet_exec_with_stats(); 2]),
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let plan = Arc::new(CoalescePartitionsExec::new(plan));
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let plan: Arc<dyn ExecutionPlan> =
+        single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
+    let plan = sort_exec(sort_key, plan);
+
+    // Starting plan: as in our test case.
+    assert_eq!(
+        get_plan_string(&plan),
+        vec![
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+            "    CoalescePartitionsExec",
+            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "        UnionExec",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+    // Test: plan is valid.
+    assert_sanity_check(&plan, true);
+
+    // EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
+    let optimizer = EnforceSorting::new();
+    let optimized = optimizer.optimize(plan, &Default::default())?;
+    assert_eq!(
+        get_plan_string(&optimized),
+        vec![
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+            "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "        UnionExec",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    // Bug: Plan is now invalid.
+    let err = "does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)";
+    assert_sanity_check_err(&optimized, err);
+
+    Ok(())
+}
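The test above encodes the bug as data: after the rewrite, two unknown partitions feed an aggregate that requires hash partitioning on `a@0`. As a self-contained sketch of the invariant involved (hand-rolled toy enums, not DataFusion's real `Partitioning`/`Distribution` types), the single-partition short-circuit below is why the original plan is sane and the rewritten one is not:

```rust
enum Partitioning {
    Unknown(usize),                 // e.g. UnknownPartitioning(2) under the union
    Hash(Vec<&'static str>, usize), // e.g. Hash([a@0], 2)
}

enum Distribution {
    HashPartitioned(Vec<&'static str>), // required by SinglePartitioned aggregates
}

fn satisfies(output: &Partitioning, required: &Distribution) -> bool {
    let partition_count = match output {
        Partitioning::Unknown(n) | Partitioning::Hash(_, n) => *n,
    };
    // A single partition trivially satisfies any distribution requirement;
    // this is what CoalescePartitionsExec guaranteed before the rewrite.
    if partition_count == 1 {
        return true;
    }
    match (output, required) {
        (Partitioning::Hash(keys, _), Distribution::HashPartitioned(required_keys)) => {
            keys == required_keys
        }
        _ => false,
    }
}

fn main() {
    let required = Distribution::HashPartitioned(vec!["a@0"]);
    // With the coalesce in place: one partition reaches the aggregate -> sane.
    assert!(satisfies(&Partitioning::Unknown(1), &required));
    // Hash partitioning on the same keys would also satisfy it, even with >1 partition.
    assert!(satisfies(&Partitioning::Hash(vec!["a@0"], 2), &required));
    // After EnforceSorting drops the coalesce: two unknown partitions -> the
    // sanity checker rejects the plan, as the test asserts.
    assert!(!satisfies(&Partitioning::Unknown(2), &required));
}
```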
diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs
index a73d084a081f3..ee9cb032c3410 100644
--- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs
@@ -388,7 +388,7 @@ fn create_test_schema2() -> SchemaRef {
 }
 
 /// Check if sanity checker should accept or reject plans.
-fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
+pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
     let sanity_checker = SanityCheckPlan::new();
     let opts = ConfigOptions::default();
     assert_eq!(
@@ -397,6 +397,14 @@ fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
     );
 }
 
+/// Assert reason for sanity check failure.
+pub(crate) fn assert_sanity_check_err(plan: &Arc<dyn ExecutionPlan>, err: &str) {
+    let sanity_checker = SanityCheckPlan::new();
+    let opts = ConfigOptions::default();
+    let error = sanity_checker.optimize(plan.clone(), &opts).unwrap_err();
+    assert!(error.message().contains(err));
+}
+
 /// Check if the plan we created is as expected by comparing the plan
 /// formatted as a string.
 fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) {
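Note that `assert_sanity_check_err` matches a substring of the diagnostic rather than the full string, which keeps tests stable as message wording evolves. A minimal sketch of that pattern (the error text below is invented for illustration; real diagnostics come from `SanityCheckPlan`):

```rust
use datafusion_common::DataFusionError;

fn main() {
    // Hypothetical diagnostic standing in for a SanityCheckPlan error.
    let error = DataFusionError::Plan(
        "AggregateExec ... does not satisfy distribution requirements ...".to_string(),
    );
    // `message()` exposes the error description, so matching a stable
    // fragment is enough for the assertion.
    assert!(error.message().contains("does not satisfy distribution requirements"));
}
```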
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index c7572eb089007..0b9c3b80bb935 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -30,9 +30,10 @@ use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::physical_plan::ParquetSource;
 use datafusion::datasource::source::DataSourceExec;
 use datafusion_common::config::ConfigOptions;
+use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
-use datafusion_common::{JoinType, Result};
+use datafusion_common::{ColumnStatistics, JoinType, Result, Statistics};
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -102,6 +103,44 @@ pub fn schema() -> SchemaRef {
     ]))
 }
+
+fn int64_stats() -> ColumnStatistics {
+    ColumnStatistics {
+        null_count: Precision::Absent,
+        sum_value: Precision::Absent,
+        max_value: Precision::Exact(1_000_000.into()),
+        min_value: Precision::Exact(0.into()),
+        distinct_count: Precision::Absent,
+    }
+}
+
+fn column_stats() -> Vec<ColumnStatistics> {
+    vec![
+        int64_stats(), // a
+        int64_stats(), // b
+        int64_stats(), // c
+        ColumnStatistics::default(),
+        ColumnStatistics::default(),
+    ]
+}
+
+/// Create parquet datasource exec using schema from [`schema`].
+pub(crate) fn parquet_exec_with_stats() -> Arc<DataSourceExec> {
+    let mut statistics = Statistics::new_unknown(&schema());
+    statistics.num_rows = Precision::Inexact(10);
+    statistics.column_statistics = column_stats();
+
+    let config = FileScanConfig::new(
+        ObjectStoreUrl::parse("test:///").unwrap(),
+        schema(),
+        Arc::new(ParquetSource::new(Default::default())),
+    )
+    .with_file(PartitionedFile::new("x".to_string(), 10000))
+    .with_statistics(statistics);
+    assert_eq!(config.statistics.num_rows, Precision::Inexact(10));
+
+    config.build()
+}
 
 pub fn create_test_schema() -> Result<SchemaRef> {
     let nullable_column = Field::new("nullable_col", DataType::Int32, true);
     let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -522,6 +561,30 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: Vec<String>) -> PhysicalGroupBy {
     PhysicalGroupBy::new_single(group_by_expr.clone())
 }
 
+pub(crate) fn single_partitioned_aggregate(
+    input: Arc<dyn ExecutionPlan>,
+    alias_pairs: Vec<(String, String)>,
+) -> Arc<dyn ExecutionPlan> {
+    let schema = schema();
+    let group_by = alias_pairs
+        .iter()
+        .map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
+        .collect::<Vec<_>>();
+    let group_by = PhysicalGroupBy::new_single(group_by);
+
+    Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::SinglePartitioned,
+            group_by,
+            vec![],
+            vec![],
+            input,
+            schema,
+        )
+        .unwrap(),
+    )
+}
+
 pub fn assert_plan_matches_expected(
     plan: &Arc<dyn ExecutionPlan>,
     expected: &[&str],
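The `parquet_exec_with_stats` helper wraps every statistic in `Precision`. A small self-contained illustration of the three states used above (`Exact`, `Inexact`, `Absent`), assuming only the `datafusion_common` crate:

```rust
use datafusion_common::stats::Precision;

fn main() {
    // num_rows in the helper above is Inexact(10): an estimate, not a guarantee.
    let rows: Precision<usize> = Precision::Inexact(10);
    // get_value() yields the underlying value for Exact and Inexact alike...
    assert_eq!(rows.get_value(), Some(&10));
    assert_eq!(Precision::Exact(10_usize).get_value(), Some(&10));
    // ...while Absent carries no value at all.
    assert_eq!(Precision::<usize>::Absent.get_value(), None);
}
```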