diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 863bb5181f451..46f4f8bfbcb62 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -4235,6 +4235,7 @@ checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
 dependencies = [
  "getrandom",
  "serde",
+ "wasm-bindgen",
 ]
 
 [[package]]
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 64ad8f2ba152b..8ade899f9d657 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -134,7 +134,7 @@ tempfile = { workspace = true }
 tokio = { workspace = true }
 tokio-util = { version = "0.7.4", features = ["io"], optional = true }
 url = { workspace = true }
-uuid = { version = "1.7", features = ["v4"] }
+uuid = { version = "1.7", features = ["v4", "js"] }
 xz2 = { version = "0.1", optional = true, features = ["static"] }
 zstd = { version = "0.13", optional = true, default-features = false }
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 3c8d08ee32d44..8838262a59cf4 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1416,6 +1416,7 @@ pub(crate) mod tests {
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec};
     use crate::physical_optimizer::enforce_sorting::EnforceSorting;
+    use crate::physical_optimizer::sanity_checker::check_plan_sanity;
     use crate::physical_optimizer::test_utils::{
         check_integrity, coalesce_partitions_exec, repartition_exec,
     };
@@ -1426,11 +1427,13 @@ pub(crate) mod tests {
     use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
     use crate::physical_plan::sorts::sort::SortExec;
     use crate::physical_plan::{displayable, DisplayAs, DisplayFormatType, Statistics};
+    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+    use datafusion_physical_expr::aggregate::AggregateExprBuilder;
     use datafusion_physical_optimizer::output_requirements::OutputRequirements;
 
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use datafusion_common::ScalarValue;
-    use datafusion_expr::Operator;
+    use datafusion_common::{ColumnStatistics, ScalarValue};
+    use datafusion_expr::{AggregateUDF, Operator};
     use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
     use datafusion_physical_expr::{
         expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr,
@@ -1526,8 +1529,8 @@ pub(crate) mod tests {
         fn execute(
             &self,
             _partition: usize,
-            _context: Arc<datafusion_execution::TaskContext>,
-        ) -> Result<datafusion_execution::SendableRecordBatchStream> {
+            _context: Arc<TaskContext>,
+        ) -> Result<SendableRecordBatchStream> {
             unreachable!();
         }
@@ -1546,6 +1549,25 @@ pub(crate) mod tests {
         ]))
     }
 
+    fn int64_stats() -> ColumnStatistics {
+        ColumnStatistics {
+            null_count: Precision::Absent,
+            max_value: Precision::Exact(1_000_000.into()),
+            min_value: Precision::Exact(0.into()),
+            distinct_count: Precision::Absent,
+        }
+    }
+
+    fn column_stats() -> Vec<ColumnStatistics> {
+        vec![
+            int64_stats(), // a
+            int64_stats(), // b
+            int64_stats(), // c
+            ColumnStatistics::default(),
+            ColumnStatistics::default(),
+        ]
+    }
+
     fn parquet_exec() -> Arc<ParquetExec> {
         parquet_exec_with_sort(vec![])
     }
@@ -1562,6 +1584,20 @@ pub(crate) mod tests {
         .build_arc()
     }
 
+    pub(crate) fn parquet_exec_with_stats() -> Arc<ParquetExec> {
+        let mut statistics = Statistics::new_unknown(&schema());
+        statistics.num_rows = Precision::Inexact(10);
+        statistics.column_statistics = column_stats();
+
+        let config =
+            FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
+                .with_file(PartitionedFile::new("x".to_string(), 10000))
+                .with_statistics(statistics);
+        assert_eq!(config.statistics.num_rows, Precision::Inexact(10));
+
+        ParquetExec::builder(config).build_arc()
+    }
+
     fn parquet_exec_multiple() -> Arc<ParquetExec> {
         parquet_exec_multiple_sorted(vec![])
     }
@@ -1629,7 +1665,7 @@ pub(crate) mod tests {
         )
     }
 
-    fn projection_exec_with_alias(
+    pub(crate) fn projection_exec_with_alias(
         input: Arc<dyn ExecutionPlan>,
         alias_pairs: Vec<(String, String)>,
     ) -> Arc<dyn ExecutionPlan> {
@@ -1643,6 +1679,15 @@ pub(crate) mod tests {
     fn aggregate_exec_with_alias(
         input: Arc<dyn ExecutionPlan>,
         alias_pairs: Vec<(String, String)>,
+    ) -> Arc<dyn ExecutionPlan> {
+        aggregate_exec_with_aggr_expr_and_alias(input, vec![], alias_pairs)
+    }
+
+    #[expect(clippy::type_complexity)]
+    fn aggregate_exec_with_aggr_expr_and_alias(
+        input: Arc<dyn ExecutionPlan>,
+        aggr_expr: Vec<(Arc<AggregateUDF>, Vec<Arc<dyn PhysicalExpr>>)>,
+        alias_pairs: Vec<(String, String)>,
     ) -> Arc<dyn ExecutionPlan> {
         let schema = schema();
         let mut group_by_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
@@ -1664,18 +1709,33 @@ pub(crate) mod tests {
         .collect::<Vec<_>>();
         let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr);
 
+        let aggr_expr = aggr_expr
+            .into_iter()
+            .map(|(udaf, exprs)| {
+                AggregateExprBuilder::new(udaf.clone(), exprs)
+                    .alias(udaf.name())
+                    .schema(Arc::clone(&schema))
+                    .build()
+                    .map(Arc::new)
+                    .unwrap()
+            })
+            .collect::<Vec<_>>();
+        let filter_exprs = std::iter::repeat(None)
+            .take(aggr_expr.len())
+            .collect::<Vec<_>>();
+
         Arc::new(
             AggregateExec::try_new(
                 AggregateMode::FinalPartitioned,
                 final_grouping,
-                vec![],
-                vec![],
+                aggr_expr.clone(),
+                filter_exprs.clone(),
                 Arc::new(
                     AggregateExec::try_new(
                         AggregateMode::Partial,
                         group_by,
-                        vec![],
-                        vec![],
+                        aggr_expr,
+                        filter_exprs,
                         input,
                         schema.clone(),
                     )
@@ -1842,6 +1902,11 @@ pub(crate) mod tests {
         };
 
         ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => {
+            // Use a small batch size, to trigger RoundRobin in tests
+            assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, $PREFER_EXISTING_UNION, 1);
+        };
+
+        ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr, $BATCH_SIZE: expr) => {
             let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
 
             let mut config = ConfigOptions::new();
@@ -1850,8 +1915,12 @@ pub(crate) mod tests {
             config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
             config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
             config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION;
-            // Use a small batch size, to trigger RoundRobin in tests
-            config.execution.batch_size = 1;
+            config.execution.batch_size = $BATCH_SIZE;
+
+            // This triggers the use of column statistics estimates in the repartition calculation.
+            // Without this setting, the testing of `get_repartition_requirement_status` misses
+            // several branches.
+            config.execution.use_row_number_estimates_to_optimize_partitioning = true;
 
             // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
             // because they were written prior to the separation of `BasicEnforcement` into
@@ -1937,6 +2006,22 @@ pub(crate) mod tests {
         };
     }
 
+    macro_rules! assert_optimized_without_forced_roundrobin {
+        ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
+            assert_optimized!(
+                $EXPECTED_LINES,
+                $PLAN,
+                $FIRST_ENFORCE_DIST,
+                false,
+                10,
+                false,
+                1024,
+                false,
+                100
+            );
+        };
+    }
+
     macro_rules! assert_plan_txt {
         ($EXPECTED_LINES: expr, $PLAN: expr) => {
             let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
@@ -3436,6 +3521,273 @@ pub(crate) mod tests {
         Ok(())
     }
 
+    fn aggregate_over_union(
+        input: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let union = union_exec(input);
+        let plan =
+            aggregate_exec_with_alias(union, vec![("a".to_string(), "a1".to_string())]);
+
+        // Demonstrate starting plan.
+        let before = displayable(plan.as_ref()).indent(true).to_string();
+        let before = trim_plan_display(&before);
+        assert_eq!(
+            before,
+            vec![
+                "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+                "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+                "UnionExec",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            ],
+        );
+
+        plan
+    }
+
+    // Aggregate over a union,
+    // with the current testing setup.
+    //
+    // It will repartition twice for an aggregate over a union:
+    // * repartitions before the partial aggregate.
+    // * repartitions before the final aggregation.
+    #[test]
+    fn repartitions_twice_for_aggregate_after_union() -> Result<()> {
+        let plan = aggregate_over_union(vec![parquet_exec(); 2]);
+
+        // We get a distribution error without repartitioning.
+        let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err();
+        assert!(err.message().contains("ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)"));
+
+        // Test: using the `assert_optimized` macro.
+        //
+        // The updated plan (post optimization) will have added RepartitionExecs
+        // (between the union and the aggregations).
+        let expected = &[
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+            "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+            "UnionExec",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ];
+        assert_optimized!(expected, plan.clone(), true);
+        assert_optimized!(expected, plan.clone(), false);
+
+        Ok(())
+    }
+
+    // Aggregate over a union,
+    // but make the test setup more realistic.
+    //
+    // It will repartition once for an aggregate over a union:
+    // * repartitions between the partial & final aggregations.
+    #[test]
+    fn repartitions_once_for_aggregate_after_union() -> Result<()> {
+        // use parquet exec with stats
+        let plan: Arc<dyn ExecutionPlan> =
+            aggregate_over_union(vec![parquet_exec_with_stats(); 2]);
+
+        // We get a distribution error without repartitioning.
+        let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err();
+        assert!(err.message().contains("ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)"));
+
+        // Test: using the `assert_optimized_without_forced_roundrobin` macro.
+        // This removes the forced round-robin repartitioning,
+        // by no longer hard-coding batch_size=1.
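+        // With batch_size=100 and an (inexact) estimate of only 10 rows per
+        // input, the round-robin repartition is no longer considered worthwhile.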
+        //
+        // The updated plan (post optimization) will have added only 1 RepartitionExec.
+        let expected = &[
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+            "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=2",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+            "UnionExec",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ];
+        assert_optimized_without_forced_roundrobin!(expected, plan.clone(), true);
+        assert_optimized_without_forced_roundrobin!(expected, plan.clone(), false);
+
+        Ok(())
+    }
+
+    /// Same as [`aggregate_over_union`], but with a sort between the union and the aggregation.
+    fn aggregate_over_sorted_union(
+        input: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let union = union_exec(input);
+        let schema = schema();
+        let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+            expr: col("a", &schema).unwrap(),
+            options: SortOptions::default(),
+        }]);
+        let sort = sort_exec(sort_key, union, false);
+        let plan =
+            aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]);
+
+        // Demonstrate starting plan.
+        // Notice the `ordering_mode=Sorted` on the aggregations.
+        let before = displayable(plan.as_ref()).indent(true).to_string();
+        let before = trim_plan_display(&before);
+        assert_eq!(
+            before,
+            vec![
+                "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+                "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+                "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+                "UnionExec",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            ],
+        );
+
+        plan
+    }
+
+    /// Same as [`repartitions_once_for_aggregate_after_union`], but adds a sort between
+    /// the union and the aggregate. This changes the outcome:
+    ///
+    /// * we no longer get a distribution error.
+    /// * but we still get repartitioning?
+    #[test]
+    fn repartitions_for_aggregate_after_sorted_union() -> Result<()> {
+        let plan = aggregate_over_sorted_union(vec![parquet_exec_with_stats(); 2]);
+
+        // With the sort, there is no distribution error.
+        let checker = check_plan_sanity(plan.clone(), &Default::default());
+        assert!(checker.is_ok());
+
+        // It does not repartition on the first run.
+        let expected_after_first_run = &[
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "UnionExec",
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ];
+        assert_optimized_without_forced_roundrobin!(
+            expected_after_first_run,
+            plan.clone(),
+            true
+        );
+
+        // But it does repartition on the second run.
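+        // (The two runs differ only in rule order: `true` applies
+        // `EnforceDistribution` before `EnforceSorting`, `false` after.)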
+        let expected_after_second_run = &[
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+            "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=2",
+            "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "UnionExec",
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ];
+        assert_optimized_without_forced_roundrobin!(
+            expected_after_second_run,
+            plan.clone(),
+            false
+        );
+
+        Ok(())
+    }
+
+    /// Same as [`aggregate_over_sorted_union`], but with a projection between the union and the sort.
+    fn aggregate_over_sorted_union_projection(
+        input: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let union = union_exec(input);
+        let union_projection = projection_exec_with_alias(
+            union,
+            vec![
+                ("a".to_string(), "a".to_string()),
+                ("b".to_string(), "value".to_string()),
+            ],
+        );
+        let schema = schema();
+        let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+            expr: col("a", &schema).unwrap(),
+            options: SortOptions::default(),
+        }]);
+        let sort = sort_exec(sort_key, union_projection, false);
+        let plan =
+            aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]);
+
+        // Demonstrate starting plan.
+        // Notice the `ordering_mode=Sorted` on the aggregations.
+        let before = displayable(plan.as_ref()).indent(true).to_string();
+        let before = trim_plan_display(&before);
+        assert_eq!(
+            before,
+            vec![
+                "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+                "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+                "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+                "ProjectionExec: expr=[a@0 as a, b@1 as value]",
+                "UnionExec",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            ],
+        );
+
+        plan
+    }
+
+    /// Same as [`repartitions_for_aggregate_after_sorted_union`], but adds a projection
+    /// as well between the union and aggregate. This changes the outcome:
+    ///
+    /// * we no longer get repartitioning, and instead get coalescing.
+    #[test]
+    fn coalesces_for_aggregate_after_sorted_union_projection() -> Result<()> {
+        let plan =
+            aggregate_over_sorted_union_projection(vec![parquet_exec_with_stats(); 2]);
+
+        // Same as `repartitions_for_aggregate_after_sorted_union`. No error.
+        let checker = check_plan_sanity(plan.clone(), &Default::default());
+        assert!(checker.is_ok());
+
+        // It no longer does a repartition on the first run.
+        // Instead it adds an SPM.
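+        // (SPM = `SortPreservingMergeExec`: it merges the sorted partitions into
+        // one sorted partition rather than repartitioning them.)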
+        let expected_after_first_run = &[
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+            "ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "UnionExec",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ];
+        assert_optimized_without_forced_roundrobin!(
+            expected_after_first_run,
+            plan.clone(),
+            true
+        );
+
+        // Then it removes the SPM, and inserts a coalesce on the second run.
+        let expected_after_second_run = &[
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "CoalescePartitionsExec",
+            "ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "UnionExec",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ];
+        assert_optimized_without_forced_roundrobin!(
+            expected_after_second_run,
+            plan,
+            false
+        );
+
+        Ok(())
+    }
+
     #[test]
     fn repartition_through_sort_preserving_merge() -> Result<()> {
         // sort preserving merge with non-sorted input
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index dd8e9d900b7df..270460176fb00 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -37,7 +37,7 @@
 use std::sync::Arc;
 
-use super::utils::{add_sort_above, add_sort_above_with_check};
+use super::utils::{add_sort_above, add_sort_above_with_check, is_aggregation};
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::replace_with_order_preserving_variants::{
@@ -516,7 +516,7 @@ fn remove_bottleneck_in_subplan(
 ) -> Result<PlanWithCorrespondingCoalescePartitions> {
     let plan = &requirements.plan;
     let children = &mut requirements.children;
-    if is_coalesce_partitions(&children[0].plan) {
+    if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) {
         // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
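+        // Note: an aggregate may rely on this `CoalescePartitionsExec` to satisfy
+        // its distribution requirement (e.g. a `SinglePartitioned` aggregate fed
+        // by a single partition); see `test_preserve_needed_coalesce` below.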
         let mut new_child_node = children[0].children.swap_remove(0);
         while new_child_node.plan.output_partitioning() == plan.output_partitioning()
@@ -657,7 +657,11 @@ fn get_sort_exprs(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::physical_optimizer::enforce_distribution::tests::{
+        parquet_exec_with_stats, projection_exec_with_alias, schema,
+    };
     use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
+    use crate::physical_optimizer::sanity_checker::SanityCheckPlan;
     use crate::physical_optimizer::test_utils::{
         aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
         coalesce_partitions_exec, filter_exec, global_limit_exec, hash_join_exec,
@@ -675,7 +679,11 @@ mod tests {
     use datafusion_common::Result;
     use datafusion_expr::JoinType;
     use datafusion_physical_expr::expressions::{col, Column, NotExpr};
+    use datafusion_physical_expr::PhysicalSortExpr;
     use datafusion_physical_optimizer::PhysicalOptimizerRule;
+    use datafusion_physical_plan::aggregates::{
+        AggregateExec, AggregateMode, PhysicalGroupBy,
+    };
     use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
     use rstest::rstest;
@@ -2183,6 +2191,94 @@ mod tests {
         Ok(())
     }
 
+    fn single_partition_aggregate(
+        input: Arc<dyn ExecutionPlan>,
+        alias_pairs: Vec<(String, String)>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let schema = schema();
+        let group_by = alias_pairs
+            .iter()
+            .map(|(column, alias)| {
+                (col(column, &input.schema()).unwrap(), alias.to_string())
+            })
+            .collect::<Vec<_>>();
+        let group_by = PhysicalGroupBy::new_single(group_by);
+
+        Arc::new(
+            AggregateExec::try_new(
+                AggregateMode::SinglePartitioned,
+                group_by,
+                vec![],
+                vec![],
+                input,
+                schema,
+            )
+            .unwrap(),
+        )
+    }
+
+    #[tokio::test]
+    async fn test_preserve_needed_coalesce() -> Result<()> {
+        // Input to EnforceSorting, from our test case.
+        let plan = projection_exec_with_alias(
+            union_exec(vec![parquet_exec_with_stats(); 2]),
+            vec![
+                ("a".to_string(), "a".to_string()),
+                ("b".to_string(), "value".to_string()),
+            ],
+        );
+        let plan = Arc::new(CoalescePartitionsExec::new(plan));
+        let schema = schema();
+        let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+            expr: col("a", &schema).unwrap(),
+            options: SortOptions::default(),
+        }]);
+        let plan: Arc<dyn ExecutionPlan> =
+            single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
+        let plan = sort_exec(sort_key, plan);
+
+        // Starting plan: as in our test case.
+        assert_eq!(
+            get_plan_string(&plan),
+            vec![
+                "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+                "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+                "    CoalescePartitionsExec",
+                "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
+                "        UnionExec",
+                "          ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "          ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            ],
+        );
+
+        let checker = SanityCheckPlan::new().optimize(plan.clone(), &Default::default());
+        assert!(checker.is_ok());
+
+        // With the fix, EnforceSorting keeps the needed coalesce, and adds an SPM further up (above the sort and aggregate).
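+        // Removing the coalesce would leave the aggregate with multiple input
+        // partitions, which would no longer satisfy its distribution requirement.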
+        let optimizer = EnforceSorting::new();
+        let optimized = optimizer.optimize(plan, &Default::default())?;
+        assert_eq!(
+            get_plan_string(&optimized),
+            vec![
+                "SortPreservingMergeExec: [a@0 ASC]",
+                "  SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+                "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+                "      CoalescePartitionsExec",
+                "        ProjectionExec: expr=[a@0 as a, b@1 as value]",
+                "          UnionExec",
+                "            ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+                "            ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            ],
+        );
+
+        // Plan is valid.
+        let checker = SanityCheckPlan::new();
+        let checker = checker.optimize(optimized, &Default::default());
+        assert!(checker.is_ok());
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_coalesce_propagate() -> Result<()> {
         let schema = create_test_schema()?;
diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs
index cdecc9d31862a..4f6268c1888ab 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -29,6 +29,7 @@
 use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
 use datafusion_physical_expr::LexRequirement;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::aggregates::AggregateExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::tree_node::PlanContext;
@@ -108,3 +109,8 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
 pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<RepartitionExec>()
 }
+
+/// Checks whether the given operator is an [`AggregateExec`].
+pub fn is_aggregation(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<AggregateExec>()
+}
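
Note on the test configuration: the `assert_optimized` macro above flips
`config.execution.use_row_number_estimates_to_optimize_partitioning` directly on
`ConfigOptions`. Outside of optimizer tests the same switch can be set through
`SessionConfig`; a minimal sketch, assuming the string key mirrors the
`ConfigOptions` field used above (`set_bool` is the generic config setter):

    use datafusion::prelude::{SessionConfig, SessionContext};

    fn session_with_row_estimates() -> SessionContext {
        // Enable estimate-based repartitioning decisions, as the tests above do.
        let config = SessionConfig::new().set_bool(
            "datafusion.execution.use_row_number_estimates_to_optimize_partitioning",
            true,
        );
        SessionContext::new_with_config(config)
    }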