diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 6c2d5c93482c5..35592519a4dd7 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -339,9 +339,7 @@ mod tests { use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::{displayable, DisplayFormatType, Statistics}; - use datafusion_physical_expr::{ - make_sort_requirements_from_exprs, PhysicalSortRequirement, - }; + use datafusion_physical_expr::PhysicalSortRequirement; fn schema() -> SchemaRef { Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)])) @@ -1162,7 +1160,7 @@ mod tests { fn required_input_ordering(&self) -> Vec>> { vec![self .output_ordering() - .map(make_sort_requirements_from_exprs)] + .map(PhysicalSortRequirement::from_sort_exprs)] } fn with_new_children( diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 4817c01e5c413..26299e4683a87 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -50,10 +50,9 @@ use arrow::datatypes::SchemaRef; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::DataFusionError; use datafusion_physical_expr::utils::{ - make_sort_exprs_from_requirements, ordering_satisfy, - ordering_satisfy_requirement_concrete, + ordering_satisfy, ordering_satisfy_requirement_concrete, }; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; +use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement}; use itertools::{concat, izip}; use std::iter::zip; use std::sync::Arc; @@ -468,7 +467,8 @@ fn ensure_sorting( ) { // Make sure we preserve the ordering requirements: 
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; - let sort_expr = make_sort_exprs_from_requirements(&required_ordering); + let sort_expr = + PhysicalSortRequirement::to_sort_exprs(required_ordering); add_sort_above(child, sort_expr)?; if is_sort(child) { *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![])); @@ -479,7 +479,7 @@ fn ensure_sorting( } (Some(required), None) => { // Ordering requirement is not met, we should add a `SortExec` to the plan. - let sort_expr = make_sort_exprs_from_requirements(&required); + let sort_expr = PhysicalSortRequirement::to_sort_exprs(required); add_sort_above(child, sort_expr)?; *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![])); } diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 07d0002548dee..94b361cb95926 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -27,12 +27,9 @@ use datafusion_common::{DataFusionError, Result}; use datafusion_expr::JoinType; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::utils::{ - make_sort_exprs_from_requirements, ordering_satisfy_requirement, - requirements_compatible, -}; -use datafusion_physical_expr::{ - make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement, + ordering_satisfy_requirement, requirements_compatible, }; +use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; use itertools::izip; use std::ops::Deref; use std::sync::Arc; @@ -130,14 +127,15 @@ pub(crate) fn pushdown_sorts( plan.equivalence_properties() }) { // If the current plan is a SortExec, modify it to satisfy parent requirements: - let parent_required_expr = - make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?); + let parent_required_expr = PhysicalSortRequirement::to_sort_exprs( + 
parent_required.ok_or_else(err)?.iter().cloned(), + ); new_plan = sort_exec.input.clone(); add_sort_above(&mut new_plan, parent_required_expr)?; }; let required_ordering = new_plan .output_ordering() - .map(make_sort_requirements_from_exprs); + .map(PhysicalSortRequirement::from_sort_exprs); // Since new_plan is a SortExec, we can safely get the 0th index. let child = &new_plan.children()[0]; if let Some(adjusted) = @@ -173,8 +171,9 @@ pub(crate) fn pushdown_sorts( })) } else { // Can not push down requirements, add new SortExec: - let parent_required_expr = - make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?); + let parent_required_expr = PhysicalSortRequirement::to_sort_exprs( + parent_required.ok_or_else(err)?.iter().cloned(), + ); let mut new_plan = plan.clone(); add_sort_above(&mut new_plan, parent_required_expr)?; Ok(Transformed::Yes(SortPushDown::init(new_plan))) @@ -210,8 +209,9 @@ fn pushdown_requirement_to_children( } else if let Some(smj) = plan.as_any().downcast_ref::() { // If the current plan is SortMergeJoinExec let left_columns_len = smj.left.schema().fields().len(); - let parent_required_expr = - make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?); + let parent_required_expr = PhysicalSortRequirement::to_sort_exprs( + parent_required.ok_or_else(err)?.iter().cloned(), + ); let expr_source_side = expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len); match expr_source_side { @@ -383,15 +383,17 @@ fn shift_right_required( let new_right_required: Vec = parent_required .iter() .filter_map(|r| { - r.expr.as_any().downcast_ref::().and_then(|col| { - (col.index() >= left_columns_len).then_some(PhysicalSortRequirement { - expr: Arc::new(Column::new( - col.name(), - col.index() - left_columns_len, - )) as _, - options: r.options, - }) - }) + let Some(col) = r.expr().as_any().downcast_ref::() else { + return None; + }; + + if col.index() < left_columns_len { + return None; + } + + let new_col = + 
Arc::new(Column::new(col.name(), col.index() - left_columns_len)); + Some(r.clone().with_expr(new_col)) }) .collect::>(); if new_right_required.len() == parent_required.len() { diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index ace46e38b5abf..93b3a686222e3 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -35,9 +35,7 @@ use arrow::compute::{concat_batches, take, SortOptions}; use arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -use datafusion_physical_expr::{ - make_sort_requirements_from_exprs, PhysicalSortRequirement, -}; +use datafusion_physical_expr::PhysicalSortRequirement; use futures::{Stream, StreamExt}; use crate::error::DataFusionError; @@ -230,8 +228,12 @@ impl ExecutionPlan for SortMergeJoinExec { fn required_input_ordering(&self) -> Vec>> { vec![ - Some(make_sort_requirements_from_exprs(&self.left_sort_exprs)), - Some(make_sort_requirements_from_exprs(&self.right_sort_exprs)), + Some(PhysicalSortRequirement::from_sort_exprs( + &self.left_sort_exprs, + )), + Some(PhysicalSortRequirement::from_sort_exprs( + &self.right_sort_exprs, + )), ] } diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 98cfb24c92138..2c1c3bbfcdaaf 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -35,9 +35,7 @@ use crate::physical_plan::{ common::spawn_execution, expressions::PhysicalSortExpr, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; -use datafusion_physical_expr::{ - make_sort_requirements_from_exprs, EquivalenceProperties, PhysicalSortRequirement, -}; +use 
datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement}; /// Sort preserving merge execution plan /// @@ -117,7 +115,7 @@ impl ExecutionPlan for SortPreservingMergeExec { } fn required_input_ordering(&self) -> Vec>> { - vec![Some(make_sort_requirements_from_exprs(&self.expr))] + vec![Some(PhysicalSortRequirement::from_sort_exprs(&self.expr))] } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index f7f9bb76b3f44..5b22e4f02efed 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -194,18 +194,12 @@ pub(crate) fn calc_requirements( ) -> Option> { let mut sort_reqs = vec![]; for partition_by in partition_by_exprs { - sort_reqs.push(PhysicalSortRequirement { - expr: partition_by.clone(), - options: None, - }); + sort_reqs.push(PhysicalSortRequirement::new(partition_by.clone(), None)) } - for PhysicalSortExpr { expr, options } in orderby_sort_exprs { - let contains = sort_reqs.iter().any(|e| expr.eq(&e.expr)); + for sort_expr in orderby_sort_exprs { + let contains = sort_reqs.iter().any(|e| sort_expr.expr.eq(e.expr())); if !contains { - sort_reqs.push(PhysicalSortRequirement { - expr: expr.clone(), - options: Some(*options), - }); + sort_reqs.push(PhysicalSortRequirement::from(sort_expr.clone())); } } // Convert empty result to None. 
Otherwise wrap result inside Some() @@ -297,7 +291,7 @@ mod tests { nulls_first, }); let expr = col(col_name, &schema)?; - let res = PhysicalSortRequirement { expr, options }; + let res = PhysicalSortRequirement::new(expr, options); if let Some(expected) = &mut expected { expected.push(res); } else { diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index a1698fe072b40..84b177c4bf849 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -53,9 +53,7 @@ pub use equivalence::EquivalentClass; pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef}; pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; -pub use sort_expr::{ - make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement, -}; +pub use sort_expr::{PhysicalSortExpr, PhysicalSortRequirement}; pub use utils::{ expr_list_eq_any_order, expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, normalize_out_expr_with_alias_schema, diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 08bd394e6d117..0f0176064624a 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -63,9 +63,13 @@ impl PhysicalSortExpr { }) } - /// Check whether sort expression satisfies `PhysicalSortRequirement`. - // If sort options is Some in `PhysicalSortRequirement`, `expr` and `options` field are compared for equality. - // If sort options is None in `PhysicalSortRequirement`, only `expr` is compared for equality. + /// Check whether sort expression satisfies [`PhysicalSortRequirement`]. + /// + /// If sort options is Some in `PhysicalSortRequirement`, `expr` + /// and `options` field are compared for equality. + /// + /// If sort options is None in `PhysicalSortRequirement`, only + /// `expr` is compared for equality. 
pub fn satisfy(&self, requirement: &PhysicalSortRequirement) -> bool { self.expr.eq(&requirement.expr) && requirement @@ -75,21 +79,42 @@ impl PhysicalSortExpr { } /// Represents sort requirement associated with a plan +/// +/// If the requirement includes [`SortOptions`] then both the +/// expression *and* the sort options must match. +/// +/// If the requirement does not include [`SortOptions`] then only the +/// expressions must match. +/// +/// # Examples +/// +/// With sort options (`A`, `DESC NULLS FIRST`): +/// * `ORDER BY A DESC NULLS FIRST` matches +/// * `ORDER BY A ASC NULLS FIRST` does not match (`ASC` vs `DESC`) +/// * `ORDER BY B DESC NULLS FIRST` does not match (different expr) +/// +/// Without sort options (`A`, None): +/// * `ORDER BY A DESC NULLS FIRST` matches +/// * `ORDER BY A ASC NULLS FIRST` matches (`ASC` and `NULL` options ignored) +/// * `ORDER BY B DESC NULLS FIRST` does not match (different expr) #[derive(Clone, Debug)] pub struct PhysicalSortRequirement { /// Physical expression representing the column to sort - pub expr: Arc, + expr: Arc, /// Option to specify how the given column should be sorted. - /// If unspecified, there is no constraint on sort options. - pub options: Option, + /// If unspecified, there are no constraints on sort options. + options: Option, +} + +impl From for PhysicalSortExpr { + fn from(value: PhysicalSortRequirement) -> Self { + value.into_sort_expr() + } } impl From for PhysicalSortRequirement { fn from(value: PhysicalSortExpr) -> Self { - Self { - expr: value.expr, - options: Some(value.options), - } + PhysicalSortRequirement::new(value.expr, Some(value.options)) } } @@ -107,6 +132,41 @@ impl std::fmt::Display for PhysicalSortRequirement { } impl PhysicalSortRequirement { + /// Creates a new requirement. + /// + /// If `options` is `Some(..)`, creates an `exact` requirement, + /// which must match both `options` and `expr`. 
+ /// + /// If `options` is `None`, creates an `expr_only` requirement, + /// which must match only `expr`. + /// + /// See [`PhysicalSortRequirement`] for examples. + pub fn new(expr: Arc, options: Option) -> Self { + Self { expr, options } + } + + /// Replace the required expression for this requirement with the new one + pub fn with_expr(mut self, expr: Arc) -> Self { + self.expr = expr; + self + } + + /// Converts the `PhysicalSortRequirement` to `PhysicalSortExpr`. + /// If required ordering is `None` for an entry, the default + /// ordering `ASC, NULLS LAST` is used. + /// + /// The default is picked to be consistent with + /// [PostgreSQL's rule](https://www.postgresql.org/docs/current/queries-order.html). + pub fn into_sort_expr(self) -> PhysicalSortExpr { + let Self { expr, options } = self; + + let options = options.unwrap_or(SortOptions { + descending: false, + nulls_first: false, + }); + PhysicalSortExpr { expr, options } + } + /// Returns whether this requirement is equal or more specific than `other`. pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool { self.expr.eq(&other.expr) @@ -114,12 +174,46 @@ impl PhysicalSortRequirement { self.options.map_or(false, |opts| opts == other_opts) }) } -} -pub fn make_sort_requirements_from_exprs( - ordering: &[PhysicalSortExpr], -) -> Vec { - ordering.iter().map(|e| e.clone().into()).collect() + /// Returns [`PhysicalSortRequirement`] that requires the exact + /// sort of the [`PhysicalSortExpr`]s in `ordering` + /// + /// This method takes `&'a PhysicalSortExpr` to make it easy to + /// use when implementing [`ExecutionPlan::required_input_ordering`]. + pub fn from_sort_exprs<'a>( + ordering: impl IntoIterator, + ) -> Vec { + ordering + .into_iter() + .cloned() + .map(PhysicalSortRequirement::from) + .collect() + } + + /// Converts an iterator of [`PhysicalSortRequirement`] into a Vec + /// of [`PhysicalSortExpr`]s. + /// + /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr` + /// for each entry in the input. 
If required ordering is None for an entry, + /// the default ordering `ASC, NULLS LAST` is used (see [`Self::into_sort_expr`]) + pub fn to_sort_exprs( + requirements: impl IntoIterator, + ) -> Vec { + requirements + .into_iter() + .map(PhysicalSortExpr::from) + .collect() + } + + /// Returns the expr for this requirement + pub fn expr(&self) -> &Arc { + &self.expr + } + + /// Returns the required options for this requirement + pub fn options(&self) -> Option { + self.options + } } /// Returns the SQL string representation of the given [SortOptions] object. diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 5c30d523d8a1e..e95aa28b37b34 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -24,7 +24,6 @@ use arrow::datatypes::SchemaRef; use datafusion_common::Result; use datafusion_expr::Operator; -use arrow_schema::SortOptions; use datafusion_common::tree_node::{ Transformed, TreeNode, TreeNodeRewriter, VisitRecursion, }; @@ -198,14 +197,11 @@ pub fn normalize_sort_requirement_with_equivalence_properties( eq_properties: &[EquivalentClass], ) -> PhysicalSortRequirement { let normalized_expr = normalize_expr_with_equivalence_properties( - sort_requirement.expr.clone(), + sort_requirement.expr().clone(), eq_properties, ); - if sort_requirement.expr.ne(&normalized_expr) { - PhysicalSortRequirement { - expr: normalized_expr, - options: sort_requirement.options, - } + if sort_requirement.expr().ne(&normalized_expr) { + sort_requirement.with_expr(normalized_expr) } else { sort_requirement } @@ -390,35 +386,6 @@ pub fn map_columns_before_projection( .collect() } -/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr` -/// for each entry in the input. If required ordering is None for an entry -/// default ordering `ASC, NULLS LAST` if given. 
-pub fn make_sort_exprs_from_requirements( - required: &[PhysicalSortRequirement], -) -> Vec { - required - .iter() - .map(|requirement| { - if let Some(options) = requirement.options { - PhysicalSortExpr { - expr: requirement.expr.clone(), - options, - } - } else { - PhysicalSortExpr { - expr: requirement.expr.clone(), - options: SortOptions { - // By default, create sort key with ASC is true and NULLS LAST to be consistent with - // PostgreSQL's rule: https://www.postgresql.org/docs/current/queries-order.html - descending: false, - nulls_first: false, - }, - } - } - }) - .collect() -} - #[derive(Clone, Debug)] pub struct ExprTreeNode { expr: Arc,