diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 50c67f09c7045..bd78872d1b0ce 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -220,7 +220,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec) -> Arc, alias_pairs: Vec<(String, String)>, ) -> Arc { diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs index 3412b962d8598..fb8816826088c 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs @@ -17,13 +17,16 @@ use std::sync::Arc; +use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias; +use crate::physical_optimizer::sanity_checker::assert_sanity_check; use crate::physical_optimizer::test_utils::{ aggregate_exec, bounded_window_exec, bounded_window_exec_non_set_monotonic, bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec, coalesce_partitions_exec, create_test_schema, create_test_schema2, create_test_schema3, create_test_schema4, filter_exec, global_limit_exec, hash_join_exec, limit_exec, local_limit_exec, memory_exec, parquet_exec, - repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec, + parquet_exec_with_stats, repartition_exec, schema, single_partitioned_aggregate, + sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec, sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered, union_exec, RequirementsTestExec, }; @@ -2280,3 +2283,49 @@ async fn test_not_replaced_with_partial_sort_for_unbounded_input() -> Result<()> assert_optimized!(expected_input, expected_no_change, physical_plan, true); Ok(()) } + +#[tokio::test] +async fn test_preserve_needed_coalesce() -> Result<()> { + // Input to EnforceSorting, from our test case. + let plan = projection_exec_with_alias( + union_exec(vec![parquet_exec_with_stats(); 2]), + vec![ + ("a".to_string(), "a".to_string()), + ("b".to_string(), "value".to_string()), + ], + ); + let plan = Arc::new(CoalescePartitionsExec::new(plan)); + let schema = schema(); + let sort_key = LexOrdering::new(vec![PhysicalSortExpr { + expr: col("a", &schema).unwrap(), + options: SortOptions::default(), + }]); + let plan: Arc = + single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]); + let plan = sort_exec(sort_key, plan); + + // Starting plan: as in our test case. + let starting_plan = vec![ + "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", + " AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]", + " CoalescePartitionsExec", + " ProjectionExec: expr=[a@0 as a, b@1 as value]", + " UnionExec", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", + ]; + assert_eq!(get_plan_string(&plan), starting_plan); + + // Test: plan is valid. + assert_sanity_check(&plan, true); + + // EnforceSorting will not remove the coalesce, as it's required. + let optimizer = EnforceSorting::new(); + let optimized = optimizer.optimize(plan, &Default::default())?; + assert_eq!(get_plan_string(&optimized), starting_plan); + + // Test: plan is valid. + assert_sanity_check(&optimized, true); + + Ok(()) +} diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs index a73d084a081f3..8564eb80e86ae 100644 --- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -388,7 +388,7 @@ fn create_test_schema2() -> SchemaRef { } /// Check if sanity checker should accept or reject plans. -fn assert_sanity_check(plan: &Arc, is_sane: bool) { +pub(crate) fn assert_sanity_check(plan: &Arc, is_sane: bool) { let sanity_checker = SanityCheckPlan::new(); let opts = ConfigOptions::default(); assert_eq!( diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs index 162f93facc90d..2c45242e7657a 100644 --- a/datafusion/core/tests/physical_optimizer/test_utils.rs +++ b/datafusion/core/tests/physical_optimizer/test_utils.rs @@ -28,9 +28,10 @@ use arrow::record_batch::RecordBatch; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource}; use datafusion_common::config::ConfigOptions; +use datafusion_common::stats::Precision; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; -use datafusion_common::{JoinType, Result}; +use datafusion_common::{ColumnStatistics, JoinType, Result, Statistics}; use datafusion_execution::object_store::ObjectStoreUrl; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{WindowFrame, WindowFunctionDefinition}; @@ -102,6 +103,44 @@ pub fn schema() -> SchemaRef { ])) } +fn int64_stats() -> ColumnStatistics { + ColumnStatistics { + null_count: Precision::Absent, + sum_value: Precision::Absent, + max_value: Precision::Exact(1_000_000.into()), + min_value: Precision::Exact(0.into()), + distinct_count: Precision::Absent, + } +} + +fn column_stats() -> Vec { + vec![ + int64_stats(), // a + int64_stats(), // b + int64_stats(), // c + ColumnStatistics::default(), + ColumnStatistics::default(), + ] +} + +/// Create parquet datasource exec using schema from [`schema`]. +pub(crate) fn parquet_exec_with_stats() -> Arc { + let mut statistics = Statistics::new_unknown(&schema()); + statistics.num_rows = Precision::Inexact(10); + statistics.column_statistics = column_stats(); + + let config = FileScanConfig::new( + ObjectStoreUrl::parse("test:///").unwrap(), + schema(), + Arc::new(ParquetSource::new(Default::default())), + ) + .with_file(PartitionedFile::new("x".to_string(), 10000)) + .with_statistics(statistics); + assert_eq!(config.statistics.num_rows, Precision::Inexact(10)); + + config.build() +} + pub fn create_test_schema() -> Result { let nullable_column = Field::new("nullable_col", DataType::Int32, true); let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false); @@ -575,6 +614,30 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: Vec) -> Physica PhysicalGroupBy::new_single(group_by_expr.clone()) } +pub(crate) fn single_partitioned_aggregate( + input: Arc, + alias_pairs: Vec<(String, String)>, +) -> Arc { + let schema = schema(); + let group_by = alias_pairs + .iter() + .map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string())) + .collect::>(); + let group_by = PhysicalGroupBy::new_single(group_by); + + Arc::new( + AggregateExec::try_new( + AggregateMode::SinglePartitioned, + group_by, + vec![], + vec![], + input, + schema, + ) + .unwrap(), + ) +} + pub fn assert_plan_matches_expected( plan: &Arc, expected: &[&str], diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs index 420c080f09c28..cf1559bc8a391 100644 --- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs +++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs @@ -47,8 +47,8 @@ use crate::enforce_sorting::sort_pushdown::{ assign_initial_requirements, pushdown_sorts, SortPushDown, }; use crate::utils::{ - add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit, - is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window, + add_sort_above, add_sort_above_with_check, is_aggregate, is_coalesce_partitions, + is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window, }; use crate::PhysicalOptimizerRule; @@ -56,7 +56,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; -use datafusion_physical_expr::{Distribution, Partitioning}; +use datafusion_physical_expr::Distribution; use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement}; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; @@ -138,29 +138,65 @@ fn update_sort_ctx_children_data( /// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. The data /// attribute stores whether the plan is a `CoalescePartitionsExec` or is /// connected to a `CoalescePartitionsExec` via its children. +/// +/// The tracker halts at each [`SortExec`] (where the SPM will act to replace the coalesce). +/// +/// This requires a bottom-up traversal was previously performed, updating the +/// children previously. pub type PlanWithCorrespondingCoalescePartitions = PlanContext; +/// Determines if the coalesce may be safely removed. +fn is_coalesce_to_remove( + node: &Arc, + parent: &Arc, +) -> bool { + node.as_any().downcast_ref::() + .map(|_coalesce| { + // TODO(wiedld): find a more generalized approach that does not rely on + // pattern matching the structure of the DAG + // Note that the `Partitioning::satisfy()` (parent vs. coalesce.child) cannot be used for cases of: + // * Repartition -> Coalesce -> Repartition + // * Coalesce -> AggregateExec(input=hash-partitioned) + + let parent_req_single_partition = matches!(parent.required_input_distribution()[0], Distribution::SinglePartition) + // handle aggregates with input=hashPartitioning with a single output partition + || (is_aggregate(parent) && parent.properties().output_partitioning().partition_count() <= 1); + + // node above does not require single distribution + !parent_req_single_partition + // it doesn't immediately repartition + || is_repartition(parent) + // any adjacent Coalesce->Sort can be replaced + || is_sort(parent) + }).unwrap_or(false) +} + fn update_coalesce_ctx_children( coalesce_context: &mut PlanWithCorrespondingCoalescePartitions, ) { - let children = &coalesce_context.children; - coalesce_context.data = if children.is_empty() { - // Plan has no children, it cannot be a `CoalescePartitionsExec`. - false - } else if is_coalesce_partitions(&coalesce_context.plan) { - // Initiate a connection: - true - } else { - children.iter().enumerate().any(|(idx, node)| { - // Only consider operators that don't require a single partition, - // and connected to some `CoalescePartitionsExec`: - node.data - && !matches!( - coalesce_context.plan.required_input_distribution()[idx], - Distribution::SinglePartition - ) - }) - }; + // perform lookahead(1) during bottom up traversal + // since we are checking distribution requirements after the coalesce occurs + let parent = &coalesce_context.plan; + + for child_context in coalesce_context.children.iter_mut() { + // determine if child, or it's descendents, are a coalesce to be removed + child_context.data = if child_context.children.is_empty() { + // Plan has no children, it cannot be a `CoalescePartitionsExec`. + false + } else if is_coalesce_to_remove(&child_context.plan, parent) { + // Initiate a connection: + true + } else if is_sort(&child_context.plan) { + // halt coalesce removals at the sort + false + } else { + // propagate + child_context + .children + .iter() + .any(|grandchild| grandchild.data) + }; + } } /// Performs optimizations based upon a series of subrules. @@ -316,14 +352,35 @@ fn replace_with_partial_sort( /// are transformed into /// ```text /// "SortPreservingMergeExec: \[a@0 ASC\]", -/// " ...nodes..." -/// " SortExec: expr=\[a@0 ASC\]", +/// " SortExec: expr=\[a@0 ASC\]", +/// " ...nodes..." +/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", +/// ``` +/// by following connections from [`CoalescePartitionsExec`]s to [`SortExec`]s. +/// By performing sorting in parallel, we can increase performance in some scenarios. +/// +/// This requires that there are no nodes between the [`SortExec`] and [`CoalescePartitionsExec`] +/// which require single partitioning. Do not parallelize when the following scenario occurs: +/// ```text +/// "SortExec: expr=\[a@0 ASC\]", +/// " ...nodes requiring single partitioning..." +/// " CoalescePartitionsExec", /// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", /// ``` pub fn parallelize_sorts( mut requirements: PlanWithCorrespondingCoalescePartitions, ) -> Result> { + requirements = requirements.update_plan_from_children()?; update_coalesce_ctx_children(&mut requirements); + let coalesce_can_be_removed = requirements.children.iter().any(|child| child.data); + + let should_parallelize_sort = (is_sort(&requirements.plan) + || is_sort_preserving_merge(&requirements.plan)) + && requirements.plan.output_partitioning().partition_count() <= 1 + && coalesce_can_be_removed; + + // Repartition -> Coalesce -> Repartition + let unneeded_coalesce = is_repartition(&requirements.plan) && coalesce_can_be_removed; if requirements.children.is_empty() || !requirements.children[0].data { // We only take an action when the plan is either a `SortExec`, a @@ -331,10 +388,7 @@ pub fn parallelize_sorts( // all have a single child. Therefore, if the first child has no // connection, we can return immediately. Ok(Transformed::no(requirements)) - } else if (is_sort(&requirements.plan) - || is_sort_preserving_merge(&requirements.plan)) - && requirements.plan.output_partitioning().partition_count() <= 1 - { + } else if should_parallelize_sort { // Take the initial sort expressions and requirements let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?; let sort_reqs = LexRequirement::from(sort_exprs.clone()); @@ -349,8 +403,11 @@ pub fn parallelize_sorts( // We also need to remove the self node since `remove_corresponding_coalesce_in_sub_plan` // deals with the children and their children and so on. requirements = requirements.children.swap_remove(0); + // sync the requirements.plan.children with the mutated requirements.children + requirements = requirements.update_plan_from_children()?; requirements = add_sort_above_with_check(requirements, sort_reqs, fetch); + requirements = requirements.update_plan_from_children()?; let spm = SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan)); @@ -361,20 +418,11 @@ pub fn parallelize_sorts( vec![requirements], ), )) - } else if is_coalesce_partitions(&requirements.plan) { - // There is an unnecessary `CoalescePartitionsExec` in the plan. - // This will handle the recursive `CoalescePartitionsExec` plans. + } else if unneeded_coalesce { requirements = remove_bottleneck_in_subplan(requirements)?; - // For the removal of self node which is also a `CoalescePartitionsExec`. - requirements = requirements.children.swap_remove(0); + requirements = requirements.update_plan_from_children()?; - Ok(Transformed::yes( - PlanWithCorrespondingCoalescePartitions::new( - Arc::new(CoalescePartitionsExec::new(Arc::clone(&requirements.plan))), - false, - vec![requirements], - ), - )) + Ok(Transformed::yes(requirements)) } else { Ok(Transformed::yes(requirements)) } @@ -614,19 +662,7 @@ fn remove_bottleneck_in_subplan( }) .collect::>()?; } - let mut new_reqs = requirements.update_plan_from_children()?; - if let Some(repartition) = new_reqs.plan.as_any().downcast_ref::() { - let input_partitioning = repartition.input().output_partitioning(); - // We can remove this repartitioning operator if it is now a no-op: - let mut can_remove = input_partitioning.eq(repartition.partitioning()); - // We can also remove it if we ended up with an ineffective RR: - if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() { - can_remove |= *n_out == input_partitioning.partition_count(); - } - if can_remove { - new_reqs = new_reqs.children.swap_remove(0) - } - } + let new_reqs = requirements.update_plan_from_children()?; Ok(new_reqs) } diff --git a/datafusion/physical-optimizer/src/utils.rs b/datafusion/physical-optimizer/src/utils.rs index 636e78a06ce7b..0b91124e49b02 100644 --- a/datafusion/physical-optimizer/src/utils.rs +++ b/datafusion/physical-optimizer/src/utils.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use datafusion_physical_expr::LexRequirement; use datafusion_physical_expr_common::sort_expr::LexOrdering; +use datafusion_physical_plan::aggregates::AggregateExec; use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion_physical_plan::repartition::RepartitionExec; @@ -100,6 +101,11 @@ pub fn is_repartition(plan: &Arc) -> bool { plan.as_any().is::() } +/// Checks whether the given operator is an [`AggregateExec`]. +pub fn is_aggregate(plan: &Arc) -> bool { + plan.as_any().is::() +} + /// Checks whether the given operator is a limit; /// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`]. pub fn is_limit(plan: &Arc) -> bool { diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 0ba4c32ee5be8..0ab021ff92c4d 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -244,13 +244,15 @@ SELECT x, y FROM t1 UNION BY NAME (SELECT y, z FROM t2 INTERSECT SELECT 2, 2 as 3 3 NULL NULL 2 2 -query III -SELECT x, y FROM t1 UNION ALL BY NAME (SELECT y, z FROM t2 INTERSECT SELECT 2, 2 as two FROM t1 ORDER BY 1) ORDER BY 1; ----- -1 1 NULL -3 3 NULL -3 3 NULL -NULL 2 2 +# TODO: Test regression on a new feature, with the same SanityCheckPlan failures as currently existing for this feature. +# output plan in SanityCheckPlan error has nondeterministic order in the HashJoinExec ON, so we can't compare the error directly +# query error +# SELECT x, y FROM t1 UNION ALL BY NAME (SELECT y, z FROM t2 INTERSECT SELECT 2, 2 as two FROM t1 ORDER BY 1) ORDER BY 1; +# ---- +# DataFusion error: SanityCheckPlan +# caused by +# Error during planning: Plan: ["SortPreservingMergeExec: [x@0 ASC NULLS LAST]", " UnionExec", " SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[false]", " ProjectionExec: expr=[x@0 as x, y@1 as y, NULL as z]", " DataSourceExec: partitions=1, partition_sizes=[1]", " ProjectionExec: expr=[NULL as x, y@0 as y, z@1 as z]", " CoalescePartitionsExec", " CoalesceBatchesExec: target_batch_size=8192", " HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(CAST(t2.y AS Int64)@3, Int64(2)@0), (CAST(t2.z AS Int64)@2, two@1)], projection=[y@0, z@1]", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=Hash([CAST(t2.y AS Int64)@3, CAST(t2.z AS Int64)@2], 4), input_partitions=4", " ProjectionExec: expr=[y@0 as y, z@1 as z, CAST(z@1 AS Int64) as CAST(t2.z AS Int64), CAST(y@0 AS Int64) as CAST(t2.y AS Int64)]", " AggregateExec: mode=FinalPartitioned, gby=[y@0 as y, z@1 as z], aggr=[]", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=Hash([y@0, z@1], 4), input_partitions=4", " RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1", " AggregateExec: mode=Partial, gby=[y@0 as y, z@1 as z], aggr=[]", " DataSourceExec: partitions=1, partition_sizes=[1]", " CoalesceBatchesExec: target_batch_size=8192", " RepartitionExec: partitioning=Hash([Int64(2)@0, two@1], 4), input_partitions=1", " ProjectionExec: expr=[2 as Int64(2), 2 as two]", " DataSourceExec: partitions=1, partition_sizes=[1]"] does not satisfy order requirements: [x@0 ASC NULLS LAST]. Child-0 order: [] + query III (SELECT x, y FROM t1 UNION BY NAME SELECT y, z FROM t2 ORDER BY 1) EXCEPT SELECT NULL, 2, 2 as two FROM t1 ORDER BY 1;