Skip to content
6 changes: 2 additions & 4 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,7 @@ mod tests {
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::{displayable, DisplayFormatType, Statistics};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
};
use datafusion_physical_expr::PhysicalSortRequirement;

fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
Expand Down Expand Up @@ -1162,7 +1160,7 @@ mod tests {
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![self
.output_ordering()
.map(make_sort_requirements_from_exprs)]
.map(PhysicalSortRequirement::from_sort_exprs)]
}

fn with_new_children(
Expand Down
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::utils::{
make_sort_exprs_from_requirements, ordering_satisfy,
ordering_satisfy_requirement_concrete,
ordering_satisfy, ordering_satisfy_requirement_concrete,
};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
use itertools::{concat, izip};
use std::iter::zip;
use std::sync::Arc;
Expand Down Expand Up @@ -468,7 +467,8 @@ fn ensure_sorting(
) {
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
let sort_expr = make_sort_exprs_from_requirements(&required_ordering);
let sort_expr =
PhysicalSortRequirement::to_sort_exprs(required_ordering);
add_sort_above(child, sort_expr)?;
if is_sort(child) {
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
Expand All @@ -479,7 +479,7 @@ fn ensure_sorting(
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec` to the plan.
let sort_expr = make_sort_exprs_from_requirements(&required);
let sort_expr = PhysicalSortRequirement::to_sort_exprs(required);
add_sort_above(child, sort_expr)?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
}
Expand Down
44 changes: 23 additions & 21 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,9 @@ use datafusion_common::{DataFusionError, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{
make_sort_exprs_from_requirements, ordering_satisfy_requirement,
requirements_compatible,
};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement,
ordering_satisfy_requirement, requirements_compatible,
};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use itertools::izip;
use std::ops::Deref;
use std::sync::Arc;
Expand Down Expand Up @@ -130,14 +127,15 @@ pub(crate) fn pushdown_sorts(
plan.equivalence_properties()
}) {
// If the current plan is a SortExec, modify it to satisfy parent requirements:
let parent_required_expr =
make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
parent_required.ok_or_else(err)?.iter().cloned(),
);
new_plan = sort_exec.input.clone();
add_sort_above(&mut new_plan, parent_required_expr)?;
};
let required_ordering = new_plan
.output_ordering()
.map(make_sort_requirements_from_exprs);
.map(PhysicalSortRequirement::from_sort_exprs);
// Since new_plan is a SortExec, we can safely get the 0th index.
let child = &new_plan.children()[0];
if let Some(adjusted) =
Expand Down Expand Up @@ -173,8 +171,9 @@ pub(crate) fn pushdown_sorts(
}))
} else {
// Can not push down requirements, add new SortExec:
let parent_required_expr =
make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
parent_required.ok_or_else(err)?.iter().cloned(),
);
let mut new_plan = plan.clone();
add_sort_above(&mut new_plan, parent_required_expr)?;
Ok(Transformed::Yes(SortPushDown::init(new_plan)))
Expand Down Expand Up @@ -210,8 +209,9 @@ fn pushdown_requirement_to_children(
} else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
// If the current plan is SortMergeJoinExec
let left_columns_len = smj.left.schema().fields().len();
let parent_required_expr =
make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
parent_required.ok_or_else(err)?.iter().cloned(),
);
let expr_source_side =
expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len);
match expr_source_side {
Expand Down Expand Up @@ -383,15 +383,17 @@ fn shift_right_required(
let new_right_required: Vec<PhysicalSortRequirement> = parent_required
.iter()
.filter_map(|r| {
r.expr.as_any().downcast_ref::<Column>().and_then(|col| {
(col.index() >= left_columns_len).then_some(PhysicalSortRequirement {
expr: Arc::new(Column::new(
col.name(),
col.index() - left_columns_len,
)) as _,
options: r.options,
})
})
let Some(col) = r.expr().as_any().downcast_ref::<Column>() else {
return None;
};

if col.index() < left_columns_len {
return None;
}

let new_col =
Arc::new(Column::new(col.name(), col.index() - left_columns_len));
Some(r.clone().with_expr(new_col))
})
.collect::<Vec<_>>();
if new_right_required.len() == parent_required.len() {
Expand Down
12 changes: 7 additions & 5 deletions datafusion/core/src/physical_plan/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ use arrow::compute::{concat_batches, take, SortOptions};
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
};
use datafusion_physical_expr::PhysicalSortRequirement;
use futures::{Stream, StreamExt};

use crate::error::DataFusionError;
Expand Down Expand Up @@ -230,8 +228,12 @@ impl ExecutionPlan for SortMergeJoinExec {

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![
Some(make_sort_requirements_from_exprs(&self.left_sort_exprs)),
Some(make_sort_requirements_from_exprs(&self.right_sort_exprs)),
Some(PhysicalSortRequirement::from_sort_exprs(
&self.left_sort_exprs,
)),
Some(PhysicalSortRequirement::from_sort_exprs(
&self.right_sort_exprs,
)),
]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ use crate::physical_plan::{
common::spawn_execution, expressions::PhysicalSortExpr, DisplayFormatType,
Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, EquivalenceProperties, PhysicalSortRequirement,
};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};

/// Sort preserving merge execution plan
///
Expand Down Expand Up @@ -117,7 +115,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![Some(make_sort_requirements_from_exprs(&self.expr))]
vec![Some(PhysicalSortRequirement::from_sort_exprs(&self.expr))]
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
Expand Down
16 changes: 5 additions & 11 deletions datafusion/core/src/physical_plan/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,18 +194,12 @@ pub(crate) fn calc_requirements(
) -> Option<Vec<PhysicalSortRequirement>> {
let mut sort_reqs = vec![];
for partition_by in partition_by_exprs {
sort_reqs.push(PhysicalSortRequirement {
expr: partition_by.clone(),
options: None,
});
sort_reqs.push(PhysicalSortRequirement::new(partition_by.clone(), None))
}
for PhysicalSortExpr { expr, options } in orderby_sort_exprs {
let contains = sort_reqs.iter().any(|e| expr.eq(&e.expr));
for sort_expr in orderby_sort_exprs {
let contains = sort_reqs.iter().any(|e| sort_expr.expr.eq(e.expr()));
if !contains {
sort_reqs.push(PhysicalSortRequirement {
expr: expr.clone(),
options: Some(*options),
});
sort_reqs.push(PhysicalSortRequirement::from(sort_expr.clone()));
}
}
// Convert empty result to None. Otherwise wrap result inside Some()
Expand Down Expand Up @@ -297,7 +291,7 @@ mod tests {
nulls_first,
});
let expr = col(col_name, &schema)?;
let res = PhysicalSortRequirement { expr, options };
let res = PhysicalSortRequirement::new(expr, options);
if let Some(expected) = &mut expected {
expected.push(res);
} else {
Expand Down
4 changes: 1 addition & 3 deletions datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ pub use equivalence::EquivalentClass;
pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
pub use sort_expr::{
make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement,
};
pub use sort_expr::{PhysicalSortExpr, PhysicalSortRequirement};
pub use utils::{
expr_list_eq_any_order, expr_list_eq_strict_order,
normalize_expr_with_equivalence_properties, normalize_out_expr_with_alias_schema,
Expand Down
124 changes: 109 additions & 15 deletions datafusion/physical-expr/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ impl PhysicalSortExpr {
})
}

/// Check whether sort expression satisfies `PhysicalSortRequirement`.
// If sort options is Some in `PhysicalSortRequirement`, `expr` and `options` field are compared for equality.
// If sort options is None in `PhysicalSortRequirement`, only `expr` is compared for equality.
/// Check whether sort expression satisfies [`PhysicalSortRequirement`].
///
/// If sort options is Some in `PhysicalSortRequirement`, `expr`
/// and `options` field are compared for equality.
///
/// If sort options is None in `PhysicalSortRequirement`, only
/// `expr` is compared for equality.
pub fn satisfy(&self, requirement: &PhysicalSortRequirement) -> bool {
self.expr.eq(&requirement.expr)
&& requirement
Expand All @@ -75,21 +79,42 @@ impl PhysicalSortExpr {
}

/// Represents sort requirement associated with a plan
///
/// If the requirement includes [`SortOptions`] then both the
/// expression *and* the sort options must match.
///
/// If the requirement does not include [`SortOptions`] then only the
/// expressions must match.
///
/// # Examples
///
/// With sort options (`A`, `DESC NULLS FIRST`):
/// * `ORDER BY A DESC NULLS FIRST` matches
/// * `ORDER BY A ASC NULLS FIRST` does not match (`ASC` vs `DESC`)
/// * `ORDER BY B DESC NULLS FIRST` does not match (different expr)
///
/// Without sort options (`A`, None):
/// * `ORDER BY A DESC NULLS FIRST` matches
/// * `ORDER BY A ASC NULLS FIRST` matches (`ASC` and `NULL` options ignored)
/// * `ORDER BY B DESC NULLS FIRST` does not match (different expr)
#[derive(Clone, Debug)]
pub struct PhysicalSortRequirement {
/// Physical expression representing the column to sort
pub expr: Arc<dyn PhysicalExpr>,
expr: Arc<dyn PhysicalExpr>,
/// Option to specify how the given column should be sorted.
/// If unspecified, there is no constraint on sort options.
pub options: Option<SortOptions>,
/// If unspecified, there are no constraints on sort options.
options: Option<SortOptions>,
}

impl From<PhysicalSortRequirement> for PhysicalSortExpr {
fn from(value: PhysicalSortRequirement) -> Self {
value.into_sort_expr()
}
}

impl From<PhysicalSortExpr> for PhysicalSortRequirement {
fn from(value: PhysicalSortExpr) -> Self {
Self {
expr: value.expr,
options: Some(value.options),
}
PhysicalSortRequirement::new(value.expr, Some(value.options))
}
}

Expand All @@ -107,19 +132,88 @@ impl std::fmt::Display for PhysicalSortRequirement {
}

impl PhysicalSortRequirement {
/// Creates a new requirement.
///
/// If `options` is `Some(..)`, creates an `exact` requirement,
/// which must match both `options` and `expr`.
///
/// If `options` is `None`, Creates a new `expr_only` requirement,
/// which must match only `expr`.
///
/// See [`PhysicalSortRequirement`] for examples.
pub fn new(expr: Arc<dyn PhysicalExpr>, options: Option<SortOptions>) -> Self {
Self { expr, options }
}

/// Replace the required expression for this requirement with the new one
pub fn with_expr(mut self, expr: Arc<dyn PhysicalExpr>) -> Self {
self.expr = expr;
self
}

/// Converts the `PhysicalSortRequirement` to `PhysicalSortExpr`.
/// If required ordering is `None` for an entry, the default
/// ordering `ASC, NULLS LAST` is used.
///
/// The default is picked to be consistent with
/// PostgreSQL: <https://www.postgresql.org/docs/current/queries-order.html>
pub fn into_sort_expr(self) -> PhysicalSortExpr {
let Self { expr, options } = self;

let options = options.unwrap_or(SortOptions {
Copy link
Contributor

@ozankabak ozankabak Apr 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest unwrap_or_else to explicitly avoid constructing a potentially unused object. The compiler will likely take care of this, but I find making intent visible in code structure is good practice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed -- I agree with you. I originally had unwrap_or_else for precisely the reasons you mentioned, but clippy told me to use unwrap_or instead 😭

error: unnecessary closure used to substitute value for `Option::None`
   --> datafusion/physical-expr/src/sort_expr.rs:185:23
    |
185 |           let options = options.unwrap_or_else(|| SortOptions {
    |  _______________________^
186 | |             descending: false,
187 | |             nulls_first: false,
188 | |         });
    | |__________^
    |
    = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#unnecessary_lazy_evaluations
    = note: `-D clippy::unnecessary-lazy-evaluations` implied by `-D warnings`
help: use `unwrap_or(..)` instead
    |
185 ~         let options = options.unwrap_or(SortOptions {
186 +             descending: false,
187 +             nulls_first: false,
188 ~         });
    |

descending: false,
nulls_first: false,
});
PhysicalSortExpr { expr, options }
}

/// Returns `true` when this requirement is at least as specific as `other`.
///
/// The expressions must be equal. In addition, if `other` constrains the
/// sort options, this requirement must carry exactly the same options;
/// if `other` leaves the options unconstrained, any options are accepted.
pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
    // Different expressions can never be compatible.
    if !self.expr.eq(&other.expr) {
        return false;
    }

    match (self.options, other.options) {
        // `other` places no constraint on the options.
        (_, None) => true,
        // Both sides are exact: the options must agree.
        (Some(own_opts), Some(other_opts)) => own_opts == other_opts,
        // `other` is exact but `self` is not, so `self` is less specific.
        (None, Some(_)) => false,
    }
}
}

pub fn make_sort_requirements_from_exprs(
ordering: &[PhysicalSortExpr],
) -> Vec<PhysicalSortRequirement> {
ordering.iter().map(|e| e.clone().into()).collect()
/// Builds one exact [`PhysicalSortRequirement`] for every
/// [`PhysicalSortExpr`] in `ordering`, preserving the input order.
///
/// Each resulting requirement carries the sort options of the
/// expression it was created from.
///
/// The items are taken by reference (`&'a PhysicalSortExpr`) so this is
/// convenient to call when implementing
/// [`ExecutionPlan::required_input_ordering`]; each item is cloned.
pub fn from_sort_exprs<'a>(
    ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> Vec<PhysicalSortRequirement> {
    ordering
        .into_iter()
        .map(|sort_expr| PhysicalSortRequirement::from(sort_expr.clone()))
        .collect()
}

/// Converts an iterator of [`PhysicalSortRequirement`] into a Vec
/// of [`PhysicalSortExpr`]s.
///
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
/// for each entry in the input. If required ordering is None for an entry,
/// the default ordering `ASC, NULLS LAST` is used (see [`into_sort_expr`])
pub fn to_sort_exprs(
requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> Vec<PhysicalSortExpr> {
Comment on lines +199 to +201
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use

pub fn to_sort_exprs<'a>(
        requirements: impl IntoIterator<Item = &'a PhysicalSortRequirement>,
    ) -> Vec<PhysicalSortExpr> 

as signature for to_sort_exprs as in from_sort_exprs. This would help remove a couple clones in the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My rationale for taking PhysicalSortRequirement rather than &PhysicalSortRequirement was actually to avoid a clone in the cases where the caller already had an owned PhysicalSortRequirement. Basically the clone() is moved from inside this function to the callsite

Specifically in datafusion/core/src/physical_optimizer/sort_enforcement.rs

                    // Make sure we preserve the ordering requirements:
                    update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
                    let sort_expr =
                        PhysicalSortRequirement::to_sort_exprs(required_ordering);
                    add_sort_above(child, sort_expr)?;
                    if is_sort(child) {

This signature can avoid a clone.

If the signature looks like

pub fn to_sort_exprs<'a>(
        requirements: impl IntoIterator<Item = &'a PhysicalSortRequirement>,
    ) -> Vec<PhysicalSortExpr> 

I think it would require always clone()ing the expr and options internally to create an owned PhysicalSortExpr in the output

Of course, we are talking about cloning an Arc and some SortOptions, which presumably isn't all that expensive, so maybe it doesn't matter 🤔

Copy link
Contributor

@ozankabak ozankabak Apr 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a feeling there may be a way to do this with a Borrow trick to implement a single function that can deal with both owned and referenced arguments without duplicating code 🤔

Copy link
Contributor

@ozankabak ozankabak Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb, it seems like using

pub fn to_sort_exprs<T: Borrow<PhysicalSortRequirement>>(
    requirements: impl IntoIterator<Item = T>,
) -> Vec<PhysicalSortExpr> {
    requirements
        .into_iter()
        .map(|e| PhysicalSortExpr::from(e.borrow()))
        .collect()
}

pub fn from_sort_exprs<T: Borrow<PhysicalSortExpr>>(
    ordering: impl IntoIterator<Item = T>,
) -> Vec<PhysicalSortRequirement> {
    ordering
        .into_iter()
        .map(|e| PhysicalSortRequirement::from(e.borrow()))
        .collect()
}

lets us get rid of a bunch of clones and results in simpler call site code. AFAIK, this is a zero-cost mechanism. If you concur, we can use this pattern.

I created #11 on your repo so you can easily check it out and incorporate (assuming I'm not overlooking something and this is indeed zero cost obviously).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess my point was that I think this call:

PhysicalSortRequirement::from(e.borrow())

Invokes this From impl:

impl From<&PhysicalSortRequirement> for PhysicalSortExpr {
    fn from(value: &PhysicalSortRequirement) -> Self {
        value.clone().into_sort_expr()
    }
}

Which has a clone of the value

I have removed the From<&PhysicalSortRequirement> traits in 12cecdb because it is obscuring the locations of the cloning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I thought you wanted to make to_sort_exprs "smart" in the sense that it can work with an owned value or a reference, using the owned or reference From impl depending on the argument -- making a clone only when necessary.

Anyway I got curious and tried to verify whether the borrow mechanism was zero-cost for this purpose; it turns out it is not. It always calls the reference path. I also found how to do this -- here is an example for future reference.

For posterity, had we wanted to go this route, the smart to_sort_exprs would be:

pub fn to_sort_exprs<T>(
    requirements: impl IntoIterator<Item = T>,
) -> Vec<PhysicalSortExpr>
where
    PhysicalSortExpr: From<T>,
{
    requirements
        .into_iter()
        .map(|e| PhysicalSortExpr::from(e))
        .collect()
}

which would route to the right PhysicalSortExpr::from depending on the argument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very cool -- I did not know that. If you think that is a worthwhile change, I would be happy to make a follow on PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be a good change, I can already see some use cases for it in some upcoming code (e.g. streaming group bys).

I updated the mini-PR I created for experimentation so you can get the changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks -- I tried to pull the commits into #5918 but they still seem to be trying to use this impl:

impl From<&PhysicalSortRequirement> for PhysicalSortExpr {

I may have messed up the merge somehow

Let's keep collaborating on #5918

requirements
.into_iter()
.map(PhysicalSortExpr::from)
.collect()
}

/// Returns a reference to the physical expression this requirement
/// applies to.
pub fn expr(&self) -> &Arc<dyn PhysicalExpr> {
&self.expr
}

/// Returns the sort options required by this requirement, by value.
///
/// `None` means the requirement places no constraint on sort options.
pub fn options(&self) -> Option<SortOptions> {
self.options
}
}

/// Returns the SQL string representation of the given [SortOptions] object.
Expand Down
Loading