From 2ddd89fc9d0f216e9fcf2c94a1b7d79c5388889b Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Tue, 1 Jul 2025 16:43:37 +0200
Subject: [PATCH 1/7] fix: temporary fix to handle incorrect coalesce (inserted
 during EnforceDistribution) which later causes an error during EnforceSorting
 (without our patch). The next DataFusion version 46 upgrade does the proper
 fix, which is to not insert the coalesce in the first place.

test: recreating the iox plan:
* demonstrate the insertion of coalesce after the use of column estimates,
  and the removal of the test scenario's forcing of rr repartitioning

test: reproducer of SanityCheck failure after EnforceSorting removes the
coalesce added in the EnforceDistribution

fix: special case to not remove the needed coalesce
---
 .../enforce_distribution.rs                   | 332 +++++++++++++++++-
 .../physical_optimizer/enforce_sorting.rs     |  99 +++++-
 .../tests/physical_optimizer/test_utils.rs    |   7 +
 .../src/enforce_sorting/mod.rs                |   6 +-
 datafusion/physical-optimizer/src/utils.rs    |   6 +
 5 files changed, 432 insertions(+), 18 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index e0826c90dd8d2..fb4b3ba0312f6 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -23,7 +23,7 @@ use crate::physical_optimizer::test_utils::{
     check_integrity, coalesce_partitions_exec, parquet_exec_with_sort,
     parquet_exec_with_stats, repartition_exec, schema, sort_exec,
     sort_exec_with_preserve_partitioning, sort_merge_join_exec,
-    sort_preserving_merge_exec, union_exec,
+    sort_preserving_merge_exec, trim_plan_display, union_exec,
 };
 
 use arrow::array::{RecordBatch, UInt64Array, UInt8Array};
@@ -39,10 +39,12 @@ use datafusion::datasource::MemTable;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_common::error::Result;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::ScalarValue;
+use datafusion_common::{assert_contains, ScalarValue};
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
-use datafusion_expr::{JoinType, Operator};
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::{AggregateUDF, JoinType, Operator};
+use datafusion_physical_expr::aggregate::AggregateExprBuilder;
 use datafusion_physical_expr::expressions::{binary, lit, BinaryExpr, Column, Literal};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::{
@@ -51,6 +53,7 @@ use datafusion_physical_expr_common::sort_expr::{
 use datafusion_physical_optimizer::enforce_distribution::*;
 use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
 use datafusion_physical_optimizer::output_requirements::OutputRequirements;
+use datafusion_physical_optimizer::sanity_checker::check_plan_sanity;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::aggregates::{
     AggregateExec, AggregateMode, PhysicalGroupBy,
@@ -66,7 +69,7 @@ use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::{
-    get_plan_string, DisplayAs, DisplayFormatType, ExecutionPlanProperties,
+    displayable, get_plan_string, DisplayAs, DisplayFormatType, ExecutionPlanProperties,
     PlanProperties, Statistics,
 };
@@ -162,8 +165,8 @@ impl ExecutionPlan for SortRequiredExec {
     fn execute(
         &self,
         _partition: usize,
-        _context: Arc<datafusion_execution::TaskContext>,
-    ) -> Result<datafusion_execution::SendableRecordBatchStream> {
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
         unreachable!();
     }
 
@@ -237,7 +240,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<DataSourceExec> {
 
-fn projection_exec_with_alias(
+pub(crate) fn projection_exec_with_alias(
     input: Arc<dyn ExecutionPlan>,
     alias_pairs: Vec<(String, String)>,
 ) -> Arc<dyn ExecutionPlan> {
@@ -254,6 +257,15 @@ fn projection_exec_with_alias(
 fn aggregate_exec_with_alias(
     input: Arc<dyn ExecutionPlan>,
     alias_pairs: Vec<(String, String)>,
+) -> Arc<dyn ExecutionPlan> {
+    aggregate_exec_with_aggr_expr_and_alias(input, vec![], alias_pairs)
+}
+
+#[expect(clippy::type_complexity)]
+fn aggregate_exec_with_aggr_expr_and_alias(
+    input: Arc<dyn ExecutionPlan>,
+    aggr_expr: Vec<(Arc<AggregateUDF>, Vec<Arc<dyn PhysicalExpr>>)>,
+    alias_pairs: Vec<(String, String)>,
 ) -> Arc<dyn ExecutionPlan> {
     let schema = schema();
     let mut group_by_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
@@ -274,18 +286,31 @@ fn aggregate_exec_with_alias(
         .collect::<Vec<_>>();
     let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr);
 
+    let aggr_expr = aggr_expr
+        .into_iter()
+        .map(|(udaf, exprs)| {
+            AggregateExprBuilder::new(udaf.clone(), exprs)
+                .alias(udaf.name())
+                .schema(Arc::clone(&schema))
+                .build()
+                .map(Arc::new)
+                .unwrap()
+        })
+        .collect::<Vec<_>>();
+    let filter_exprs = std::iter::repeat_n(None, aggr_expr.len()).collect::<Vec<_>>();
+
     Arc::new(
         AggregateExec::try_new(
             AggregateMode::FinalPartitioned,
             final_grouping,
-            vec![],
-            vec![],
+            aggr_expr.clone(),
+            filter_exprs.clone(),
             Arc::new(
                 AggregateExec::try_new(
                     AggregateMode::Partial,
                     group_by,
-                    vec![],
-                    vec![],
+                    aggr_expr,
+                    filter_exprs,
                     input,
                     schema.clone(),
                 )
@@ -442,6 +467,12 @@ impl TestConfig {
         self
     }
 
+    /// Set batch size.
+    fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.config.execution.batch_size = batch_size;
+        self
+    }
+
     /// Perform a series of runs using the current [`TestConfig`],
     /// assert the expected plan result,
     /// and return the result plan (for potential subsequent runs).
@@ -2030,6 +2061,285 @@ fn repartition_ignores_union() -> Result<()> {
     Ok(())
 }
 
+fn aggregate_over_union(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let plan =
+        aggregate_exec_with_alias(union, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate starting plan.
+    let before = displayable(plan.as_ref()).indent(true).to_string();
+    let before = trim_plan_display(&before);
+    assert_eq!(
+        before,
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+            "UnionExec",
+            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    plan
+}
+
+// Aggregate over a union,
+// with current testing setup.
+//
+// It will repartition twice for an aggregate over a union.
+// * repartitions before the partial aggregate.
+// * repartitions before the final aggregation.
+#[test]
+fn repartitions_twice_for_aggregate_after_union() -> Result<()> {
+    let plan = aggregate_over_union(vec![parquet_exec(); 2]);
+
+    // We get a distribution error without repartitioning.
+    let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err();
+    assert_contains!(
+        err.message(),
+        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)"
+    );
+
+    // Updated plan (post optimization) will have added RepartitionExecs (btwn union and aggregation).
+    let expected = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+        "  RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+        "        UnionExec",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    let test_config = TestConfig::default();
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
+
+    Ok(())
+}
+
+// Aggregate over a union,
+// but make the test setup more realistic.
+//
+// It will repartition once for an aggregate over a union.
+// * repartitions btwn partial & final aggregations.
+#[test]
+fn repartitions_once_for_aggregate_after_union() -> Result<()> {
+    // use parquet exec with stats
+    let plan: Arc<dyn ExecutionPlan> =
+        aggregate_over_union(vec![parquet_exec_with_stats(10000); 2]);
+
+    // We get a distribution error without repartitioning.
+    let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err();
+    assert_contains!(
+        err.message(),
+        "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)"
+    );
+
+    // This removes the forced round-robin repartitioning,
+    // by no longer hard-coding batch_size=1.
+    //
+    // Updated plan (post optimization) will have added only 1 RepartitionExec.
+    let expected = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+        "  RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "    AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "      RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+        "        UnionExec",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    let test_config = TestConfig::default().with_batch_size(100);
+    test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?;
+    test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?;
+
+    Ok(())
+}
+
+/// Same as [`aggregate_over_union`], but with a sort btwn the union and aggregation.
+fn aggregate_over_sorted_union(
+    input: Vec<Arc<dyn ExecutionPlan>>,
+) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }])
+    .unwrap();
+    let sort = sort_exec(sort_key, union);
+    let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate starting plan.
+    // Notice the `ordering_mode=Sorted` on the aggregations.
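+    // (Grouping on `a` while the input is sorted on `a` is what switches the
+    // aggregations into the sorted ordering mode.)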
+    let before = displayable(plan.as_ref()).indent(true).to_string();
+    let before = trim_plan_display(&before);
+    assert_eq!(
+        before,
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "UnionExec",
+            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    plan
+}
+
+/// Same as [`repartitions_once_for_aggregate_after_union`], but adds a sort btwn
+/// the union and the aggregate. This changes the outcome:
+///
+/// * we no longer get a distribution error.
+/// * but we still get repartitioning?
+#[test]
+fn repartitions_for_aggregate_after_sorted_union() -> Result<()> {
+    let plan = aggregate_over_sorted_union(vec![parquet_exec_with_stats(10000); 2]);
+
+    // With the sort, there is no distribution error.
+    let checker = check_plan_sanity(plan.clone(), &Default::default());
+    assert!(checker.is_ok());
+
+    // It does not repartition on the first run
+    let expected_after_first_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+        "          SortPreservingMergeExec: [a@0 ASC]",
+        "            UnionExec",
+        "              SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "              SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "                DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    let test_config = TestConfig::default().with_batch_size(100);
+    test_config.run(
+        expected_after_first_run,
+        plan.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+
+    // But does repartition on the second run.
+    let expected_after_second_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "        AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+        "            SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "              CoalescePartitionsExec",
+        "                UnionExec",
+        "                  SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "                    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "                  SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "                    DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    test_config.run(expected_after_second_run, plan, &SORT_DISTRIB_DISTRIB)?;
+
+    Ok(())
+}
+
+/// Same as [`aggregate_over_sorted_union`], but with a projection btwn the
+/// union and the sort.
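+///
+/// The projection renames `b` to `value`, following the shape of the
+/// recreated iox plan mentioned in the commit message.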
+fn aggregate_over_sorted_union_projection(
+    input: Vec<Arc<dyn ExecutionPlan>>,
+) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let union_projection = projection_exec_with_alias(
+        union,
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }])
+    .unwrap();
+    let sort = sort_exec(sort_key, union_projection);
+    let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate starting plan.
+    // Notice the `ordering_mode=Sorted` on the aggregations.
+    let before = displayable(plan.as_ref()).indent(true).to_string();
+    let before = trim_plan_display(&before);
+    assert_eq!(
+        before,
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "UnionExec",
+            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    plan
+}
+
+/// Same as [`repartitions_for_aggregate_after_sorted_union`], but adds a projection
+/// as well between the union and aggregate. This changes the outcome:
+///
+/// * we no longer get repartitioning, and instead get coalescing.
+#[test]
+fn coalesces_for_aggregate_after_sorted_union_projection() -> Result<()> {
+    let plan =
+        aggregate_over_sorted_union_projection(vec![parquet_exec_with_stats(10000); 2]);
+
+    // Same as `repartitions_for_aggregate_after_sorted_union`. No error.
+    let checker = check_plan_sanity(plan.clone(), &Default::default());
+    assert!(checker.is_ok());
+
+    // It no longer does a repartition on the first run.
+    // Instead adds an SPM.
+    let expected_after_first_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "        RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+        "          SortPreservingMergeExec: [a@0 ASC]",
+        "            SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+        "              ProjectionExec: expr=[a@0 as a, b@1 as value]",
+        "                UnionExec",
+        "                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "                  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    let test_config = TestConfig::default().with_batch_size(100);
+    test_config.run(
+        expected_after_first_run,
+        plan.clone(),
+        &DISTRIB_DISTRIB_SORT,
+    )?;
+
+    // Then it removes the SPM, and inserts a coalesce on the second run.
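+    // That CoalescePartitionsExec is the operator the EnforceSorting special
+    // case must keep (see `test_preserve_needed_coalesce` in enforce_sorting.rs).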
+    let expected_after_second_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "  SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "    RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "      SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "        AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "          RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
+        "            SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "              CoalescePartitionsExec",
+        "                SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+        "                  ProjectionExec: expr=[a@0 as a, b@1 as value]",
+        "                    UnionExec",
+        "                      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        "                      DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+    ];
+    test_config.run(expected_after_second_run, plan, &SORT_DISTRIB_DISTRIB)?;
+
+    Ok(())
+}
+
 #[test]
 fn repartition_through_sort_preserving_merge() -> Result<()> {
     // sort preserving merge with non-sorted input
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index a19dd7ace9773..947cbd3983923 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -18,15 +18,16 @@
 use std::sync::Arc;
 
 use crate::memory_limit::DummyStreamPartition;
+use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
 use crate::physical_optimizer::test_utils::{
     aggregate_exec, bounded_window_exec, bounded_window_exec_with_partition,
     check_integrity, coalesce_batches_exec, coalesce_partitions_exec, create_test_schema,
     create_test_schema2, create_test_schema3, filter_exec, global_limit_exec,
     hash_join_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort,
-    projection_exec, repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr,
-    sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
-    sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
-    union_exec, RequirementsTestExec,
+    parquet_exec_with_stats, projection_exec, repartition_exec, schema, sort_exec,
+    sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec,
+    sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch,
+    spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec,
 };
 
 use arrow::compute::SortOptions;
@@ -48,6 +49,9 @@ use datafusion_physical_expr_common::sort_expr::{
 };
 use datafusion_physical_expr::{Distribution, Partitioning};
 use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, NotExpr};
+use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
+use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
+use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::repartition::RepartitionExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -2302,6 +2306,93 @@ async fn test_commutativity() -> Result<()> {
     Ok(())
 }
 
+fn single_partition_aggregate(
+    input: Arc<dyn ExecutionPlan>,
+    alias_pairs: Vec<(String, String)>,
+) -> Arc<dyn ExecutionPlan> {
+    let schema = schema();
+    let group_by = alias_pairs
+        .iter()
+        .map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
+        .collect::<Vec<_>>();
+    let group_by = PhysicalGroupBy::new_single(group_by);
+
+    Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::SinglePartitioned,
+            group_by,
+            vec![],
+            vec![],
+            input,
+            schema,
+        )
+        .unwrap(),
+    )
+}
+
+#[tokio::test]
+async fn test_preserve_needed_coalesce() -> Result<()> {
+    // Input to EnforceSorting, from our test case.
+    let plan = projection_exec_with_alias(
+        union_exec(vec![parquet_exec_with_stats(10000); 2]),
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let plan = Arc::new(CoalescePartitionsExec::new(plan));
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }])
+    .unwrap();
+    let plan: Arc<dyn ExecutionPlan> =
+        single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
+    let plan = sort_exec(sort_key, plan);
+
+    // Starting plan: as in our test case.
+    assert_eq!(
+        get_plan_string(&plan),
+        vec![
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+            "    CoalescePartitionsExec",
+            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "        UnionExec",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    let checker = SanityCheckPlan::new().optimize(plan.clone(), &Default::default());
+    assert!(checker.is_ok());
+
+    // EnforceSorting will add an SPM further up (above the aggregate), while our
+    // special case keeps the coalesce needed by the single-partitioned aggregate.
+    let optimizer = EnforceSorting::new();
+    let optimized = optimizer.optimize(plan, &Default::default())?;
+    assert_eq!(
+        get_plan_string(&optimized),
+        vec![
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+            "      CoalescePartitionsExec",
+            "        ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "          UnionExec",
+            "            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+            "            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
+        ],
+    );
+
+    // Plan is valid.
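+    // (Without the `is_aggregation` special case in `remove_bottleneck_in_subplan`,
+    // EnforceSorting would have removed the coalesce and this check would fail.)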
+    let checker = SanityCheckPlan::new();
+    let checker = checker.optimize(optimized, &Default::default());
+    assert!(checker.is_ok());
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_coalesce_propagate() -> Result<()> {
     let schema = create_test_schema()?;
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 69dbe04927b2d..144c51d5260fc 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -515,6 +515,13 @@ pub fn check_integrity<T: Clone>(context: PlanContext<T>) -> Result<PlanContext<T>>
 
+pub fn trim_plan_display(plan: &str) -> Vec<&str> {
+    plan.split('\n')
+        .map(|s| s.trim())
+        .filter(|s| !s.is_empty())
+        .collect()
+}
+
 // construct a stream partition for test purposes
 #[derive(Debug)]
 pub struct TestStreamPartition {
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
index 8a71b28486a2a..dae0edcfb1716 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -48,8 +48,8 @@ use crate::enforce_sorting::sort_pushdown::{
 };
 use crate::output_requirements::OutputRequirementExec;
 use crate::utils::{
-    add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
-    is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
+    add_sort_above, add_sort_above_with_check, is_aggregation, is_coalesce_partitions,
+    is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
 };
 use crate::PhysicalOptimizerRule;
 
@@ -678,7 +678,7 @@ fn remove_bottleneck_in_subplan(
 ) -> Result<PlanWithCorrespondingCoalescePartitions> {
     let plan = &requirements.plan;
     let children = &mut requirements.children;
-    if is_coalesce_partitions(&children[0].plan) {
+    if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) {
         // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
         let mut new_child_node = children[0].children.swap_remove(0);
         while new_child_node.plan.output_partitioning() == plan.output_partitioning()
diff --git a/datafusion/physical-optimizer/src/utils.rs b/datafusion/physical-optimizer/src/utils.rs
index 3655e555a7440..d3207d4880a70 100644
--- a/datafusion/physical-optimizer/src/utils.rs
+++ b/datafusion/physical-optimizer/src/utils.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 
 use datafusion_common::Result;
 use datafusion_physical_expr::{LexOrdering, LexRequirement};
+use datafusion_physical_plan::aggregates::AggregateExec;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::repartition::RepartitionExec;
@@ -113,3 +114,8 @@ pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
 pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
 }
+
+/// Checks whether the given operator is an [`AggregateExec`].
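+///
+/// Used by `remove_bottleneck_in_subplan` in `EnforceSorting` to avoid removing
+/// a `CoalescePartitionsExec` that a single-partitioned aggregate still needs.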
+pub fn is_aggregation(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<AggregateExec>()
+}

From 8e58fbd418619ef3bcdd8b16937ad67aa6214f88 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 24 Jun 2025 14:54:00 +0200
Subject: [PATCH 2/7] chore: skip order calculation / exponential planning

---
 .../src/equivalence/properties/union.rs       | 40 +++++++++++++++++--
 1 file changed, 36 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-expr/src/equivalence/properties/union.rs b/datafusion/physical-expr/src/equivalence/properties/union.rs
index efbefd0d39bfb..8ec2464068efe 100644
--- a/datafusion/physical-expr/src/equivalence/properties/union.rs
+++ b/datafusion/physical-expr/src/equivalence/properties/union.rs
@@ -67,16 +67,43 @@ fn calculate_union_binary(
         })
         .collect::<Vec<_>>();
 
+    // TEMP HACK WORKAROUND
+    // Revert code from https://github.com/apache/datafusion/pull/12562
+    // Context: https://github.com/apache/datafusion/issues/13748
+    // Context: https://github.com/influxdata/influxdb_iox/issues/13038
+
     // Next, calculate valid orderings for the union by searching for prefixes
     // in both sides.
-    let mut orderings = UnionEquivalentOrderingBuilder::new();
-    orderings.add_satisfied_orderings(&lhs, &rhs)?;
-    orderings.add_satisfied_orderings(&rhs, &lhs)?;
-    let orderings = orderings.build();
+    let mut orderings = vec![];
+    for ordering in lhs.normalized_oeq_class().into_iter() {
+        let mut ordering: Vec<PhysicalSortExpr> = ordering.into();
+
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !rhs.ordering_satisfy(ordering.clone())? {
+            ordering.pop();
+        }
+        // There is a non-trivial satisfied prefix, add it as a valid ordering:
+        if !ordering.is_empty() {
+            orderings.push(ordering);
+        }
+    }
+    for ordering in rhs.normalized_oeq_class().into_iter() {
+        let mut ordering: Vec<PhysicalSortExpr> = ordering.into();
+
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !lhs.ordering_satisfy(ordering.clone())? {
+            ordering.pop();
+        }
+        // There is a non-trivial satisfied prefix, add it as a valid ordering:
+        if !ordering.is_empty() {
+            orderings.push(ordering);
+        }
+    }
 
     let mut eq_properties = EquivalenceProperties::new(lhs.schema);
     eq_properties.add_constants(constants)?;
     eq_properties.add_orderings(orderings);
+
     Ok(eq_properties)
 }
 
@@ -122,6 +149,7 @@ struct UnionEquivalentOrderingBuilder {
     orderings: Vec<LexOrdering>,
 }
 
+#[expect(unused)]
 impl UnionEquivalentOrderingBuilder {
     fn new() -> Self {
         Self { orderings: vec![] }
@@ -504,6 +532,7 @@ mod tests {
     }
 
     #[test]
+    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_fill_gaps() -> Result<()> {
         let schema = create_test_schema().unwrap();
         UnionEquivalenceTest::new(&schema)
@@ -579,6 +608,7 @@ mod tests {
     }
 
     #[test]
+    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() -> Result<()> {
         let schema = create_test_schema().unwrap();
@@ -607,6 +637,7 @@ mod tests {
     }
 
     #[test]
+    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_gap_fill_symmetric() -> Result<()> {
         let schema = create_test_schema().unwrap();
         UnionEquivalenceTest::new(&schema)
@@ -658,6 +689,7 @@ mod tests {
     }
 
     #[test]
+    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_middle_desc() -> Result<()> {
         let schema = create_test_schema().unwrap();
         UnionEquivalenceTest::new(&schema)

From 200feea0285ef301e192605a64912d421dbd8887 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 16 Jul 2024 12:14:19 -0400
Subject: [PATCH 3/7] (New) Test + workaround for SanityCheck plan

---
 datafusion/physical-optimizer/src/sanity_checker.rs | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs
index acc70d39f057b..3cc5319f9e108 100644
--- a/datafusion/physical-optimizer/src/sanity_checker.rs
+++ b/datafusion/physical-optimizer/src/sanity_checker.rs
@@ -32,6 +32,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::joins::SymmetricHashJoinExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
 
 use crate::PhysicalOptimizerRule;
@@ -135,6 +137,14 @@ pub fn check_plan_sanity(
         plan.required_input_ordering(),
         plan.required_input_distribution(),
     ) {
+        // TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
+        if child.as_any().downcast_ref::<SortExec>().is_some() {
+            continue;
+        }
+        if child.as_any().downcast_ref::<UnionExec>().is_some() {
+            continue;
+        }
+
         let child_eq_props = child.equivalence_properties();
         if let Some(sort_req) = sort_req {
             let sort_req = sort_req.into_single();

From dfcf83315d3387ef591c368aa6ab6ce5a0a25b5d Mon Sep 17 00:00:00 2001
From: Adam Curtis
Date: Tue, 21 Oct 2025 13:33:16 -0400
Subject: [PATCH 4/7] chore: add debug logging and skip error on physical
 schema check

---
 datafusion/core/src/physical_planner.rs | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index d645dd1d0e55f..3dcea868e30be 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -710,10 +710,15 @@ impl DefaultPhysicalPlanner {
                 differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
             }
         }
-        return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
-            .iter()
-            .map(|s| format!("\n\t- {s}"))
-            .join(""));
+
+        log::debug!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());
+
+        //influx: temporarily remove error and only log so that we can find a
+        //reproducer in production
+        // return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
+        //     .iter()
+        //     .map(|s| format!("\n\t- {s}"))
+        //     .join(""));
     }
 
     let groups = self.create_grouping_physical_expr(

From 9302b5a3fe410da7c8f5b365a4d4900290727570 Mon Sep 17 00:00:00 2001
From: Denise Wiedl
Date: Thu, 18 Sep 2025 23:07:21 +0300
Subject: [PATCH 5/7] Keep aggregate udaf schema names unique when missing an
 order-by

* test: reproducer of bug
* fix: make schema names unique for approx_percentile_cont
* test: regression test is now resolved
---
 datafusion/expr/src/udaf.rs                   |  2 +-
 .../sqllogictest/test_files/aggregate.slt     | 23 +++++++++++++++++++
 2 files changed, 24 insertions(+), 1 deletion(-)

diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index 9619d60a24ac7..8a658b6654635 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -458,7 +458,7 @@ pub trait AggregateUDFImpl: Debug + DynEq + DynHash + Send + Sync {
 
         // exclude the first function argument(= column) in ordered set aggregate function,
         // because it is duplicated with the WITHIN GROUP clause in schema name.
-        let args = if self.is_ordered_set_aggregate() {
+        let args = if self.is_ordered_set_aggregate() && !order_by.is_empty() {
             &args[1..]
         } else {
             &args[..]
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index eed3721078c72..73541ecfb9e9a 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1821,6 +1821,29 @@ c 122 d 124 e 115 + +# using approx_percentile_cont on 2 columns with same signature +query TII +SELECT c1, approx_percentile_cont(c2, 0.95) AS c2, approx_percentile_cont(c3, 0.95) AS c3 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1 +---- +a 5 73 +b 5 68 +c 5 122 +d 5 124 +e 5 115 + +# error is unique to this UDAF +query TRR +SELECT c1, avg(c2) AS c2, avg(c3) AS c3 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1 +---- +a 2.857142857143 -18.333333333333 +b 3.263157894737 -5.842105263158 +c 2.666666666667 -1.333333333333 +d 2.444444444444 25.444444444444 +e 3 40.333333333333 + + + query TI SELECT c1, approx_percentile_cont(0.95) WITHIN GROUP (ORDER BY c3 DESC) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1 ---- From da0cdf201937bb3337ef2b075df520bb074d355e Mon Sep 17 00:00:00 2001 From: Jeffrey Vo Date: Sat, 15 Nov 2025 13:41:37 +1100 Subject: [PATCH 6/7] CI: try free up space in `Rust / cargo test (amd64)` action (#18709) Closes #18692 (hopefully) Trying to get CI to pass consistently, try various techniques. --- .github/workflows/rust.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 7bfdcd8a6e74a..a454b375332a8 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -266,7 +266,21 @@ jobs: runs-on: ubuntu-latest container: image: amd64/rust + volumes: + - /usr/local:/host/usr/local steps: + - name: Remove unnecessary preinstalled software + run: | + echo "Disk space before cleanup:" + df -h + # remove tool cache: about 8.5GB (github has host /opt/hostedtoolcache mounted as /__t) + rm -rf /__t/* || true + # remove Haskell runtime: about 6.3GB (host /usr/local/.ghcup) + rm -rf /host/usr/local/.ghcup || true + # remove Android library: about 7.8GB (host /usr/local/lib/android) + rm -rf /host/usr/local/lib/android || true + echo "Disk space after cleanup:" + df -h - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 with: submodules: true From c594f06109974a874a30f27c34a28030ecfa5b47 Mon Sep 17 00:00:00 2001 From: Adam Curtis Date: Wed, 3 Dec 2025 12:54:36 -0500 Subject: [PATCH 7/7] Revert "Patched DF 50.3.0 (revision c)" --- .github/workflows/rust.yml | 14 - datafusion/core/src/physical_planner.rs | 13 +- .../enforce_distribution.rs | 332 +----------------- .../physical_optimizer/enforce_sorting.rs | 99 +----- .../tests/physical_optimizer/test_utils.rs | 7 - datafusion/expr/src/udaf.rs | 2 +- .../src/equivalence/properties/union.rs | 40 +-- .../src/enforce_sorting/mod.rs | 6 +- .../physical-optimizer/src/sanity_checker.rs | 10 - datafusion/physical-optimizer/src/utils.rs | 6 - .../sqllogictest/test_files/aggregate.slt | 23 -- 11 files changed, 27 insertions(+), 525 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index a454b375332a8..7bfdcd8a6e74a 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -266,21 +266,7 @@ jobs: runs-on: ubuntu-latest container: image: amd64/rust - volumes: - - /usr/local:/host/usr/local steps: - - name: Remove unnecessary preinstalled software - run: | - echo "Disk space before cleanup:" - df -h - # remove tool cache: about 8.5GB (github has host /opt/hostedtoolcache mounted as /__t) - rm -rf 
/__t/* || true - # remove Haskell runtime: about 6.3GB (host /usr/local/.ghcup) - rm -rf /host/usr/local/.ghcup || true - # remove Android library: about 7.8GB (host /usr/local/lib/android) - rm -rf /host/usr/local/lib/android || true - echo "Disk space after cleanup:" - df -h - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0 with: submodules: true diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 3dcea868e30be..d645dd1d0e55f 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -710,15 +710,10 @@ impl DefaultPhysicalPlanner { differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable())); } } - - log::debug!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent()); - - //influx: temporarily remove error and only log so that we can find a - //reproducer in production - // return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences - // .iter() - // .map(|s| format!("\n\t- {s}")) - // .join("")); + return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences + .iter() + .map(|s| format!("\n\t- {s}")) + .join("")); } let groups = self.create_grouping_physical_expr( diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index fb4b3ba0312f6..e0826c90dd8d2 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -23,7 +23,7 @@ use crate::physical_optimizer::test_utils::{ check_integrity, coalesce_partitions_exec, parquet_exec_with_sort, parquet_exec_with_stats, repartition_exec, schema, sort_exec, sort_exec_with_preserve_partitioning, sort_merge_join_exec, - sort_preserving_merge_exec, trim_plan_display, union_exec, + sort_preserving_merge_exec, union_exec, }; use arrow::array::{RecordBatch, UInt64Array, UInt8Array}; @@ -39,12 +39,10 @@ use datafusion::datasource::MemTable; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_common::error::Result; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{assert_contains, ScalarValue}; +use datafusion_common::ScalarValue; use datafusion_datasource::file_groups::FileGroup; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; -use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_expr::{AggregateUDF, JoinType, Operator}; -use datafusion_physical_expr::aggregate::AggregateExprBuilder; +use datafusion_expr::{JoinType, Operator}; use datafusion_physical_expr::expressions::{binary, lit, BinaryExpr, Column, Literal}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{ @@ -53,7 +51,6 @@ use datafusion_physical_expr_common::sort_expr::{ use datafusion_physical_optimizer::enforce_distribution::*; use datafusion_physical_optimizer::enforce_sorting::EnforceSorting; use datafusion_physical_optimizer::output_requirements::OutputRequirements; -use datafusion_physical_optimizer::sanity_checker::check_plan_sanity; use 
datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, PhysicalGroupBy, @@ -69,7 +66,7 @@ use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{ - displayable, get_plan_string, DisplayAs, DisplayFormatType, ExecutionPlanProperties, + get_plan_string, DisplayAs, DisplayFormatType, ExecutionPlanProperties, PlanProperties, Statistics, }; @@ -165,8 +162,8 @@ impl ExecutionPlan for SortRequiredExec { fn execute( &self, _partition: usize, - _context: Arc, - ) -> Result { + _context: Arc, + ) -> Result { unreachable!(); } @@ -240,7 +237,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec) -> Arc, alias_pairs: Vec<(String, String)>, ) -> Arc { @@ -257,15 +254,6 @@ pub(crate) fn projection_exec_with_alias( fn aggregate_exec_with_alias( input: Arc, alias_pairs: Vec<(String, String)>, -) -> Arc { - aggregate_exec_with_aggr_expr_and_alias(input, vec![], alias_pairs) -} - -#[expect(clippy::type_complexity)] -fn aggregate_exec_with_aggr_expr_and_alias( - input: Arc, - aggr_expr: Vec<(Arc, Vec>)>, - alias_pairs: Vec<(String, String)>, ) -> Arc { let schema = schema(); let mut group_by_expr: Vec<(Arc, String)> = vec![]; @@ -286,31 +274,18 @@ fn aggregate_exec_with_aggr_expr_and_alias( .collect::>(); let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr); - let aggr_expr = aggr_expr - .into_iter() - .map(|(udaf, exprs)| { - AggregateExprBuilder::new(udaf.clone(), exprs) - .alias(udaf.name()) - .schema(Arc::clone(&schema)) - .build() - .map(Arc::new) - .unwrap() - }) - .collect::>(); - let filter_exprs = std::iter::repeat_n(None, aggr_expr.len()).collect::>(); - Arc::new( AggregateExec::try_new( AggregateMode::FinalPartitioned, final_grouping, - aggr_expr.clone(), - filter_exprs.clone(), + vec![], + vec![], Arc::new( AggregateExec::try_new( AggregateMode::Partial, group_by, - aggr_expr, - filter_exprs, + vec![], + vec![], input, schema.clone(), ) @@ -467,12 +442,6 @@ impl TestConfig { self } - /// Set batch size. - fn with_batch_size(mut self, batch_size: usize) -> Self { - self.config.execution.batch_size = batch_size; - self - } - /// Perform a series of runs using the current [`TestConfig`], /// assert the expected plan result, /// and return the result plan (for potential subsequent runs). @@ -2061,285 +2030,6 @@ fn repartition_ignores_union() -> Result<()> { Ok(()) } -fn aggregate_over_union(input: Vec>) -> Arc { - let union = union_exec(input); - let plan = - aggregate_exec_with_alias(union, vec![("a".to_string(), "a1".to_string())]); - - // Demonstrate starting plan. - let before = displayable(plan.as_ref()).indent(true).to_string(); - let before = trim_plan_display(&before); - assert_eq!( - before, - vec![ - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", - "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - "UnionExec", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - ); - - plan -} - -// Aggregate over a union, -// with current testing setup. -// -// It will repartiton twice for an aggregate over a union. -// * repartitions before the partial aggregate. -// * repartitions before the final aggregation. 
-#[test] -fn repartitions_twice_for_aggregate_after_union() -> Result<()> { - let plan = aggregate_over_union(vec![parquet_exec(); 2]); - - // We get a distribution error without repartitioning. - let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err(); - assert_contains!( - err.message(), - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)" - ); - - // Updated plan (post optimization) will have added RepartitionExecs (btwn union and aggregation). - let expected = &[ - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", - " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", - " UnionExec", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ]; - let test_config = TestConfig::default(); - test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; - test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; - - Ok(()) -} - -// Aggregate over a union, -// but make the test setup more realistic. -// -// It will repartiton once for an aggregate over a union. -// * repartitions btwn partial & final aggregations. -#[test] -fn repartitions_once_for_aggregate_after_union() -> Result<()> { - // use parquet exec with stats - let plan: Arc = - aggregate_over_union(vec![parquet_exec_with_stats(10000); 2]); - - // We get a distribution error without repartitioning. - let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err(); - assert_contains!( - err.message(), - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)" - ); - - // This removes the forced round-robin repartitioning, - // by no longer hard-coding batch_size=1. - // - // Updated plan (post optimization) will have added only 1 RepartitionExec. - let expected = &[ - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]", - " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", - " UnionExec", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ]; - let test_config = TestConfig::default().with_batch_size(100); - test_config.run(expected, plan.clone(), &DISTRIB_DISTRIB_SORT)?; - test_config.run(expected, plan, &SORT_DISTRIB_DISTRIB)?; - - Ok(()) -} - -/// Same as [`aggregate_over_union`], but with a sort btwn the union and aggregation. -fn aggregate_over_sorted_union( - input: Vec>, -) -> Arc { - let union = union_exec(input); - let schema = schema(); - let sort_key = LexOrdering::new(vec![PhysicalSortExpr { - expr: col("a", &schema).unwrap(), - options: SortOptions::default(), - }]) - .unwrap(); - let sort = sort_exec(sort_key, union); - let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]); - - // Demonstrate starting plan. 
- // Notice the `ordering_mode=Sorted` on the aggregations. - let before = displayable(plan.as_ref()).indent(true).to_string(); - let before = trim_plan_display(&before); - assert_eq!( - before, - vec![ - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", - "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "UnionExec", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - ); - - plan -} - -/// Same as [`repartitions_once_for_aggregate_after_union`], but adds a sort btwn -/// the union and the aggregate. This changes the outcome: -/// -/// * we no longer get a distribution error. -/// * but we still get repartitioning? -#[test] -fn repartitions_for_aggregate_after_sorted_union() -> Result<()> { - let plan = aggregate_over_sorted_union(vec![parquet_exec_with_stats(10000); 2]); - - // With the sort, there is no distribution error. - let checker = check_plan_sanity(plan.clone(), &Default::default()); - assert!(checker.is_ok()); - - // It does not repartition on the first run - let expected_after_first_run = &[ - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", - " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortPreservingMergeExec: [a@0 ASC]", - " UnionExec", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ]; - let test_config = TestConfig::default().with_batch_size(100); - test_config.run( - expected_after_first_run, - plan.clone(), - &DISTRIB_DISTRIB_SORT, - )?; - - // But does repartition on the second run. - let expected_after_second_run = &[ - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", - " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", - " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " UnionExec", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ]; - test_config.run(expected_after_second_run, plan, &SORT_DISTRIB_DISTRIB)?; - - Ok(()) -} - -/// Same as [`aggregate_over_sorted_union`], but with a sort btwn the union and aggregation. 
-fn aggregate_over_sorted_union_projection( - input: Vec>, -) -> Arc { - let union = union_exec(input); - let union_projection = projection_exec_with_alias( - union, - vec![ - ("a".to_string(), "a".to_string()), - ("b".to_string(), "value".to_string()), - ], - ); - let schema = schema(); - let sort_key = LexOrdering::new(vec![PhysicalSortExpr { - expr: col("a", &schema).unwrap(), - options: SortOptions::default(), - }]) - .unwrap(); - let sort = sort_exec(sort_key, union_projection); - let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]); - - // Demonstrate starting plan. - // Notice the `ordering_mode=Sorted` on the aggregations. - let before = displayable(plan.as_ref()).indent(true).to_string(); - let before = trim_plan_display(&before); - assert_eq!( - before, - vec![ - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", - "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", - "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - "ProjectionExec: expr=[a@0 as a, b@1 as value]", - "UnionExec", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ], - ); - - plan -} - -/// Same as [`repartitions_for_aggregate_after_sorted_union`], but adds a projection -/// as well between the union and aggregate. This change the outcome: -/// -/// * we no longer get repartitioning, and instead get coalescing. -#[test] -fn coalesces_for_aggregate_after_sorted_union_projection() -> Result<()> { - let plan = - aggregate_over_sorted_union_projection(vec![parquet_exec_with_stats(10000); 2]); - - // Same as `repartitions_for_aggregate_after_sorted_union`. No error. - let checker = check_plan_sanity(plan.clone(), &Default::default()); - assert!(checker.is_ok()); - - // It no longer does a repartition on the first run. - // Instead adds a SPM. - let expected_after_first_run = &[ - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", - " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortPreservingMergeExec: [a@0 ASC]", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " ProjectionExec: expr=[a@0 as a, b@1 as value]", - " UnionExec", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ]; - let test_config = TestConfig::default().with_batch_size(100); - test_config.run( - expected_after_first_run, - plan.clone(), - &DISTRIB_DISTRIB_SORT, - )?; - - // Then it removes the SPM, and inserts a coalesace on the second run. 
- let expected_after_second_run = &[ - "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted", - " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", - " RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10", - " SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]", - " AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted", - " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", - " CoalescePartitionsExec", - " SortExec: expr=[a@0 ASC], preserve_partitioning=[true]", - " ProjectionExec: expr=[a@0 as a, b@1 as value]", - " UnionExec", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", - ]; - test_config.run(expected_after_second_run, plan, &SORT_DISTRIB_DISTRIB)?; - - Ok(()) -} - #[test] fn repartition_through_sort_preserving_merge() -> Result<()> { // sort preserving merge with non-sorted input diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 947cbd3983923..a19dd7ace9773 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -18,16 +18,15 @@ use std::sync::Arc; use crate::memory_limit::DummyStreamPartition; -use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias; use crate::physical_optimizer::test_utils::{ aggregate_exec, bounded_window_exec, bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec, coalesce_partitions_exec, create_test_schema, create_test_schema2, create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_sort, - parquet_exec_with_stats, projection_exec, repartition_exec, schema, sort_exec, - sort_exec_with_fetch, sort_expr, sort_expr_options, sort_merge_join_exec, - sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch, - spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec, + projection_exec, repartition_exec, sort_exec, sort_exec_with_fetch, sort_expr, + sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, + sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered, + union_exec, RequirementsTestExec, }; use arrow::compute::SortOptions; @@ -49,9 +48,6 @@ use datafusion_physical_expr_common::sort_expr::{ }; use datafusion_physical_expr::{Distribution, Partitioning}; use datafusion_physical_expr::expressions::{col, BinaryExpr, Column, NotExpr}; -use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan; -use datafusion_physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; -use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; @@ -2306,93 +2302,6 @@ async fn test_commutativity() -> Result<()> { Ok(()) } -fn single_partition_aggregate( - input: Arc, - alias_pairs: Vec<(String, String)>, -) -> Arc { - let schema = schema(); - let group_by = alias_pairs - .iter() - .map(|(column, alias)| (col(column, 
-        .collect::<Vec<_>>();
-    let group_by = PhysicalGroupBy::new_single(group_by);
-
-    Arc::new(
-        AggregateExec::try_new(
-            AggregateMode::SinglePartitioned,
-            group_by,
-            vec![],
-            vec![],
-            input,
-            schema,
-        )
-        .unwrap(),
-    )
-}
-
-#[tokio::test]
-async fn test_preserve_needed_coalesce() -> Result<()> {
-    // Input to EnforceSorting, from our test case.
-    let plan = projection_exec_with_alias(
-        union_exec(vec![parquet_exec_with_stats(10000); 2]),
-        vec![
-            ("a".to_string(), "a".to_string()),
-            ("b".to_string(), "value".to_string()),
-        ],
-    );
-    let plan = Arc::new(CoalescePartitionsExec::new(plan));
-    let schema = schema();
-    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
-        expr: col("a", &schema).unwrap(),
-        options: SortOptions::default(),
-    }])
-    .unwrap();
-    let plan: Arc<dyn ExecutionPlan> =
-        single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
-    let plan = sort_exec(sort_key, plan);
-
-    // Starting plan: as in our test case.
-    assert_eq!(
-        get_plan_string(&plan),
-        vec![
-            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-            "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
-            "    CoalescePartitionsExec",
-            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
-            "        UnionExec",
-            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-        ],
-    );
-
-    let checker = SanityCheckPlan::new().optimize(plan.clone(), &Default::default());
-    assert!(checker.is_ok());
-
-    // EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
-    let optimizer = EnforceSorting::new();
-    let optimized = optimizer.optimize(plan, &Default::default())?;
-    assert_eq!(
-        get_plan_string(&optimized),
-        vec![
-            "SortPreservingMergeExec: [a@0 ASC]",
-            "  SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-            "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
-            "      CoalescePartitionsExec",
-            "        ProjectionExec: expr=[a@0 as a, b@1 as value]",
-            "          UnionExec",
-            "            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-            "            DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-        ],
-    );
-
-    // Plan is valid.
-    let checker = SanityCheckPlan::new();
-    let checker = checker.optimize(optimized, &Default::default());
-    assert!(checker.is_ok());
-
-    Ok(())
-}
-
 #[tokio::test]
 async fn test_coalesce_propagate() -> Result<()> {
     let schema = create_test_schema()?;
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 144c51d5260fc..69dbe04927b2d 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -515,13 +515,6 @@ pub fn check_integrity<T: Clone>(context: PlanContext<T>) -> Result<PlanContext<T>>
 }
 
-pub fn trim_plan_display(plan: &str) -> Vec<&str> {
-    plan.split('\n')
-        .map(|s| s.trim())
-        .filter(|s| !s.is_empty())
-        .collect()
-}
-
 // construct a stream partition for test purposes
 #[derive(Debug)]
 pub struct TestStreamPartition {
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index 8a658b6654635..9619d60a24ac7 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -458,7 +458,7 @@ pub trait AggregateUDFImpl: Debug + DynEq + DynHash + Send + Sync {
 
         // exclude the first function argument(= column) in ordered set aggregate function,
         // because it is duplicated with the WITHIN GROUP clause in schema name.
-        let args = if self.is_ordered_set_aggregate() && !order_by.is_empty() {
+        let args = if self.is_ordered_set_aggregate() {
             &args[1..]
         } else {
             &args[..]
diff --git a/datafusion/physical-expr/src/equivalence/properties/union.rs b/datafusion/physical-expr/src/equivalence/properties/union.rs
index 8ec2464068efe..efbefd0d39bfb 100644
--- a/datafusion/physical-expr/src/equivalence/properties/union.rs
+++ b/datafusion/physical-expr/src/equivalence/properties/union.rs
@@ -67,43 +67,16 @@ fn calculate_union_binary(
         })
         .collect::<Vec<_>>();
 
-    // TEMP HACK WORKAROUND
-    // Revert code from https://github.com/apache/datafusion/pull/12562
-    // Context: https://github.com/apache/datafusion/issues/13748
-    // Context: https://github.com/influxdata/influxdb_iox/issues/13038
-
     // Next, calculate valid orderings for the union by searching for prefixes
     // in both sides.
-    let mut orderings = vec![];
-    for ordering in lhs.normalized_oeq_class().into_iter() {
-        let mut ordering: Vec<PhysicalSortExpr> = ordering.into();
-
-        // Progressively shorten the ordering to search for a satisfied prefix:
-        while !rhs.ordering_satisfy(ordering.clone())? {
-            ordering.pop();
-        }
-        // There is a non-trivial satisfied prefix, add it as a valid ordering:
-        if !ordering.is_empty() {
-            orderings.push(ordering);
-        }
-    }
+    let mut orderings = UnionEquivalentOrderingBuilder::new();
+    orderings.add_satisfied_orderings(&lhs, &rhs)?;
+    orderings.add_satisfied_orderings(&rhs, &lhs)?;
+    let orderings = orderings.build();
 
-    for ordering in rhs.normalized_oeq_class().into_iter() {
-        let mut ordering: Vec<PhysicalSortExpr> = ordering.into();
-
-        // Progressively shorten the ordering to search for a satisfied prefix:
-        while !lhs.ordering_satisfy(ordering.clone())? {
-            ordering.pop();
-        }
-        // There is a non-trivial satisfied prefix, add it as a valid ordering:
-        if !ordering.is_empty() {
-            orderings.push(ordering);
-        }
-    }
     let mut eq_properties = EquivalenceProperties::new(lhs.schema);
     eq_properties.add_constants(constants)?;
     eq_properties.add_orderings(orderings);
-
     Ok(eq_properties)
 }
@@ -149,7 +122,6 @@ struct UnionEquivalentOrderingBuilder {
     orderings: Vec<LexOrdering>,
 }
 
-#[expect(unused)]
 impl UnionEquivalentOrderingBuilder {
     fn new() -> Self {
         Self { orderings: vec![] }
     }
@@ -532,7 +504,6 @@ mod tests {
     }
 
     #[test]
-    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_fill_gaps() -> Result<()> {
         let schema = create_test_schema().unwrap();
         UnionEquivalenceTest::new(&schema)
@@ -608,7 +579,6 @@ mod tests {
     }
 
     #[test]
-    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() -> Result<()>
     {
         let schema = create_test_schema().unwrap();
@@ -637,7 +607,6 @@ mod tests {
     }
 
     #[test]
-    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_gap_fill_symmetric() -> Result<()> {
         let schema = create_test_schema().unwrap();
         UnionEquivalenceTest::new(&schema)
@@ -689,7 +658,6 @@ mod tests {
     }
 
     #[test]
-    #[ignore = "InfluxData patch: chore: skip order calculation / exponential planning"]
     fn test_union_equivalence_properties_constants_middle_desc() -> Result<()> {
         let schema = create_test_schema().unwrap();
         UnionEquivalenceTest::new(&schema)
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
index dae0edcfb1716..8a71b28486a2a 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -48,8 +48,8 @@ use crate::enforce_sorting::sort_pushdown::{
 };
 use crate::output_requirements::OutputRequirementExec;
 use crate::utils::{
-    add_sort_above, add_sort_above_with_check, is_aggregation, is_coalesce_partitions,
-    is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
+    add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
+    is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
 };
 use crate::PhysicalOptimizerRule;
 
@@ -678,7 +678,7 @@ fn remove_bottleneck_in_subplan(
 ) -> Result<PlanWithCorrespondingCoalescePartitions> {
     let plan = &requirements.plan;
     let children = &mut requirements.children;
-    if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) {
+    if is_coalesce_partitions(&children[0].plan) {
         // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
         let mut new_child_node = children[0].children.swap_remove(0);
         while new_child_node.plan.output_partitioning() == plan.output_partitioning()
diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs
index 3cc5319f9e108..acc70d39f057b 100644
--- a/datafusion/physical-optimizer/src/sanity_checker.rs
+++ b/datafusion/physical-optimizer/src/sanity_checker.rs
@@ -32,8 +32,6 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::joins::SymmetricHashJoinExec;
-use datafusion_physical_plan::sorts::sort::SortExec;
-use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
 
 use crate::PhysicalOptimizerRule;
@@ -137,14 +135,6 @@ pub fn check_plan_sanity(
         plan.required_input_ordering(),
         plan.required_input_distribution(),
     ) {
-        // TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
-        if child.as_any().downcast_ref::<SortExec>().is_some() {
-            continue;
-        }
-        if child.as_any().downcast_ref::<UnionExec>().is_some() {
-            continue;
-        }
-
         let child_eq_props = child.equivalence_properties();
         if let Some(sort_req) = sort_req {
             let sort_req = sort_req.into_single();
diff --git a/datafusion/physical-optimizer/src/utils.rs b/datafusion/physical-optimizer/src/utils.rs
index d3207d4880a70..3655e555a7440 100644
--- a/datafusion/physical-optimizer/src/utils.rs
+++ b/datafusion/physical-optimizer/src/utils.rs
@@ -19,7 +19,6 @@ use std::sync::Arc;
 
 use datafusion_common::Result;
 use datafusion_physical_expr::{LexOrdering, LexRequirement};
-use datafusion_physical_plan::aggregates::AggregateExec;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::repartition::RepartitionExec;
@@ -114,8 +113,3 @@ pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
 pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
 }
-
-/// Checks whether the given operator is an [`AggregateExec`].
-pub fn is_aggregation(plan: &Arc<dyn ExecutionPlan>) -> bool {
-    plan.as_any().is::<AggregateExec>()
-}
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 73541ecfb9e9a..eed3721078c72 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -1821,29 +1821,6 @@ c 122
 d 124
 e 115
 
-
-# using approx_percentile_cont on 2 columns with same signature
-query TII
-SELECT c1, approx_percentile_cont(c2, 0.95) AS c2, approx_percentile_cont(c3, 0.95) AS c3 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
-----
-a 5 73
-b 5 68
-c 5 122
-d 5 124
-e 5 115
-
-# error is unique to this UDAF
-query TRR
-SELECT c1, avg(c2) AS c2, avg(c3) AS c3 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
-----
-a 2.857142857143 -18.333333333333
-b 3.263157894737 -5.842105263158
-c 2.666666666667 -1.333333333333
-d 2.444444444444 25.444444444444
-e 3 40.333333333333
-
-
 query TI
 SELECT c1, approx_percentile_cont(0.95) WITHIN GROUP (ORDER BY c3 DESC) AS c3_p95 FROM aggregate_test_100 GROUP BY 1 ORDER BY 1
 ----