From 937c8974457a1d5b20534245a5236078196f6692 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Sat, 25 Mar 2023 19:00:30 -0500 Subject: [PATCH] Use type aliases ExprOrdering and OrderingRequirements to increase usability --- benchmarks/src/bin/parquet.rs | 4 +- .../examples/custom_datasource.rs | 4 +- datafusion/core/benches/merge.rs | 4 +- datafusion/core/benches/sort.rs | 4 +- .../core/src/datasource/listing/table.rs | 4 +- .../physical_optimizer/dist_enforcement.rs | 6 +-- .../src/physical_optimizer/repartition.rs | 10 ++--- .../physical_optimizer/sort_enforcement.rs | 8 ++-- .../src/physical_optimizer/sort_pushdown.rs | 44 +++++++++---------- .../core/src/physical_optimizer/utils.rs | 4 +- .../core/src/physical_plan/aggregates/mod.rs | 8 ++-- datafusion/core/src/physical_plan/analyze.rs | 4 +- .../src/physical_plan/coalesce_batches.rs | 4 +- .../src/physical_plan/coalesce_partitions.rs | 4 +- datafusion/core/src/physical_plan/common.rs | 27 ++++++------ datafusion/core/src/physical_plan/empty.rs | 4 +- datafusion/core/src/physical_plan/explain.rs | 4 +- .../src/physical_plan/file_format/avro.rs | 4 +- .../core/src/physical_plan/file_format/csv.rs | 4 +- .../src/physical_plan/file_format/json.rs | 4 +- .../core/src/physical_plan/file_format/mod.rs | 6 +-- .../src/physical_plan/file_format/parquet.rs | 6 +-- datafusion/core/src/physical_plan/filter.rs | 4 +- .../src/physical_plan/joins/cross_join.rs | 6 +-- .../core/src/physical_plan/joins/hash_join.rs | 5 +-- .../physical_plan/joins/nested_loop_join.rs | 4 +- .../physical_plan/joins/sort_merge_join.rs | 25 +++++------ .../joins/symmetric_hash_join.rs | 11 +++-- datafusion/core/src/physical_plan/limit.rs | 6 +-- datafusion/core/src/physical_plan/memory.rs | 11 ++--- datafusion/core/src/physical_plan/mod.rs | 15 ++++--- datafusion/core/src/physical_plan/planner.rs | 3 +- .../core/src/physical_plan/projection.rs | 6 +-- .../core/src/physical_plan/repartition/mod.rs | 4 +- .../core/src/physical_plan/sorts/sort.rs | 27 ++++++------ .../sorts/sort_preserving_merge.rs | 33 +++++++------- .../core/src/physical_plan/streaming.rs | 4 +- datafusion/core/src/physical_plan/union.rs | 4 +- datafusion/core/src/physical_plan/unnest.rs | 6 +-- datafusion/core/src/physical_plan/values.rs | 4 +- .../windows/bounded_window_agg_exec.rs | 14 +++--- .../core/src/physical_plan/windows/mod.rs | 12 ++--- .../physical_plan/windows/window_agg_exec.rs | 13 +++--- .../core/src/scheduler/pipeline/execution.rs | 4 +- datafusion/core/src/test/exec.rs | 12 ++--- datafusion/core/src/test_util/mod.rs | 4 +- datafusion/core/tests/custom_sources.rs | 4 +- .../core/tests/provider_filter_pushdown.rs | 4 +- datafusion/core/tests/statistics.rs | 4 +- datafusion/core/tests/user_defined_plan.rs | 4 +- .../physical-expr/src/expressions/mod.rs | 2 +- datafusion/physical-expr/src/lib.rs | 3 +- datafusion/physical-expr/src/sort_expr.rs | 8 ++-- datafusion/physical-expr/src/utils.rs | 43 +++++++++--------- .../physical-expr/src/window/aggregate.rs | 8 ++-- .../physical-expr/src/window/built_in.rs | 8 ++-- .../src/window/sliding_aggregate.rs | 8 ++-- .../physical-expr/src/window/window_expr.rs | 6 +-- .../proto/src/physical_plan/from_proto.rs | 4 +- 59 files changed, 255 insertions(+), 259 deletions(-) diff --git a/benchmarks/src/bin/parquet.rs b/benchmarks/src/bin/parquet.rs index 658d924dfb2ac..93de875d3510b 100644 --- a/benchmarks/src/bin/parquet.rs +++ b/benchmarks/src/bin/parquet.rs @@ -20,8 +20,8 @@ use datafusion::common::Result; use datafusion::logical_expr::{lit, or, Expr}; use datafusion::optimizer::utils::disjunction; use datafusion::physical_expr::PhysicalSortExpr; -use datafusion::physical_plan::collect; use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::{collect, ExprOrderingRef}; use datafusion::prelude::{col, SessionConfig, SessionContext}; use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile}; use datafusion_benchmarks::BenchmarkRun; @@ -319,7 +319,7 @@ async fn exec_scan( async fn exec_sort( ctx: &SessionContext, - expr: &[PhysicalSortExpr], + expr: ExprOrderingRef<'_>, test_file: &TestParquetFile, debug: bool, ) -> Result<(usize, std::time::Duration)> { diff --git a/datafusion-examples/examples/custom_datasource.rs b/datafusion-examples/examples/custom_datasource.rs index c426d9611c608..d02463006dfd9 100644 --- a/datafusion-examples/examples/custom_datasource.rs +++ b/datafusion-examples/examples/custom_datasource.rs @@ -24,7 +24,7 @@ use datafusion::datasource::provider_as_source; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::{SessionState, TaskContext}; -use datafusion::physical_plan::expressions::PhysicalSortExpr; +use datafusion::physical_plan::expressions::ExprOrderingRef; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::{ project_schema, ExecutionPlan, SendableRecordBatchStream, Statistics, @@ -217,7 +217,7 @@ impl ExecutionPlan for CustomExec { datafusion::physical_plan::Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/benches/merge.rs b/datafusion/core/benches/merge.rs index f1c4736039f9c..2466066874d77 100644 --- a/datafusion/core/benches/merge.rs +++ b/datafusion/core/benches/merge.rs @@ -89,7 +89,7 @@ use datafusion::{ }, prelude::SessionContext, }; -use datafusion_physical_expr::{expressions::col, PhysicalSortExpr}; +use datafusion_physical_expr::{expressions::col, ExprOrdering, PhysicalSortExpr}; use futures::StreamExt; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; @@ -304,7 +304,7 @@ impl MergeBenchCase { } /// Make sort exprs for each column in `schema` -fn make_sort_exprs(schema: &Schema) -> Vec { +fn make_sort_exprs(schema: &Schema) -> ExprOrdering { schema .fields() .iter() diff --git a/datafusion/core/benches/sort.rs b/datafusion/core/benches/sort.rs index 0507a9308a289..501ad8507f256 100644 --- a/datafusion/core/benches/sort.rs +++ b/datafusion/core/benches/sort.rs @@ -35,7 +35,7 @@ use datafusion::{ physical_plan::{memory::MemoryExec, sorts::sort::SortExec, ExecutionPlan}, prelude::SessionContext, }; -use datafusion_physical_expr::{expressions::col, PhysicalSortExpr}; +use datafusion_physical_expr::{expressions::col, ExprOrdering, PhysicalSortExpr}; use futures::StreamExt; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; @@ -288,7 +288,7 @@ impl SortBenchCasePreservePartitioning { } /// Make sort exprs for each column in `schema` -fn make_sort_exprs(schema: &Schema) -> Vec { +fn make_sort_exprs(schema: &Schema) -> ExprOrdering { schema .fields() .iter() diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index f85492d8c2ed4..9f8bda0a9f1ac 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -27,7 +27,7 @@ use dashmap::DashMap; use datafusion_common::ToDFSchema; use datafusion_expr::expr::Sort; use datafusion_optimizer::utils::conjunction; -use datafusion_physical_expr::{create_physical_expr, PhysicalSortExpr}; +use datafusion_physical_expr::{create_physical_expr, ExprOrdering, PhysicalSortExpr}; use futures::{future, stream, StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::ObjectMeta; @@ -611,7 +611,7 @@ impl ListingTable { } /// If file_sort_order is specified, creates the appropriate physical expressions - fn try_create_output_ordering(&self) -> Result>> { + fn try_create_output_ordering(&self) -> Result> { let file_sort_order = if let Some(file_sort_order) = self.options.file_sort_order.as_ref() { file_sort_order diff --git a/datafusion/core/src/physical_optimizer/dist_enforcement.rs b/datafusion/core/src/physical_optimizer/dist_enforcement.rs index d3e99945e9253..950188288f9e5 100644 --- a/datafusion/core/src/physical_optimizer/dist_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/dist_enforcement.rs @@ -949,7 +949,7 @@ mod tests { use datafusion_expr::Operator; use datafusion_physical_expr::{ expressions, expressions::binary, expressions::lit, expressions::Column, - PhysicalExpr, PhysicalSortExpr, + ExprOrdering, PhysicalExpr, PhysicalSortExpr, }; use std::ops::Deref; @@ -982,9 +982,7 @@ mod tests { parquet_exec_with_sort(None) } - fn parquet_exec_with_sort( - output_ordering: Option>, - ) -> Arc { + fn parquet_exec_with_sort(output_ordering: Option) -> Arc { Arc::new(ParquetExec::new( FileScanConfig { object_store_url: ObjectStoreUrl::parse("test:///").unwrap(), diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 6c2d5c93482c5..e9e69f897e3e1 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -340,7 +340,7 @@ mod tests { use crate::physical_plan::union::UnionExec; use crate::physical_plan::{displayable, DisplayFormatType, Statistics}; use datafusion_physical_expr::{ - make_sort_requirements_from_exprs, PhysicalSortRequirement, + make_requirements_from_ordering, ExprOrderingRef, OrderingRequirement, }; fn schema() -> SchemaRef { @@ -1150,7 +1150,7 @@ mod tests { self.input.output_partitioning() } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.input.output_ordering() } @@ -1159,10 +1159,8 @@ mod tests { } // model that it requires the output ordering of its input - fn required_input_ordering(&self) -> Vec>> { - vec![self - .output_ordering() - .map(make_sort_requirements_from_exprs)] + fn required_input_ordering(&self) -> Vec> { + vec![self.output_ordering().map(make_requirements_from_ordering)] } fn with_new_children( diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index 7428c339dccce..7bf07a0c12a32 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -53,7 +53,7 @@ use datafusion_physical_expr::utils::{ make_sort_exprs_from_requirements, ordering_satisfy, ordering_satisfy_requirement_concrete, }; -use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr}; +use datafusion_physical_expr::{ExprOrderingRef, PhysicalExpr, PhysicalSortExpr}; use itertools::{concat, izip}; use std::iter::zip; use std::sync::Arc; @@ -766,7 +766,7 @@ fn remove_corresponding_sort_from_sub_plan( } /// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice when possible. -fn get_sort_exprs(sort_any: &Arc) -> Result<&[PhysicalSortExpr]> { +fn get_sort_exprs(sort_any: &Arc) -> Result { if let Some(sort_exec) = sort_any.as_any().downcast_ref::() { Ok(sort_exec.expr()) } else if let Some(sort_preserving_merge_exec) = @@ -795,9 +795,9 @@ pub struct ColumnInfo { /// remove physical sort expressions from the plan. pub fn can_skip_sort( partition_keys: &[Arc], - required: &[PhysicalSortExpr], + required: ExprOrderingRef, input_schema: &SchemaRef, - physical_ordering: &[PhysicalSortExpr], + physical_ordering: ExprOrderingRef, ) -> Result<(bool, bool)> { if required.len() > physical_ordering.len() { return Ok((false, false)); diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 07d0002548dee..0e1961171f1fd 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -31,10 +31,10 @@ use datafusion_physical_expr::utils::{ requirements_compatible, }; use datafusion_physical_expr::{ - make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement, + make_requirements_from_ordering, ExprOrdering, ExprOrderingRef, OrderingRequirement, + PhysicalSortRequirement, }; use itertools::izip; -use std::ops::Deref; use std::sync::Arc; /// This is a "data class" we use within the [`EnforceSorting`] rule to push @@ -45,10 +45,10 @@ pub(crate) struct SortPushDown { /// Current plan pub plan: Arc, /// Parent required sort ordering - required_ordering: Option>, + required_ordering: Option, /// The adjusted request sort ordering to children. /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties. - adjusted_request_ordering: Vec>>, + adjusted_request_ordering: Vec>, } impl SortPushDown { @@ -121,7 +121,7 @@ pub(crate) fn pushdown_sorts( requirements: SortPushDown, ) -> Result> { let plan = &requirements.plan; - let parent_required = requirements.required_ordering.as_deref(); + let parent_required = requirements.required_ordering.as_ref(); const ERR_MSG: &str = "Expects parent requirement to contain something"; let err = || DataFusionError::Plan(ERR_MSG.to_string()); if let Some(sort_exec) = plan.as_any().downcast_ref::() { @@ -137,11 +137,11 @@ pub(crate) fn pushdown_sorts( }; let required_ordering = new_plan .output_ordering() - .map(make_sort_requirements_from_exprs); + .map(make_requirements_from_ordering); // Since new_plan is a SortExec, we can safely get the 0th index. let child = &new_plan.children()[0]; if let Some(adjusted) = - pushdown_requirement_to_children(child, required_ordering.as_deref())? + pushdown_requirement_to_children(child, required_ordering.as_ref())? { // Can push down requirements Ok(Transformed::Yes(SortPushDown { @@ -184,14 +184,14 @@ pub(crate) fn pushdown_sorts( fn pushdown_requirement_to_children( plan: &Arc, - parent_required: Option<&[PhysicalSortRequirement]>, -) -> Result>>>> { + parent_required: Option<&OrderingRequirement>, +) -> Result>>> { const ERR_MSG: &str = "Expects parent requirement to contain something"; let err = || DataFusionError::Plan(ERR_MSG.to_string()); let maintains_input_order = plan.maintains_input_order(); if is_window(plan) { let required_input_ordering = plan.required_input_ordering(); - let request_child = required_input_ordering[0].as_deref(); + let request_child = required_input_ordering[0].as_ref(); let child_plan = plan.children()[0].clone(); match determine_children_requirement(parent_required, request_child, child_plan) { RequirementsCompatibility::Satisfy => { @@ -238,7 +238,7 @@ fn pushdown_requirement_to_children( }; try_pushdown_requirements_to_join( plan, - Some(new_right_required.deref()), + Some(new_right_required.as_ref()), parent_required_expr, JoinSide::Right, ) @@ -274,8 +274,8 @@ fn pushdown_requirement_to_children( /// If the the parent requirements are more specific, push down the parent requirements /// If they are not compatible, need to add Sort. fn determine_children_requirement( - parent_required: Option<&[PhysicalSortRequirement]>, - request_child: Option<&[PhysicalSortRequirement]>, + parent_required: Option<&OrderingRequirement>, + request_child: Option<&OrderingRequirement>, child_plan: Arc, ) -> RequirementsCompatibility { if requirements_compatible(request_child, parent_required, || { @@ -296,16 +296,16 @@ fn determine_children_requirement( fn try_pushdown_requirements_to_join( plan: &Arc, - parent_required: Option<&[PhysicalSortRequirement]>, - sort_expr: Vec, + parent_required: Option<&OrderingRequirement>, + sort_expr: ExprOrdering, push_side: JoinSide, -) -> Result>>>> { +) -> Result>>> { let child_idx = match push_side { JoinSide::Left => 0, JoinSide::Right => 1, }; let required_input_ordering = plan.required_input_ordering(); - let request_child = required_input_ordering[child_idx].as_deref(); + let request_child = required_input_ordering[child_idx].as_ref(); let child_plan = plan.children()[child_idx].clone(); match determine_children_requirement(parent_required, request_child, child_plan) { RequirementsCompatibility::Satisfy => Ok(None), @@ -329,7 +329,7 @@ fn try_pushdown_requirements_to_join( } fn expr_source_sides( - required_exprs: &[PhysicalSortExpr], + required_exprs: ExprOrderingRef, join_type: JoinType, left_columns_len: usize, ) -> Option { @@ -377,10 +377,10 @@ fn expr_source_sides( } fn shift_right_required( - parent_required: &[PhysicalSortRequirement], + parent_required: &OrderingRequirement, left_columns_len: usize, -) -> Result> { - let new_right_required: Vec = parent_required +) -> Result { + let new_right_required = parent_required .iter() .filter_map(|r| { r.expr.as_any().downcast_ref::().and_then(|col| { @@ -410,7 +410,7 @@ enum RequirementsCompatibility { /// Requirements satisfy Satisfy, /// Requirements compatible - Compatible(Option>), + Compatible(Option), /// Requirements not compatible NonCompatible, } diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 2fa833bb7e9e0..b0c3d970c7eb1 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -31,7 +31,7 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use datafusion_common::tree_node::Transformed; use datafusion_physical_expr::utils::ordering_satisfy; -use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_expr::ExprOrdering; use std::sync::Arc; /// Convenience rule for writing optimizers: recursively invoke @@ -60,7 +60,7 @@ pub fn optimize_children( /// given ordering requirements while preserving the original partitioning. pub fn add_sort_above( node: &mut Arc, - sort_expr: Vec, + sort_expr: ExprOrdering, ) -> Result<()> { // If the ordering requirement is already satisfied, do not add a sort. if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || { diff --git a/datafusion/core/src/physical_plan/aggregates/mod.rs b/datafusion/core/src/physical_plan/aggregates/mod.rs index c41cc438c8987..af73fecc3d11c 100644 --- a/datafusion/core/src/physical_plan/aggregates/mod.rs +++ b/datafusion/core/src/physical_plan/aggregates/mod.rs @@ -33,7 +33,7 @@ use datafusion_common::{DataFusionError, Result}; use datafusion_expr::Accumulator; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::{ - expressions, AggregateExpr, PhysicalExpr, PhysicalSortExpr, + expressions, AggregateExpr, ExprOrderingRef, PhysicalExpr, }; use std::any::Any; use std::collections::HashMap; @@ -354,7 +354,7 @@ impl ExecutionPlan for AggregateExec { } } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } @@ -725,7 +725,7 @@ mod tests { use arrow::record_batch::RecordBatch; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_physical_expr::expressions::{lit, ApproxDistinct, Count, Median}; - use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, PhysicalSortExpr}; + use datafusion_physical_expr::{AggregateExpr, ExprOrderingRef, PhysicalExpr}; use futures::{FutureExt, Stream}; use std::any::Any; use std::sync::Arc; @@ -991,7 +991,7 @@ mod tests { Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index 2e12251c2f9ba..c048f34bca2ee 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -30,7 +30,7 @@ use crate::{ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use futures::StreamExt; -use super::expressions::PhysicalSortExpr; +use super::expressions::ExprOrderingRef; use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream}; use crate::execution::context::TaskContext; @@ -95,7 +95,7 @@ impl ExecutionPlan for AnalyzeExec { Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/coalesce_batches.rs b/datafusion/core/src/physical_plan/coalesce_batches.rs index ec7dd7b4d63a2..0f65775a65374 100644 --- a/datafusion/core/src/physical_plan/coalesce_batches.rs +++ b/datafusion/core/src/physical_plan/coalesce_batches.rs @@ -36,7 +36,7 @@ use arrow::record_batch::RecordBatch; use futures::stream::{Stream, StreamExt}; use log::trace; -use super::expressions::PhysicalSortExpr; +use super::expressions::ExprOrderingRef; use super::metrics::{BaselineMetrics, MetricsSet}; use super::{metrics::ExecutionPlanMetricsSet, Statistics}; @@ -102,7 +102,7 @@ impl ExecutionPlan for CoalesceBatchesExec { Ok(children[0]) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { // The coalesce batches operator does not make any changes to the sorting of its input self.input.output_ordering() } diff --git a/datafusion/core/src/physical_plan/coalesce_partitions.rs b/datafusion/core/src/physical_plan/coalesce_partitions.rs index 8ff8a37aa36ac..9187958bc624e 100644 --- a/datafusion/core/src/physical_plan/coalesce_partitions.rs +++ b/datafusion/core/src/physical_plan/coalesce_partitions.rs @@ -29,7 +29,7 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use super::common::AbortOnDropMany; -use super::expressions::PhysicalSortExpr; +use super::expressions::ExprOrderingRef; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{RecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; @@ -91,7 +91,7 @@ impl ExecutionPlan for CoalescePartitionsExec { Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index 7fb67d758f3da..709cbd09769e2 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -26,7 +26,7 @@ use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statist use arrow::datatypes::{Schema, SchemaRef}; use arrow::ipc::writer::{FileWriter, IpcWriteOptions}; use arrow::record_batch::RecordBatch; -use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_expr::ExprOrderingRef; use futures::{Future, Stream, StreamExt, TryStreamExt}; use log::debug; use parking_lot::Mutex; @@ -285,7 +285,7 @@ pub fn transpose(original: Vec>) -> Vec> { /// orderings, see . pub fn get_meet_of_orderings( given: &[Arc], -) -> Option<&[PhysicalSortExpr]> { +) -> Option { given .iter() .map(|item| item.output_ordering()) @@ -294,8 +294,8 @@ pub fn get_meet_of_orderings( } fn get_meet_of_orderings_helper( - orderings: Vec<&[PhysicalSortExpr]>, -) -> Option<&[PhysicalSortExpr]> { + orderings: Vec, +) -> Option { let mut idx = 0; let first = orderings[0]; loop { @@ -328,10 +328,11 @@ mod tests { record_batch::RecordBatch, }; use datafusion_physical_expr::expressions::{col, Column}; + use datafusion_physical_expr::{ExprOrdering, PhysicalSortExpr}; #[test] fn get_meet_of_orderings_helper_common_prefix_test() -> Result<()> { - let input1: Vec = vec![ + let input1: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), @@ -346,7 +347,7 @@ mod tests { }, ]; - let input2: Vec = vec![ + let input2: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), @@ -361,7 +362,7 @@ mod tests { }, ]; - let input3: Vec = vec![ + let input3: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), @@ -388,7 +389,7 @@ mod tests { #[test] fn get_meet_of_orderings_helper_subset_test() -> Result<()> { - let input1: Vec = vec![ + let input1: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), @@ -399,7 +400,7 @@ mod tests { }, ]; - let input2: Vec = vec![ + let input2: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), @@ -414,7 +415,7 @@ mod tests { }, ]; - let input3: Vec = vec![ + let input3: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), @@ -436,7 +437,7 @@ mod tests { #[test] fn get_meet_of_orderings_helper_no_overlap_test() -> Result<()> { - let input1: Vec = vec![ + let input1: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), @@ -447,7 +448,7 @@ mod tests { }, ]; - let input2: Vec = vec![ + let input2: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("x", 0)), options: SortOptions::default(), @@ -458,7 +459,7 @@ mod tests { }, ]; - let input3: Vec = vec![ + let input3: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), diff --git a/datafusion/core/src/physical_plan/empty.rs b/datafusion/core/src/physical_plan/empty.rs index 18a712b6cf427..b919294e318d1 100644 --- a/datafusion/core/src/physical_plan/empty.rs +++ b/datafusion/core/src/physical_plan/empty.rs @@ -29,7 +29,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use log::debug; -use super::expressions::PhysicalSortExpr; +use super::expressions::ExprOrderingRef; use super::{common, SendableRecordBatchStream, Statistics}; use crate::execution::context::TaskContext; @@ -113,7 +113,7 @@ impl ExecutionPlan for EmptyExec { Partitioning::UnknownPartitioning(self.partitions) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs index 93fcfe45dadf9..1b43e06dd858b 100644 --- a/datafusion/core/src/physical_plan/explain.rs +++ b/datafusion/core/src/physical_plan/explain.rs @@ -31,7 +31,7 @@ use crate::{ use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use log::debug; -use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream}; +use super::{expressions::ExprOrderingRef, SendableRecordBatchStream}; use crate::execution::context::TaskContext; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}; @@ -93,7 +93,7 @@ impl ExecutionPlan for ExplainExec { Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index ed27dfac0317f..895cdd2379f58 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -17,7 +17,7 @@ //! Execution plan for reading line-delimited Avro files use crate::error::Result; -use crate::physical_plan::expressions::PhysicalSortExpr; +use crate::physical_plan::expressions::ExprOrderingRef; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -76,7 +76,7 @@ impl ExecutionPlan for AvroExec { Ok(self.base_config().infinite_source) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { get_output_ordering(&self.base_config) } diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index d9075c84adfca..84f599d675d55 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -20,7 +20,7 @@ use crate::datasource::file_format::file_type::FileCompressionType; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; -use crate::physical_plan::expressions::PhysicalSortExpr; +use crate::physical_plan::expressions::ExprOrderingRef; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; @@ -117,7 +117,7 @@ impl ExecutionPlan for CsvExec { } /// See comments on `impl ExecutionPlan for ParquetExec`: output order can't be - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { get_output_ordering(&self.base_config) } diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index ebbae7417889f..7ffcd771444b8 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -19,7 +19,7 @@ use crate::datasource::file_format::file_type::FileCompressionType; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; -use crate::physical_plan::expressions::PhysicalSortExpr; +use crate::physical_plan::expressions::ExprOrderingRef; use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; @@ -97,7 +97,7 @@ impl ExecutionPlan for NdJsonExec { Ok(self.base_config.infinite_source) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { get_output_ordering(&self.base_config) } diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index fdf34e75de8d0..614ec1763201e 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -36,7 +36,7 @@ use arrow::{ record_batch::RecordBatch, }; pub use avro::AvroExec; -use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_expr::{ExprOrdering, ExprOrderingRef}; pub use file_stream::{FileOpenFuture, FileOpener, FileStream}; pub(crate) use json::plan_to_json; pub use json::NdJsonExec; @@ -149,7 +149,7 @@ pub struct FileScanConfig { /// The partitioning columns pub table_partition_cols: Vec<(String, DataType)>, /// The order in which the data is sorted, if known. - pub output_ordering: Option>, + pub output_ordering: Option, /// Indicates whether this plan may produce an infinite stream of records. pub infinite_source: bool, } @@ -704,7 +704,7 @@ impl From for FileMeta { ///``` pub(crate) fn get_output_ordering( base_config: &FileScanConfig, -) -> Option<&[PhysicalSortExpr]> { +) -> Option { base_config.output_ordering.as_ref() .map(|output_ordering| if base_config.file_groups.iter().any(|group| group.len() > 1) { debug!("Skipping specified output ordering {:?}. Some file group had more than one file: {:?}", diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 92be32f47649c..e5034f513959a 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -39,7 +39,7 @@ use crate::{ execution::context::TaskContext, physical_optimizer::pruning::PruningPredicate, physical_plan::{ - expressions::PhysicalSortExpr, + expressions::ExprOrderingRef, file_format::{FileScanConfig, SchemaAdapter}, metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, @@ -342,7 +342,7 @@ impl ExecutionPlan for ParquetExec { Partitioning::UnknownPartitioning(self.base_config.file_groups.len()) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { get_output_ordering(&self.base_config) } @@ -448,7 +448,7 @@ impl ExecutionPlan for ParquetExec { } } -fn make_output_ordering_string(ordering: &[PhysicalSortExpr]) -> String { +fn make_output_ordering_string(ordering: ExprOrderingRef) -> String { use std::fmt::Write; let mut w: String = ", output_ordering=[".into(); diff --git a/datafusion/core/src/physical_plan/filter.rs b/datafusion/core/src/physical_plan/filter.rs index a72aa69d07137..e114f967aeb52 100644 --- a/datafusion/core/src/physical_plan/filter.rs +++ b/datafusion/core/src/physical_plan/filter.rs @@ -23,7 +23,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use super::expressions::PhysicalSortExpr; +use super::expressions::ExprOrderingRef; use super::{ColumnStatistics, RecordBatchStream, SendableRecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ @@ -113,7 +113,7 @@ impl ExecutionPlan for FilterExec { Ok(children[0]) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.input.output_ordering() } diff --git a/datafusion/core/src/physical_plan/joins/cross_join.rs b/datafusion/core/src/physical_plan/joins/cross_join.rs index 8492e5e6b20d5..67e005d3c193d 100644 --- a/datafusion/core/src/physical_plan/joins/cross_join.rs +++ b/datafusion/core/src/physical_plan/joins/cross_join.rs @@ -30,8 +30,8 @@ use crate::execution::memory_pool::{SharedOptionalMemoryReservation, TryGrow}; use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, - ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties, - ExecutionPlan, Partitioning, PhysicalSortExpr, RecordBatchStream, + expressions::ExprOrderingRef, ColumnStatistics, DisplayFormatType, Distribution, + EquivalenceProperties, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::{error::Result, scalar::ScalarValue}; @@ -200,7 +200,7 @@ impl ExecutionPlan for CrossJoinExec { } // TODO check the output ordering of CrossJoin - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/joins/hash_join.rs b/datafusion/core/src/physical_plan/joins/hash_join.rs index 39acffa203acc..fe53036e97f61 100644 --- a/datafusion/core/src/physical_plan/joins/hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/hash_join.rs @@ -58,8 +58,7 @@ use hashbrown::raw::RawTable; use crate::physical_plan::{ coalesce_batches::concat_batches, coalesce_partitions::CoalescePartitionsExec, - expressions::Column, - expressions::PhysicalSortExpr, + expressions::{Column, ExprOrderingRef}, hash_utils::create_hashes, joins::utils::{ adjust_right_output_partitioning, build_join_schema, check_join_is_valid, @@ -335,7 +334,7 @@ impl ExecutionPlan for HashJoinExec { // TODO Output ordering might be kept for some cases. // For example if it is inner join then the stream side order can be kept - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs index e04e86d0d3d70..bfe54e5fadd80 100644 --- a/datafusion/core/src/physical_plan/joins/nested_loop_join.rs +++ b/datafusion/core/src/physical_plan/joins/nested_loop_join.rs @@ -40,7 +40,7 @@ use arrow::record_batch::RecordBatch; use arrow::util::bit_util; use datafusion_common::{DataFusionError, Statistics}; use datafusion_expr::JoinType; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortExpr}; +use datafusion_physical_expr::{EquivalenceProperties, ExprOrderingRef}; use futures::{ready, Stream, StreamExt, TryStreamExt}; use std::any::Any; use std::fmt::Formatter; @@ -157,7 +157,7 @@ impl ExecutionPlan for NestedLoopJoinExec { } } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { // no specified order for the output None } diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs index 94f5c9e5ef60f..2f71dd47e2d51 100644 --- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs @@ -35,9 +35,7 @@ use arrow::compute::{concat_batches, take, SortOptions}; use arrow::datatypes::{DataType, SchemaRef, TimeUnit}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -use datafusion_physical_expr::{ - make_sort_requirements_from_exprs, PhysicalSortRequirement, -}; +use datafusion_physical_expr::{make_requirements_from_ordering, OrderingRequirement}; use futures::{Stream, StreamExt}; use crate::error::DataFusionError; @@ -45,8 +43,9 @@ use crate::error::Result; use crate::execution::context::TaskContext; use crate::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use crate::logical_expr::JoinType; -use crate::physical_plan::expressions::Column; -use crate::physical_plan::expressions::PhysicalSortExpr; +use crate::physical_plan::expressions::{ + Column, ExprOrdering, ExprOrderingRef, PhysicalSortExpr, +}; use crate::physical_plan::joins::utils::{ build_join_schema, check_join_is_valid, combine_join_equivalence_properties, estimate_join_statistics, partitioned_join_output_partitioning, JoinOn, @@ -76,11 +75,11 @@ pub struct SortMergeJoinExec { /// Execution metrics metrics: ExecutionPlanMetricsSet, /// The left SortExpr - left_sort_exprs: Vec, + left_sort_exprs: ExprOrdering, /// The right SortExpr - right_sort_exprs: Vec, + right_sort_exprs: ExprOrdering, /// The output ordering - output_ordering: Option>, + output_ordering: Option, /// Sort options of join columns used in sorting left and right execution plans pub(crate) sort_options: Vec, /// If null_equals_null is true, null == null else null != null @@ -149,7 +148,7 @@ impl SortMergeJoinExec { right .output_ordering() .map(|sort_exprs| { - let new_sort_exprs: Result> = sort_exprs + let new_sort_exprs: Result = sort_exprs .iter() .map(|e| { let new_expr = @@ -228,10 +227,10 @@ impl ExecutionPlan for SortMergeJoinExec { ] } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { vec![ - Some(make_sort_requirements_from_exprs(&self.left_sort_exprs)), - Some(make_sort_requirements_from_exprs(&self.right_sort_exprs)), + Some(make_requirements_from_ordering(&self.left_sort_exprs)), + Some(make_requirements_from_ordering(&self.right_sort_exprs)), ] } @@ -245,7 +244,7 @@ impl ExecutionPlan for SortMergeJoinExec { ) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.output_ordering.as_deref() } diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs index dafd0bfd49401..916ffc031844b 100644 --- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs +++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs @@ -53,8 +53,7 @@ use crate::execution::context::TaskContext; use crate::logical_expr::JoinType; use crate::physical_plan::joins::hash_join_utils::convert_sort_expr_with_filter_schema; use crate::physical_plan::{ - expressions::Column, - expressions::PhysicalSortExpr, + expressions::{Column, ExprOrderingRef}, joins::{ hash_join::{build_join_indices, update_hash, JoinHashMap}, hash_join_utils::{build_filter_input_order, SortedFilterExpr}, @@ -411,7 +410,7 @@ impl ExecutionPlan for SymmetricHashJoinExec { } // TODO: Output ordering might be kept for some cases. - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } @@ -1521,7 +1520,7 @@ mod tests { use datafusion_expr::Operator; use datafusion_physical_expr::expressions::{binary, col, Column}; use datafusion_physical_expr::intervals::test_utils::gen_conjunctive_numeric_expr; - use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::{ExprOrdering, PhysicalExpr, PhysicalSortExpr}; use crate::physical_plan::joins::{ hash_join_utils::tests::complicated_filter, HashJoinExec, PartitionMode, @@ -1823,8 +1822,8 @@ mod tests { fn create_memory_table( left_batch: RecordBatch, right_batch: RecordBatch, - left_sorted: Option>, - right_sorted: Option>, + left_sorted: Option, + right_sorted: Option, batch_size: usize, ) -> Result<(Arc, Arc)> { let mut left = MemoryExec::try_new( diff --git a/datafusion/core/src/physical_plan/limit.rs b/datafusion/core/src/physical_plan/limit.rs index bfeb9c65b9d5c..6a5d9b36d21e6 100644 --- a/datafusion/core/src/physical_plan/limit.rs +++ b/datafusion/core/src/physical_plan/limit.rs @@ -33,7 +33,7 @@ use arrow::array::ArrayRef; use arrow::datatypes::SchemaRef; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; -use super::expressions::PhysicalSortExpr; +use super::expressions::ExprOrderingRef; use super::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -112,7 +112,7 @@ impl ExecutionPlan for GlobalLimitExec { false } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.input.output_ordering() } @@ -288,7 +288,7 @@ impl ExecutionPlan for LocalLimitExec { } // Local limit will not change the input plan's ordering - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.input.output_ordering() } diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs index f0cd48fa4f9db..45eff33ae4e5d 100644 --- a/datafusion/core/src/physical_plan/memory.rs +++ b/datafusion/core/src/physical_plan/memory.rs @@ -22,7 +22,7 @@ use std::any::Any; use std::sync::Arc; use std::task::{Context, Poll}; -use super::expressions::PhysicalSortExpr; +use super::expressions::{ExprOrdering, ExprOrderingRef}; use super::{ common, project_schema, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -46,7 +46,7 @@ pub struct MemoryExec { /// Optional projection projection: Option>, // Optional sort information - sort_information: Option>, + sort_information: Option, } impl fmt::Debug for MemoryExec { @@ -78,7 +78,7 @@ impl ExecutionPlan for MemoryExec { Partitioning::UnknownPartitioning(self.partitions.len()) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.sort_information.as_deref() } @@ -151,10 +151,7 @@ impl MemoryExec { } /// Set sort information - pub fn with_sort_information( - mut self, - sort_information: Vec, - ) -> Self { + pub fn with_sort_information(mut self, sort_information: ExprOrdering) -> Self { self.sort_information = Some(sort_information); self } diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 9815d9491e021..19242da5b2bc1 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -24,7 +24,6 @@ use self::{ }; pub use crate::common::{ColumnStatistics, Statistics}; use crate::error::Result; -use crate::physical_plan::expressions::PhysicalSortExpr; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -127,7 +126,7 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// /// It is safe to return `None` here if your operator does not /// have any particular output order here - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]>; + fn output_ordering(&self) -> Option; /// Specifies the data distribution requirements for all the /// children for this operator, By default it's [[Distribution::UnspecifiedDistribution]] for each child, @@ -142,7 +141,7 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// NOTE that checking `!is_empty()` does **not** check for a /// required input ordering. Instead, the correct check is that at /// least one entry must be `Some` - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { vec![None; self.children().len()] } @@ -589,13 +588,15 @@ impl Distribution { } } -use datafusion_physical_expr::expressions::Column; pub use datafusion_physical_expr::window::WindowExpr; use datafusion_physical_expr::{ - expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, + expr_list_eq_strict_order, expressions::Column, + normalize_expr_with_equivalence_properties, EquivalenceProperties, + OrderingRequirement, +}; +pub use datafusion_physical_expr::{ + AggregateExpr, ExprOrdering, ExprOrderingRef, PhysicalExpr, }; -pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr}; -use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement}; /// Applies an optional projection to a [`SchemaRef`], returning the /// projected schema diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 25afdb4d1ce2e..11be96e351327 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1856,6 +1856,7 @@ mod tests { col, lit, sum, Extension, GroupingSet, LogicalPlanBuilder, UserDefinedLogicalNodeCore, }; + use datafusion_physical_expr::ExprOrderingRef; use fmt::Debug; use std::collections::HashMap; use std::convert::TryFrom; @@ -2422,7 +2423,7 @@ Internal error: Optimizer rule 'type_coercion' failed due to unexpected error: E Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs index 95d33fbcd3980..84e080014acee 100644 --- a/datafusion/core/src/physical_plan/projection.rs +++ b/datafusion/core/src/physical_plan/projection.rs @@ -29,7 +29,7 @@ use std::task::{Context, Poll}; use crate::error::Result; use crate::physical_plan::{ ColumnStatistics, DisplayFormatType, EquivalenceProperties, ExecutionPlan, - Partitioning, PhysicalExpr, + ExprOrdering, ExprOrderingRef, Partitioning, PhysicalExpr, }; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; @@ -54,7 +54,7 @@ pub struct ProjectionExec { /// The input plan input: Arc, /// The output ordering - output_ordering: Option>, + output_ordering: Option, /// The alias map used to normalize out expressions like Partitioning and PhysicalSortExpr /// The key is the column from the input schema and the values are the columns from the output schema alias_map: HashMap>, @@ -191,7 +191,7 @@ impl ExecutionPlan for ProjectionExec { } } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.output_ordering.as_deref() } diff --git a/datafusion/core/src/physical_plan/repartition/mod.rs b/datafusion/core/src/physical_plan/repartition/mod.rs index 7f13418d26f70..780f83fd27814 100644 --- a/datafusion/core/src/physical_plan/repartition/mod.rs +++ b/datafusion/core/src/physical_plan/repartition/mod.rs @@ -38,7 +38,7 @@ use log::debug; use self::distributor_channels::{DistributionReceiver, DistributionSender}; use super::common::{AbortOnDropMany, AbortOnDropSingle, SharedMemoryReservation}; -use super::expressions::PhysicalSortExpr; +use super::expressions::ExprOrderingRef; use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; use super::{RecordBatchStream, SendableRecordBatchStream}; @@ -328,7 +328,7 @@ impl ExecutionPlan for RepartitionExec { self.partitioning.clone() } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { if self.maintains_input_order()[0] { self.input().output_ordering() } else { diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index c3fc06206ca15..92b49ea7fb91e 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -26,7 +26,6 @@ use crate::execution::memory_pool::{ }; use crate::execution::runtime_env::RuntimeEnv; use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; -use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet, }; @@ -45,7 +44,7 @@ use arrow::datatypes::SchemaRef; use arrow::error::ArrowError; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; -use datafusion_physical_expr::EquivalenceProperties; +use datafusion_physical_expr::{EquivalenceProperties, ExprOrdering, ExprOrderingRef}; use futures::{Stream, StreamExt, TryStreamExt}; use log::{debug, error}; use std::any::Any; @@ -75,7 +74,7 @@ struct ExternalSorter { in_mem_batches: Vec, spills: Vec, /// Sort expressions - expr: Vec, + expr: ExprOrdering, session_config: Arc, runtime: Arc, metrics_set: CompositeMetricsSet, @@ -89,7 +88,7 @@ impl ExternalSorter { pub fn new( partition_id: usize, schema: SchemaRef, - expr: Vec, + expr: ExprOrdering, metrics_set: CompositeMetricsSet, session_config: Arc, runtime: Arc, @@ -278,7 +277,7 @@ impl Debug for ExternalSorter { fn in_mem_partial_sort( buffered_batches: &mut Vec, schema: SchemaRef, - expressions: &[PhysicalSortExpr], + expressions: ExprOrderingRef, batch_size: usize, tracking_metrics: MemTrackingMetrics, fetch: Option, @@ -328,7 +327,7 @@ struct CompositeIndex { /// Get sorted iterator by sort concatenated `SortColumn`s fn get_sorted_iter( sort_arrays: &[Vec], - expr: &[PhysicalSortExpr], + expr: ExprOrderingRef, batch_size: usize, fetch: Option, ) -> Result { @@ -625,7 +624,7 @@ pub struct SortExec { /// Input schema pub(crate) input: Arc, /// Sort expressions - expr: Vec, + expr: ExprOrdering, /// Containing all metrics set created during sort metrics_set: CompositeMetricsSet, /// Preserve partitions of input plan @@ -637,7 +636,7 @@ pub struct SortExec { impl SortExec { /// Create a new sort execution plan pub fn try_new( - expr: Vec, + expr: ExprOrdering, input: Arc, fetch: Option, ) -> Result { @@ -652,7 +651,7 @@ impl SortExec { /// Create a new sort execution plan with the option to preserve /// the partitioning of the input plan pub fn new_with_partitioning( - expr: Vec, + expr: ExprOrdering, input: Arc, preserve_partitioning: bool, fetch: Option, @@ -672,7 +671,7 @@ impl SortExec { } /// Sort expressions - pub fn expr(&self) -> &[PhysicalSortExpr] { + pub fn expr(&self) -> ExprOrderingRef { &self.expr } @@ -731,7 +730,7 @@ impl ExecutionPlan for SortExec { false } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { Some(&self.expr) } @@ -816,7 +815,7 @@ struct BatchWithSortArray { fn sort_batch( batch: RecordBatch, schema: SchemaRef, - expr: &[PhysicalSortExpr], + expr: ExprOrderingRef, fetch: Option, ) -> Result { let sort_columns = expr @@ -868,7 +867,7 @@ fn sort_batch( async fn do_sort( mut input: SendableRecordBatchStream, partition_id: usize, - expr: Vec, + expr: ExprOrdering, metrics_set: CompositeMetricsSet, context: Arc, fetch: Option, @@ -912,7 +911,7 @@ mod tests { use crate::execution::runtime_env::RuntimeConfig; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::collect; - use crate::physical_plan::expressions::col; + use crate::physical_plan::expressions::{col, PhysicalSortExpr}; use crate::physical_plan::memory::MemoryExec; use crate::prelude::SessionContext; use crate::test; diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 14204ef3b4b55..bcdf125e38f3c 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -42,12 +42,13 @@ use crate::physical_plan::metrics::{ use crate::physical_plan::sorts::{RowIndex, SortKeyCursor, SortedStream}; use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{ - common::spawn_execution, expressions::PhysicalSortExpr, DisplayFormatType, - Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream, - SendableRecordBatchStream, Statistics, + common::spawn_execution, + expressions::{ExprOrdering, ExprOrderingRef}, + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr, + RecordBatchStream, SendableRecordBatchStream, Statistics, }; use datafusion_physical_expr::{ - make_sort_requirements_from_exprs, EquivalenceProperties, PhysicalSortRequirement, + make_requirements_from_ordering, EquivalenceProperties, OrderingRequirement, }; /// Sort preserving merge execution plan @@ -82,14 +83,14 @@ pub struct SortPreservingMergeExec { /// Input plan input: Arc, /// Sort expressions - expr: Vec, + expr: ExprOrdering, /// Execution metrics metrics: ExecutionPlanMetricsSet, } impl SortPreservingMergeExec { /// Create a new sort execution plan - pub fn new(expr: Vec, input: Arc) -> Self { + pub fn new(expr: ExprOrdering, input: Arc) -> Self { Self { input, expr, @@ -103,7 +104,7 @@ impl SortPreservingMergeExec { } /// Sort expressions - pub fn expr(&self) -> &[PhysicalSortExpr] { + pub fn expr(&self) -> ExprOrderingRef { &self.expr } } @@ -127,11 +128,11 @@ impl ExecutionPlan for SortPreservingMergeExec { vec![Distribution::UnspecifiedDistribution] } - fn required_input_ordering(&self) -> Vec>> { - vec![Some(make_sort_requirements_from_exprs(&self.expr))] + fn required_input_ordering(&self) -> Vec> { + vec![Some(make_requirements_from_ordering(&self.expr))] } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.input.output_ordering() } @@ -352,7 +353,7 @@ impl SortPreservingMergeStream { pub(crate) fn new_from_streams( streams: Vec, schema: SchemaRef, - expressions: &[PhysicalSortExpr], + expressions: ExprOrderingRef, mut tracking_metrics: MemTrackingMetrics, batch_size: usize, ) -> Result { @@ -716,7 +717,7 @@ mod tests { use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray}; use crate::from_slice::FromSlice; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; - use crate::physical_plan::expressions::col; + use crate::physical_plan::expressions::{col, PhysicalSortExpr}; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::metrics::MetricValue; use crate::physical_plan::sorts::sort::SortExec; @@ -966,7 +967,7 @@ mod tests { async fn sorted_merge( input: Arc, - sort: Vec, + sort: ExprOrdering, context: Arc, ) -> RecordBatch { let merge = Arc::new(SortPreservingMergeExec::new(sort, input)); @@ -977,7 +978,7 @@ mod tests { async fn partition_sort( input: Arc, - sort: Vec, + sort: ExprOrdering, context: Arc, ) -> RecordBatch { let sort_exec = Arc::new(SortExec::new_with_partitioning( @@ -991,7 +992,7 @@ mod tests { async fn basic_sort( src: Arc, - sort: Vec, + sort: ExprOrdering, context: Arc, ) -> RecordBatch { let merge = Arc::new(CoalescePartitionsExec::new(src)); @@ -1071,7 +1072,7 @@ mod tests { } async fn sorted_partitioned_input( - sort: Vec, + sort: ExprOrdering, sizes: &[usize], context: Arc, ) -> Arc { diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs index efd43aca6b154..1c0b31810963e 100644 --- a/datafusion/core/src/physical_plan/streaming.rs +++ b/datafusion/core/src/physical_plan/streaming.rs @@ -25,7 +25,7 @@ use async_trait::async_trait; use futures::stream::StreamExt; use datafusion_common::{DataFusionError, Result, Statistics}; -use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_expr::ExprOrderingRef; use crate::datasource::streaming::PartitionStream; use crate::execution::context::TaskContext; @@ -85,7 +85,7 @@ impl ExecutionPlan for StreamingTableExec { Partitioning::UnknownPartitioning(self.partitions.len()) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index 3d4522272a521..c9152149d6d62 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -36,7 +36,7 @@ use log::debug; use log::warn; use super::{ - expressions::PhysicalSortExpr, + expressions::ExprOrderingRef, metrics::{ExecutionPlanMetricsSet, MetricsSet}, ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -218,7 +218,7 @@ impl ExecutionPlan for UnionExec { } } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { // If the Union is partition aware, there is no output ordering. // Otherwise, the output ordering is the "meet" of its input orderings. // The meet is the finest ordering that satisfied by all the input diff --git a/datafusion/core/src/physical_plan/unnest.rs b/datafusion/core/src/physical_plan/unnest.rs index 0d60273c30101..73bd34a9f6bf8 100644 --- a/datafusion/core/src/physical_plan/unnest.rs +++ b/datafusion/core/src/physical_plan/unnest.rs @@ -33,8 +33,8 @@ use std::{any::Any, sync::Arc}; use crate::execution::context::TaskContext; use crate::physical_plan::{ coalesce_batches::concat_batches, expressions::Column, DisplayFormatType, - Distribution, EquivalenceProperties, ExecutionPlan, Partitioning, PhysicalExpr, - PhysicalSortExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, + Distribution, EquivalenceProperties, ExecutionPlan, ExprOrderingRef, Partitioning, + PhysicalExpr, RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::{ error::{DataFusionError, Result}, @@ -102,7 +102,7 @@ impl ExecutionPlan for UnnestExec { self.input.output_partitioning() } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/values.rs b/datafusion/core/src/physical_plan/values.rs index 2ac2dd1ae060d..38d24906eeddc 100644 --- a/datafusion/core/src/physical_plan/values.rs +++ b/datafusion/core/src/physical_plan/values.rs @@ -17,7 +17,7 @@ //! Values execution plan -use super::expressions::PhysicalSortExpr; +use super::expressions::ExprOrderingRef; use super::{common, SendableRecordBatchStream, Statistics}; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; @@ -112,7 +112,7 @@ impl ExecutionPlan for ValuesExec { Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs index 4b01d3b4ed386..2d1de889a578a 100644 --- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs @@ -22,7 +22,6 @@ use crate::error::Result; use crate::execution::context::TaskContext; -use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; @@ -56,7 +55,8 @@ use datafusion_physical_expr::window::{ WindowAggState, WindowState, }; use datafusion_physical_expr::{ - EquivalenceProperties, PhysicalExpr, PhysicalSortRequirement, + EquivalenceProperties, ExprOrdering, ExprOrderingRef, OrderingRequirement, + PhysicalExpr, }; use indexmap::IndexMap; use log::debug; @@ -118,7 +118,7 @@ impl BoundedWindowAggExec { // We are sure that partition by columns are always at the beginning of sort_keys // Hence returned `PhysicalSortExpr` corresponding to `PARTITION BY` columns can be used safely // to calculate partition separation points - pub fn partition_by_sort_keys(&self) -> Result> { + pub fn partition_by_sort_keys(&self) -> Result { let mut result = vec![]; // All window exprs have the same partition by, so we just use the first one: let partition_by = self.window_expr()[0].partition_by(); @@ -162,11 +162,11 @@ impl ExecutionPlan for BoundedWindowAggExec { Ok(children[0]) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.input().output_ordering() } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { let partition_bys = self.window_expr()[0].partition_by(); let order_keys = self.window_expr()[0].order_by(); let requirements = calc_requirements(partition_bys, order_keys); @@ -319,7 +319,7 @@ pub struct SortedPartitionByBoundedWindowStream { window_agg_states: Vec, finished: bool, window_expr: Vec>, - partition_by_sort_keys: Vec, + partition_by_sort_keys: ExprOrdering, baseline_metrics: BaselineMetrics, } @@ -430,7 +430,7 @@ impl SortedPartitionByBoundedWindowStream { window_expr: Vec>, input: SendableRecordBatchStream, baseline_metrics: BaselineMetrics, - partition_by_sort_keys: Vec, + partition_by_sort_keys: ExprOrdering, ) -> Self { let state = window_expr.iter().map(|_| IndexMap::new()).collect(); let empty_batch = RecordBatch::new_empty(schema.clone()); diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs index f7f9bb76b3f44..0a7e5b2c18575 100644 --- a/datafusion/core/src/physical_plan/windows/mod.rs +++ b/datafusion/core/src/physical_plan/windows/mod.rs @@ -46,7 +46,9 @@ pub use bounded_window_agg_exec::BoundedWindowAggExec; pub use datafusion_physical_expr::window::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr, }; -use datafusion_physical_expr::PhysicalSortRequirement; +use datafusion_physical_expr::{ + ExprOrderingRef, OrderingRequirement, PhysicalSortRequirement, +}; pub use window_agg_exec::WindowAggExec; /// Create a physical expression for window function @@ -55,7 +57,7 @@ pub fn create_window_expr( name: String, args: &[Arc], partition_by: &[Arc], - order_by: &[PhysicalSortExpr], + order_by: ExprOrderingRef, window_frame: Arc, input_schema: &Schema, ) -> Result> { @@ -190,8 +192,8 @@ fn create_built_in_window_expr( pub(crate) fn calc_requirements( partition_by_exprs: &[Arc], - orderby_sort_exprs: &[PhysicalSortExpr], -) -> Option> { + orderby_sort_exprs: ExprOrderingRef, +) -> Option { let mut sort_reqs = vec![]; for partition_by in partition_by_exprs { sort_reqs.push(PhysicalSortRequirement { @@ -290,7 +292,7 @@ mod tests { orderbys.push(PhysicalSortExpr { expr, options }); } - let mut expected: Option> = None; + let mut expected: Option = None; for (col_name, reqs) in expected_params { let options = reqs.map(|(descending, nulls_first)| SortOptions { descending, diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs index 75598f1d5266f..037f2e4cf1030 100644 --- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs +++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs @@ -20,7 +20,6 @@ use crate::error::Result; use crate::execution::context::TaskContext; use crate::physical_plan::common::transpose; -use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::metrics::{ BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, }; @@ -40,7 +39,7 @@ use arrow::{ record_batch::RecordBatch, }; use datafusion_common::DataFusionError; -use datafusion_physical_expr::PhysicalSortRequirement; +use datafusion_physical_expr::{ExprOrdering, ExprOrderingRef, OrderingRequirement}; use futures::stream::Stream; use futures::{ready, StreamExt}; use std::any::Any; @@ -107,7 +106,7 @@ impl WindowAggExec { // We are sure that partition by columns are always at the beginning of sort_keys // Hence returned `PhysicalSortExpr` corresponding to `PARTITION BY` columns can be used safely // to calculate partition separation points - pub fn partition_by_sort_keys(&self) -> Result> { + pub fn partition_by_sort_keys(&self) -> Result { let mut result = vec![]; // All window exprs have the same partition by, so we just use the first one: let partition_by = self.window_expr()[0].partition_by(); @@ -161,7 +160,7 @@ impl ExecutionPlan for WindowAggExec { } } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.input().output_ordering() } @@ -169,7 +168,7 @@ impl ExecutionPlan for WindowAggExec { vec![true] } - fn required_input_ordering(&self) -> Vec>> { + fn required_input_ordering(&self) -> Vec> { let partition_bys = self.window_expr()[0].partition_by(); let order_keys = self.window_expr()[0].order_by(); let requirements = calc_requirements(partition_bys, order_keys); @@ -298,7 +297,7 @@ pub struct WindowAggStream { batches: Vec, finished: bool, window_expr: Vec>, - partition_by_sort_keys: Vec, + partition_by_sort_keys: ExprOrdering, baseline_metrics: BaselineMetrics, } @@ -309,7 +308,7 @@ impl WindowAggStream { window_expr: Vec>, input: SendableRecordBatchStream, baseline_metrics: BaselineMetrics, - partition_by_sort_keys: Vec, + partition_by_sort_keys: ExprOrdering, ) -> Self { Self { schema, diff --git a/datafusion/core/src/scheduler/pipeline/execution.rs b/datafusion/core/src/scheduler/pipeline/execution.rs index ea1643867bb84..454ebdbf4ccf4 100644 --- a/datafusion/core/src/scheduler/pipeline/execution.rs +++ b/datafusion/core/src/scheduler/pipeline/execution.rs @@ -29,7 +29,7 @@ use crate::arrow::datatypes::SchemaRef; use crate::arrow::record_batch::RecordBatch; use crate::error::Result; use crate::execution::context::TaskContext; -use crate::physical_plan::expressions::PhysicalSortExpr; +use crate::physical_plan::expressions::ExprOrderingRef; use crate::physical_plan::metrics::MetricsSet; use crate::physical_plan::{ displayable, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, @@ -231,7 +231,7 @@ impl ExecutionPlan for ProxyExecutionPlan { self.inner.output_partitioning() } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { self.inner.output_ordering() } diff --git a/datafusion/core/src/test/exec.rs b/datafusion/core/src/test/exec.rs index bce7d08a5c563..445934df2eabd 100644 --- a/datafusion/core/src/test/exec.rs +++ b/datafusion/core/src/test/exec.rs @@ -32,7 +32,7 @@ use arrow::{ use futures::Stream; use crate::execution::context::TaskContext; -use crate::physical_plan::expressions::PhysicalSortExpr; +use crate::physical_plan::expressions::ExprOrderingRef; use crate::physical_plan::{ common, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -146,7 +146,7 @@ impl ExecutionPlan for MockExec { Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } @@ -284,7 +284,7 @@ impl ExecutionPlan for BarrierExec { Partitioning::UnknownPartitioning(self.data.len()) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } @@ -384,7 +384,7 @@ impl ExecutionPlan for ErrorExec { Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } @@ -462,7 +462,7 @@ impl ExecutionPlan for StatisticsExec { Partitioning::UnknownPartitioning(2) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } @@ -560,7 +560,7 @@ impl ExecutionPlan for BlockingExec { Partitioning::UnknownPartitioning(self.n_partitions) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 982a7a83002e0..e2626983269cb 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -41,7 +41,7 @@ use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion_common::{Statistics, TableReference}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; -use datafusion_physical_expr::PhysicalSortExpr; +use datafusion_physical_expr::ExprOrderingRef; use futures::Stream; /// Compares formatted output of a record batch with an expected @@ -378,7 +378,7 @@ impl ExecutionPlan for UnboundedExec { fn unbounded_output(&self, _children: &[bool]) -> Result { Ok(self.batch_produce.is_none()) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/tests/custom_sources.rs b/datafusion/core/tests/custom_sources.rs index 57314f2e1eb8b..ccc4405ac07b9 100644 --- a/datafusion/core/tests/custom_sources.rs +++ b/datafusion/core/tests/custom_sources.rs @@ -25,7 +25,7 @@ use datafusion::logical_expr::{ col, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, UNNAMED_TABLE, }; use datafusion::physical_plan::empty::EmptyExec; -use datafusion::physical_plan::expressions::PhysicalSortExpr; +use datafusion::physical_plan::expressions::ExprOrderingRef; use datafusion::physical_plan::{ project_schema, ColumnStatistics, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -113,7 +113,7 @@ impl ExecutionPlan for CustomExecutionPlan { Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs index 36b0789829a2b..44acdc796a7d7 100644 --- a/datafusion/core/tests/provider_filter_pushdown.rs +++ b/datafusion/core/tests/provider_filter_pushdown.rs @@ -24,7 +24,7 @@ use datafusion::error::Result; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; use datafusion::physical_plan::common::SizedRecordBatchStream; -use datafusion::physical_plan::expressions::PhysicalSortExpr; +use datafusion::physical_plan::expressions::ExprOrderingRef; use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}; use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, @@ -72,7 +72,7 @@ impl ExecutionPlan for CustomPlan { Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/tests/statistics.rs b/datafusion/core/tests/statistics.rs index ca83ab1cf64bf..aacb1af25b5dd 100644 --- a/datafusion/core/tests/statistics.rs +++ b/datafusion/core/tests/statistics.rs @@ -25,7 +25,7 @@ use datafusion::{ error::Result, logical_expr::Expr, physical_plan::{ - expressions::PhysicalSortExpr, project_schema, ColumnStatistics, + expressions::ExprOrderingRef, project_schema, ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, @@ -124,7 +124,7 @@ impl ExecutionPlan for StatisticsValidation { Partitioning::UnknownPartitioning(2) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs index 7738a123949f9..55c4037b42319 100644 --- a/datafusion/core/tests/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined_plan.rs @@ -80,7 +80,7 @@ use datafusion::{ }, optimizer::{optimize_children, OptimizerConfig, OptimizerRule}, physical_plan::{ - expressions::PhysicalSortExpr, + expressions::ExprOrderingRef, planner::{DefaultPhysicalPlanner, ExtensionPlanner}, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalPlanner, RecordBatchStream, SendableRecordBatchStream, Statistics, @@ -437,7 +437,7 @@ impl ExecutionPlan for TopKExec { Partitioning::UnknownPartitioning(1) } - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + fn output_ordering(&self) -> Option { None } diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 63fb7b7d37ad5..b0c6e7c739732 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -95,7 +95,7 @@ pub use try_cast::{try_cast, TryCastExpr}; pub fn format_state_name(name: &str, state_name: &str) -> String { format!("{name}[{state_name}]") } -pub use crate::PhysicalSortExpr; +pub use crate::{ExprOrdering, ExprOrderingRef, PhysicalSortExpr}; #[cfg(test)] pub(crate) mod tests { diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index a1698fe072b40..fc12d3a6732d6 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -54,7 +54,8 @@ pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalE pub use planner::create_physical_expr; pub use scalar_function::ScalarFunctionExpr; pub use sort_expr::{ - make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement, + make_requirements_from_ordering, ExprOrdering, ExprOrderingRef, OrderingRequirement, + PhysicalSortExpr, PhysicalSortRequirement, }; pub use utils::{ expr_list_eq_any_order, expr_list_eq_strict_order, diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs index 08bd394e6d117..f081e5d2a90eb 100644 --- a/datafusion/physical-expr/src/sort_expr.rs +++ b/datafusion/physical-expr/src/sort_expr.rs @@ -116,9 +116,11 @@ impl PhysicalSortRequirement { } } -pub fn make_sort_requirements_from_exprs( - ordering: &[PhysicalSortExpr], -) -> Vec { +pub type ExprOrdering = Vec; +pub type ExprOrderingRef<'a> = &'a [PhysicalSortExpr]; +pub type OrderingRequirement = Vec; + +pub fn make_requirements_from_ordering(ordering: ExprOrderingRef) -> OrderingRequirement { ordering.iter().map(|e| e.clone().into()).collect() } diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs index 5c30d523d8a1e..31a6447047f4b 100644 --- a/datafusion/physical-expr/src/utils.rs +++ b/datafusion/physical-expr/src/utils.rs @@ -18,7 +18,8 @@ use crate::equivalence::EquivalentClass; use crate::expressions::{BinaryExpr, Column, UnKnownColumn}; use crate::{ - EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, + EquivalenceProperties, ExprOrdering, ExprOrderingRef, OrderingRequirement, + PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, }; use arrow::datatypes::SchemaRef; use datafusion_common::Result; @@ -73,8 +74,8 @@ pub fn expr_list_eq_strict_order( /// SortExpr('a','b','c') != SortExpr('c','b','a') #[allow(dead_code)] pub fn sort_expr_list_eq_strict_order( - list1: &[PhysicalSortExpr], - list2: &[PhysicalSortExpr], + list1: ExprOrderingRef, + list2: ExprOrderingRef, ) -> bool { list1.len() == list2.len() && list1.iter().zip(list2.iter()).all(|(e1, e2)| e1.eq(e2)) } @@ -213,8 +214,8 @@ pub fn normalize_sort_requirement_with_equivalence_properties( /// Checks whether given ordering requirements are satisfied by provided [PhysicalSortExpr]s. pub fn ordering_satisfy EquivalenceProperties>( - provided: Option<&[PhysicalSortExpr]>, - required: Option<&[PhysicalSortExpr]>, + provided: Option, + required: Option, equal_properties: F, ) -> bool { match (provided, required) { @@ -229,8 +230,8 @@ pub fn ordering_satisfy EquivalenceProperties>( /// Checks whether the required [`PhysicalSortExpr`]s are satisfied by the /// provided [`PhysicalSortExpr`]s. fn ordering_satisfy_concrete EquivalenceProperties>( - provided: &[PhysicalSortExpr], - required: &[PhysicalSortExpr], + provided: ExprOrderingRef, + required: ExprOrderingRef, equal_properties: F, ) -> bool { if required.len() > provided.len() { @@ -259,8 +260,8 @@ fn ordering_satisfy_concrete EquivalenceProperties>( /// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the /// provided [`PhysicalSortExpr`]s. pub fn ordering_satisfy_requirement EquivalenceProperties>( - provided: Option<&[PhysicalSortExpr]>, - required: Option<&[PhysicalSortRequirement]>, + provided: Option, + required: Option<&OrderingRequirement>, equal_properties: F, ) -> bool { match (provided, required) { @@ -275,8 +276,8 @@ pub fn ordering_satisfy_requirement EquivalenceProperties>( /// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the /// provided [`PhysicalSortExpr`]s. pub fn ordering_satisfy_requirement_concrete EquivalenceProperties>( - provided: &[PhysicalSortExpr], - required: &[PhysicalSortRequirement], + provided: ExprOrderingRef, + required: &OrderingRequirement, equal_properties: F, ) -> bool { if required.len() > provided.len() { @@ -308,8 +309,8 @@ pub fn ordering_satisfy_requirement_concrete EquivalencePropertie /// Checks whether the given [`PhysicalSortRequirement`]s are equal or more /// specific than the provided [`PhysicalSortRequirement`]s. pub fn requirements_compatible EquivalenceProperties>( - provided: Option<&[PhysicalSortRequirement]>, - required: Option<&[PhysicalSortRequirement]>, + provided: Option<&OrderingRequirement>, + required: Option<&OrderingRequirement>, equal_properties: F, ) -> bool { match (provided, required) { @@ -324,8 +325,8 @@ pub fn requirements_compatible EquivalenceProperties>( /// Checks whether the given [`PhysicalSortRequirement`]s are equal or more /// specific than the provided [`PhysicalSortRequirement`]s. fn requirements_compatible_concrete EquivalenceProperties>( - provided: &[PhysicalSortRequirement], - required: &[PhysicalSortRequirement], + provided: &OrderingRequirement, + required: &OrderingRequirement, equal_properties: F, ) -> bool { if required.len() > provided.len() { @@ -393,9 +394,7 @@ pub fn map_columns_before_projection( /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr` /// for each entry in the input. If required ordering is None for an entry /// default ordering `ASC, NULLS LAST` if given. -pub fn make_sort_exprs_from_requirements( - required: &[PhysicalSortRequirement], -) -> Vec { +pub fn make_sort_exprs_from_requirements(required: &OrderingRequirement) -> ExprOrdering { required .iter() .map(|requirement| { @@ -766,7 +765,7 @@ mod tests { #[test] fn sort_expr_list_eq_strict_order_test() -> Result<()> { - let list1: Vec = vec![ + let list1: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), @@ -781,7 +780,7 @@ mod tests { }, ]; - let list2: Vec = vec![ + let list2: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("b", 1)), options: SortOptions::default(), @@ -805,7 +804,7 @@ mod tests { list1.as_slice() )); - let list3: Vec = vec![ + let list3: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), @@ -819,7 +818,7 @@ mod tests { options: SortOptions::default(), }, ]; - let list4: Vec = vec![ + let list4: ExprOrdering = vec![ PhysicalSortExpr { expr: Arc::new(Column::new("a", 0)), options: SortOptions::default(), diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 95fd86148ac2d..06260fda47f96 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -33,7 +33,7 @@ use crate::window::window_expr::{reverse_order_bys, AggregateWindowExpr}; use crate::window::{ PartitionBatches, PartitionWindowAggStates, SlidingAggregateWindowExpr, WindowExpr, }; -use crate::{expressions::PhysicalSortExpr, AggregateExpr, PhysicalExpr}; +use crate::{AggregateExpr, ExprOrdering, ExprOrderingRef, PhysicalExpr}; /// A window expr that takes the form of an aggregate function /// Aggregate Window Expressions that have the form @@ -45,7 +45,7 @@ use crate::{expressions::PhysicalSortExpr, AggregateExpr, PhysicalExpr}; pub struct PlainAggregateWindowExpr { aggregate: Arc, partition_by: Vec>, - order_by: Vec, + order_by: ExprOrdering, window_frame: Arc, } @@ -54,7 +54,7 @@ impl PlainAggregateWindowExpr { pub fn new( aggregate: Arc, partition_by: &[Arc], - order_by: &[PhysicalSortExpr], + order_by: ExprOrderingRef, window_frame: Arc, ) -> Self { Self { @@ -126,7 +126,7 @@ impl WindowExpr for PlainAggregateWindowExpr { &self.partition_by } - fn order_by(&self) -> &[PhysicalSortExpr] { + fn order_by(&self) -> ExprOrderingRef { &self.order_by } diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index 329eac3334601..2dbe12330277b 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -30,7 +30,7 @@ use crate::window::window_expr::{ use crate::window::{ PartitionBatches, PartitionWindowAggStates, WindowAggState, WindowState, }; -use crate::{expressions::PhysicalSortExpr, PhysicalExpr}; +use crate::{ExprOrdering, ExprOrderingRef, PhysicalExpr}; use arrow::array::{new_empty_array, Array, ArrayRef}; use arrow::compute::SortOptions; use arrow::datatypes::Field; @@ -43,7 +43,7 @@ use datafusion_expr::WindowFrame; pub struct BuiltInWindowExpr { expr: Arc, partition_by: Vec>, - order_by: Vec, + order_by: ExprOrdering, window_frame: Arc, } @@ -52,7 +52,7 @@ impl BuiltInWindowExpr { pub fn new( expr: Arc, partition_by: &[Arc], - order_by: &[PhysicalSortExpr], + order_by: ExprOrderingRef, window_frame: Arc, ) -> Self { Self { @@ -91,7 +91,7 @@ impl WindowExpr for BuiltInWindowExpr { &self.partition_by } - fn order_by(&self) -> &[PhysicalSortExpr] { + fn order_by(&self) -> ExprOrderingRef { &self.order_by } diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 7fa33d71ca44c..481ad1213cb8e 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -32,7 +32,7 @@ use crate::window::window_expr::{reverse_order_bys, AggregateWindowExpr}; use crate::window::{ PartitionBatches, PartitionWindowAggStates, PlainAggregateWindowExpr, WindowExpr, }; -use crate::{expressions::PhysicalSortExpr, AggregateExpr, PhysicalExpr}; +use crate::{AggregateExpr, ExprOrdering, ExprOrderingRef, PhysicalExpr}; /// A window expr that takes the form of an aggregate function /// Aggregate Window Expressions that have the form @@ -44,7 +44,7 @@ use crate::{expressions::PhysicalSortExpr, AggregateExpr, PhysicalExpr}; pub struct SlidingAggregateWindowExpr { aggregate: Arc, partition_by: Vec>, - order_by: Vec, + order_by: ExprOrdering, window_frame: Arc, } @@ -53,7 +53,7 @@ impl SlidingAggregateWindowExpr { pub fn new( aggregate: Arc, partition_by: &[Arc], - order_by: &[PhysicalSortExpr], + order_by: ExprOrderingRef, window_frame: Arc, ) -> Self { Self { @@ -108,7 +108,7 @@ impl WindowExpr for SlidingAggregateWindowExpr { &self.partition_by } - fn order_by(&self) -> &[PhysicalSortExpr] { + fn order_by(&self) -> ExprOrderingRef { &self.order_by } diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 7568fa3b2b58c..9885852413890 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -17,7 +17,7 @@ use crate::window::partition_evaluator::PartitionEvaluator; use crate::window::window_frame_state::WindowFrameContext; -use crate::{PhysicalExpr, PhysicalSortExpr}; +use crate::{ExprOrdering, ExprOrderingRef, PhysicalExpr, PhysicalSortExpr}; use arrow::array::{new_empty_array, Array, ArrayRef}; use arrow::compute::kernels::partition::lexicographical_partition_ranges; use arrow::compute::kernels::sort::SortColumn; @@ -103,7 +103,7 @@ pub trait WindowExpr: Send + Sync + Debug { fn partition_by(&self) -> &[Arc]; /// Expressions that's from the window function's order by clause, empty if absent - fn order_by(&self) -> &[PhysicalSortExpr]; + fn order_by(&self) -> ExprOrderingRef; /// Get order by columns, empty if absent fn order_by_columns(&self, batch: &RecordBatch) -> Result> { @@ -276,7 +276,7 @@ pub trait AggregateWindowExpr: WindowExpr { /// Reverses the ORDER BY expression, which is useful during equivalent window /// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns into /// 'ORDER BY a DESC, NULLS FIRST'. -pub fn reverse_order_bys(order_bys: &[PhysicalSortExpr]) -> Vec { +pub fn reverse_order_bys(order_bys: ExprOrderingRef) -> ExprOrdering { order_bys .iter() .map(|e| PhysicalSortExpr { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index b38ea2b35e37b..60a61ce1a4ccd 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -28,7 +28,7 @@ use datafusion::execution::context::ExecutionProps; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::window_function::WindowFunction; use datafusion::physical_expr::expressions::DateTimeIntervalExpr; -use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; +use datafusion::physical_expr::{ExprOrdering, PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::GetIndexedFieldExpr; use datafusion::physical_plan::expressions::LikeExpr; use datafusion::physical_plan::file_format::FileScanConfig; @@ -421,7 +421,7 @@ pub fn parse_protobuf_file_scan_config( }, }) }) - .collect::>>()?; + .collect::>()?; let output_ordering = if output_ordering.is_empty() { None } else {