From 4d9987d55335c7ef792e51f51836084e71743300 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Thu, 6 Apr 2023 19:49:00 -0500 Subject: [PATCH 1/2] Use std::borrow::Borrow to simplify code --- .../core/src/physical_optimizer/sort_pushdown.rs | 15 ++++++--------- datafusion/physical-expr/src/sort_expr.rs | 13 +++++++------ 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 94b361cb95926..d1cf9c957f863 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -127,9 +127,8 @@ pub(crate) fn pushdown_sorts( plan.equivalence_properties() }) { // If the current plan is a SortExec, modify it to satisfy parent requirements: - let parent_required_expr = PhysicalSortRequirement::to_sort_exprs( - parent_required.ok_or_else(err)?.iter().cloned(), - ); + let parent_required_expr = + PhysicalSortRequirement::to_sort_exprs(parent_required.ok_or_else(err)?); new_plan = sort_exec.input.clone(); add_sort_above(&mut new_plan, parent_required_expr)?; }; @@ -171,9 +170,8 @@ pub(crate) fn pushdown_sorts( })) } else { // Can not push down requirements, add new SortExec: - let parent_required_expr = PhysicalSortRequirement::to_sort_exprs( - parent_required.ok_or_else(err)?.iter().cloned(), - ); + let parent_required_expr = + PhysicalSortRequirement::to_sort_exprs(parent_required.ok_or_else(err)?); let mut new_plan = plan.clone(); add_sort_above(&mut new_plan, parent_required_expr)?; Ok(Transformed::Yes(SortPushDown::init(new_plan))) @@ -209,9 +207,8 @@ fn pushdown_requirement_to_children( } else if let Some(smj) = plan.as_any().downcast_ref::() { // If the current plan is SortMergeJoinExec let left_columns_len = smj.left.schema().fields().len(); - let parent_required_expr = PhysicalSortRequirement::to_sort_exprs( - parent_required.ok_or_else(err)?.iter().cloned(), - ); + let parent_required_expr = + PhysicalSortRequirement::to_sort_exprs(parent_required.ok_or_else(err)?); let expr_source_side = expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len); match expr_source_side { diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 0cd67c358e097..e6cac4e93f688 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -22,6 +22,7 @@ use arrow::compute::kernels::sort::{SortColumn, SortOptions}; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; +use std::borrow::Borrow; use std::sync::Arc; /// Represents Sort operation for a column in a RecordBatch @@ -192,12 +193,12 @@ impl PhysicalSortRequirement { /// /// This method is designed for /// use implementing [`ExecutionPlan::required_input_ordering`]. - pub fn from_sort_exprs<'a>( - ordering: impl IntoIterator, + pub fn from_sort_exprs>( + ordering: impl IntoIterator, ) -> Vec { ordering .into_iter() - .map(PhysicalSortRequirement::from) + .map(|e| PhysicalSortRequirement::from(e.borrow())) .collect() } @@ -207,12 +208,12 @@ impl PhysicalSortRequirement { /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr` /// for each entry in the input. If required ordering is None for an entry /// default ordering `ASC, NULLS LAST` if given (see [`into_sort_expr`]) - pub fn to_sort_exprs( - requirements: impl IntoIterator, + pub fn to_sort_exprs>( + requirements: impl IntoIterator, ) -> Vec { requirements .into_iter() - .map(PhysicalSortExpr::from) + .map(|e| PhysicalSortExpr::from(e.borrow())) .collect() } From 155a9b0a253e399523767e566e27308e92f41b40 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Fri, 7 Apr 2023 09:31:38 -0500 Subject: [PATCH 2/2] Use generics instead of borrow for zero-cost --- datafusion/physical-expr/src/sort_expr.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index e6cac4e93f688..8867924bdc233 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -22,7 +22,6 @@ use arrow::compute::kernels::sort::{SortColumn, SortOptions}; use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; -use std::borrow::Borrow; use std::sync::Arc; /// Represents Sort operation for a column in a RecordBatch @@ -193,12 +192,15 @@ impl PhysicalSortRequirement { /// /// This method is designed for /// use implementing [`ExecutionPlan::required_input_ordering`]. - pub fn from_sort_exprs>( + pub fn from_sort_exprs( ordering: impl IntoIterator, - ) -> Vec { + ) -> Vec + where + PhysicalSortRequirement: From, + { ordering .into_iter() - .map(|e| PhysicalSortRequirement::from(e.borrow())) + .map(|e| PhysicalSortRequirement::from(e)) .collect() } @@ -208,12 +210,15 @@ impl PhysicalSortRequirement { /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr` /// for each entry in the input. If required ordering is None for an entry /// default ordering `ASC, NULLS LAST` if given (see [`into_sort_expr`]) - pub fn to_sort_exprs>( + pub fn to_sort_exprs( requirements: impl IntoIterator, - ) -> Vec { + ) -> Vec + where + PhysicalSortExpr: From, + { requirements .into_iter() - .map(|e| PhysicalSortExpr::from(e.borrow())) + .map(|e| PhysicalSortExpr::from(e)) .collect() }