diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index 8557769c3171e..71274d177d5f1 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -321,6 +321,9 @@ fn init() {
mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+ use datafusion_physical_expr::{
+ make_sort_requirements_from_exprs, PhysicalSortRequirement,
+ };
use super::*;
use crate::datasource::listing::PartitionedFile;
@@ -1131,8 +1134,10 @@ mod tests {
}
// model that it requires the output ordering of its input
- fn required_input_ordering(&self) -> Vec> {
- vec![self.input.output_ordering()]
+ fn required_input_ordering(&self) -> Vec >> {
+ vec![self
+ .output_ordering()
+ .map(make_sort_requirements_from_exprs)]
}
fn with_new_children(
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 9a87796bc180c..265c86cdf1e44 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -47,7 +47,10 @@ use crate::physical_plan::{with_new_children_if_necessary, Distribution, Executi
use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{reverse_sort_options, DataFusionError};
-use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete};
+use datafusion_physical_expr::utils::{
+ make_sort_exprs_from_requirements, ordering_satisfy,
+ ordering_satisfy_requirement_concrete,
+};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use itertools::{concat, izip};
use std::iter::zip;
@@ -471,17 +474,20 @@ fn ensure_sorting(
let physical_ordering = child.output_ordering();
match (required_ordering, physical_ordering) {
(Some(required_ordering), Some(physical_ordering)) => {
- let is_ordering_satisfied = ordering_satisfy_concrete(
+ if !ordering_satisfy_requirement_concrete(
physical_ordering,
- required_ordering,
+ &required_ordering,
|| child.equivalence_properties(),
- );
- if !is_ordering_satisfied {
+ ) {
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
- let sort_expr = required_ordering.to_vec();
+ let sort_expr = make_sort_exprs_from_requirements(&required_ordering);
add_sort_above(child, sort_expr)?;
- *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
+ if is_sort(child) {
+ *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
+ } else {
+ *sort_onwards = None;
+ }
}
if let Some(tree) = sort_onwards {
// For window expressions, we can remove some sorts when we can
@@ -497,7 +503,8 @@ fn ensure_sorting(
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec` to the plan.
- add_sort_above(child, required.to_vec())?;
+ let sort_expr = make_sort_exprs_from_requirements(&required);
+ add_sort_above(child, sort_expr)?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
}
(None, Some(_)) => {
@@ -592,7 +599,6 @@ fn analyze_window_sort_removal(
};
let mut first_should_reverse = None;
- let mut physical_ordering_common = vec![];
for sort_any in sort_tree.get_leaves() {
let sort_output_ordering = sort_any.output_ordering();
// Variable `sort_any` will either be a `SortExec` or a
@@ -609,11 +615,6 @@ fn analyze_window_sort_removal(
DataFusionError::Plan("A SortExec should have output ordering".to_string())
})?;
if let Some(physical_ordering) = physical_ordering {
- if physical_ordering_common.is_empty()
- || physical_ordering.len() < physical_ordering_common.len()
- {
- physical_ordering_common = physical_ordering.to_vec();
- }
let (can_skip_sorting, should_reverse) = can_skip_sort(
window_expr[0].partition_by(),
required_ordering,
@@ -664,7 +665,6 @@ fn analyze_window_sort_removal(
new_child,
new_schema,
partition_keys.to_vec(),
- Some(physical_ordering_common),
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
@@ -672,7 +672,6 @@ fn analyze_window_sort_removal(
new_child,
new_schema,
partition_keys.to_vec(),
- Some(physical_ordering_common),
)?) as _
};
return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
@@ -1889,7 +1888,6 @@ mod tests {
input.clone(),
input.schema(),
vec![],
- Some(sort_exprs),
)
.unwrap(),
)
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index de9199ea8297c..623bafe0937a7 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -55,6 +55,9 @@ use crate::physical_plan::{
};
use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_physical_expr::{
+ make_sort_requirements_from_exprs, PhysicalSortRequirement,
+};
/// join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
@@ -225,8 +228,11 @@ impl ExecutionPlan for SortMergeJoinExec {
]
}
- fn required_input_ordering(&self) -> Vec > {
- vec![Some(&self.left_sort_exprs), Some(&self.right_sort_exprs)]
+ fn required_input_ordering(&self) -> Vec >> {
+ vec![
+ Some(make_sort_requirements_from_exprs(&self.left_sort_exprs)),
+ Some(make_sort_requirements_from_exprs(&self.right_sort_exprs)),
+ ]
}
fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 3af983d8f06a2..5a249e433dd21 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -46,6 +46,9 @@ use hashbrown::{raw::RawTable, HashSet};
use datafusion_common::{utils::bisect, ScalarValue};
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval};
+use datafusion_physical_expr::{
+ make_sort_requirements_from_exprs, PhysicalSortRequirement,
+};
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
@@ -399,11 +402,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
self.schema.clone()
}
- fn required_input_ordering(&self) -> Vec > {
- vec![
- Some(&self.left_required_sort_exprs),
- Some(&self.right_required_sort_exprs),
- ]
+ fn required_input_ordering(&self) -> Vec >> {
+ let left_required =
+ make_sort_requirements_from_exprs(&self.left_required_sort_exprs);
+ let right_required =
+ make_sort_requirements_from_exprs(&self.right_required_sort_exprs);
+ vec![Some(left_required), Some(right_required)]
}
fn unbounded_output(&self, children: &[bool]) -> Result {
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index c59dd0c62ee53..9815d9491e021 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -142,7 +142,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// NOTE that checking `!is_empty()` does **not** check for a
/// required input ordering. Instead, the correct check is that at
/// least one entry must be `Some`
- fn required_input_ordering(&self) -> Vec> {
+ fn required_input_ordering(&self) -> Vec >> {
vec![None; self.children().len()]
}
@@ -591,11 +591,11 @@ impl Distribution {
use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::WindowExpr;
-use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
};
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
+use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 51653450a6996..4b21a9bd735e0 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -577,33 +577,6 @@ impl DefaultPhysicalPlanner {
let logical_input_schema = input.schema();
- let physical_sort_keys = if sort_keys.is_empty() {
- None
- } else {
- let physical_input_schema = input_exec.schema();
- let sort_keys = sort_keys
- .iter()
- .map(|(e, _)| match e {
- Expr::Sort(expr::Sort {
- expr,
- asc,
- nulls_first,
- }) => create_physical_sort_expr(
- expr,
- logical_input_schema,
- &physical_input_schema,
- SortOptions {
- descending: !*asc,
- nulls_first: *nulls_first,
- },
- session_state.execution_props(),
- ),
- _ => unreachable!(),
- })
- .collect::>>()?;
- Some(sort_keys)
- };
-
let physical_input_schema = input_exec.schema();
let window_expr = window_expr
.iter()
@@ -628,7 +601,6 @@ impl DefaultPhysicalPlanner {
input_exec,
physical_input_schema,
physical_partition_keys,
- physical_sort_keys,
)?)
} else {
Arc::new(WindowAggExec::try_new(
@@ -636,7 +608,6 @@ impl DefaultPhysicalPlanner {
input_exec,
physical_input_schema,
physical_partition_keys,
- physical_sort_keys,
)?)
})
}
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 7ef4d3bf8e868..edacae4052d83 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -46,7 +46,9 @@ use crate::physical_plan::{
Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
-use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::{
+ make_sort_requirements_from_exprs, EquivalenceProperties, PhysicalSortRequirement,
+};
/// Sort preserving merge execution plan
///
@@ -125,12 +127,16 @@ impl ExecutionPlan for SortPreservingMergeExec {
vec![Distribution::UnspecifiedDistribution]
}
- fn required_input_ordering(&self) -> Vec> {
- vec![Some(&self.expr)]
+ fn required_input_ordering(&self) -> Vec >> {
+ vec![Some(make_sort_requirements_from_exprs(&self.expr))]
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- Some(&self.expr)
+ self.input.output_ordering()
+ }
+
+ fn maintains_input_order(&self) -> Vec {
+ vec![true]
}
fn equivalence_properties(&self) -> EquivalenceProperties {
diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index fa276c423879f..4b01d3b4ed386 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -50,11 +50,14 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
+use crate::physical_plan::windows::calc_requirements;
use datafusion_physical_expr::window::{
PartitionBatchState, PartitionBatches, PartitionKey, PartitionWindowAggStates,
WindowAggState, WindowState,
};
-use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
+use datafusion_physical_expr::{
+ EquivalenceProperties, PhysicalExpr, PhysicalSortRequirement,
+};
use indexmap::IndexMap;
use log::debug;
@@ -71,8 +74,6 @@ pub struct BoundedWindowAggExec {
input_schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec>,
- /// Sort Keys
- pub sort_keys: Option>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -84,7 +85,6 @@ impl BoundedWindowAggExec {
input: Arc,
input_schema: SchemaRef,
partition_keys: Vec>,
- sort_keys: Option>,
) -> Result {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
@@ -94,7 +94,6 @@ impl BoundedWindowAggExec {
schema,
input_schema,
partition_keys,
- sort_keys,
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -123,7 +122,7 @@ impl BoundedWindowAggExec {
let mut result = vec![];
// All window exprs have the same partition by, so we just use the first one:
let partition_by = self.window_expr()[0].partition_by();
- let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]);
+ let sort_keys = self.input.output_ordering().unwrap_or(&[]);
for item in partition_by {
if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
result.push(a.clone());
@@ -167,9 +166,11 @@ impl ExecutionPlan for BoundedWindowAggExec {
self.input().output_ordering()
}
- fn required_input_ordering(&self) -> Vec> {
- let sort_keys = self.sort_keys.as_deref();
- vec![sort_keys]
+ fn required_input_ordering(&self) -> Vec >> {
+ let partition_bys = self.window_expr()[0].partition_by();
+ let order_keys = self.window_expr()[0].order_by();
+ let requirements = calc_requirements(partition_bys, order_keys);
+ vec![requirements]
}
fn required_input_distribution(&self) -> Vec {
@@ -177,7 +178,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
debug!("No partition defined for BoundedWindowAggExec!!!");
vec![Distribution::SinglePartition]
} else {
- //TODO support PartitionCollections if there is no common partition columns in the window_expr
vec![Distribution::HashPartitioned(self.partition_keys.clone())]
}
}
@@ -199,7 +199,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
children[0].clone(),
self.input_schema.clone(),
self.partition_keys.clone(),
- self.sort_keys.clone(),
)?))
}
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs b/datafusion/core/src/physical_plan/windows/mod.rs
index bdb9aa32645f6..f7f9bb76b3f44 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -46,6 +46,7 @@ pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
+use datafusion_physical_expr::PhysicalSortRequirement;
pub use window_agg_exec::WindowAggExec;
/// Create a physical expression for window function
@@ -187,6 +188,30 @@ fn create_built_in_window_expr(
})
}
+pub(crate) fn calc_requirements(
+ partition_by_exprs: &[Arc],
+ orderby_sort_exprs: &[PhysicalSortExpr],
+) -> Option> {
+ let mut sort_reqs = vec![];
+ for partition_by in partition_by_exprs {
+ sort_reqs.push(PhysicalSortRequirement {
+ expr: partition_by.clone(),
+ options: None,
+ });
+ }
+ for PhysicalSortExpr { expr, options } in orderby_sort_exprs {
+ let contains = sort_reqs.iter().any(|e| expr.eq(&e.expr));
+ if !contains {
+ sort_reqs.push(PhysicalSortRequirement {
+ expr: expr.clone(),
+ options: Some(*options),
+ });
+ }
+ }
+ // Convert empty result to None. Otherwise wrap result inside Some()
+ (!sort_reqs.is_empty()).then_some(sort_reqs)
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -198,6 +223,7 @@ mod tests {
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::test::{self, assert_is_pending};
use arrow::array::*;
+ use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::cast::as_primitive_array;
@@ -210,6 +236,79 @@ mod tests {
Ok((csv, schema))
}
+ fn create_test_schema2() -> Result {
+ let a = Field::new("a", DataType::Int32, true);
+ let b = Field::new("b", DataType::Int32, true);
+ let c = Field::new("c", DataType::Int32, true);
+ let d = Field::new("d", DataType::Int32, true);
+ let schema = Arc::new(Schema::new(vec![a, b, c, d]));
+ Ok(schema)
+ }
+
+ #[tokio::test]
+ async fn test_calc_requirements() -> Result<()> {
+ let schema = create_test_schema2()?;
+ let test_data = vec![
+ // PARTITION BY a, ORDER BY b ASC NULLS FIRST
+ (
+ vec!["a"],
+ vec![("b", true, true)],
+ vec![("a", None), ("b", Some((true, true)))],
+ ),
+ // PARTITION BY a, ORDER BY a ASC NULLS FIRST
+ (vec!["a"], vec![("a", true, true)], vec![("a", None)]),
+ // PARTITION BY a, ORDER BY b ASC NULLS FIRST, c DESC NULLS LAST
+ (
+ vec!["a"],
+ vec![("b", true, true), ("c", false, false)],
+ vec![
+ ("a", None),
+ ("b", Some((true, true))),
+ ("c", Some((false, false))),
+ ],
+ ),
+ // PARTITION BY a, c, ORDER BY b ASC NULLS FIRST, c DESC NULLS LAST
+ (
+ vec!["a", "c"],
+ vec![("b", true, true), ("c", false, false)],
+ vec![("a", None), ("c", None), ("b", Some((true, true)))],
+ ),
+ ];
+ for (pb_params, ob_params, expected_params) in test_data {
+ let mut partitionbys = vec![];
+ for col_name in pb_params {
+ partitionbys.push(col(col_name, &schema)?);
+ }
+
+ let mut orderbys = vec![];
+ for (col_name, descending, nulls_first) in ob_params {
+ let expr = col(col_name, &schema)?;
+ let options = SortOptions {
+ descending,
+ nulls_first,
+ };
+ orderbys.push(PhysicalSortExpr { expr, options });
+ }
+
+ let mut expected: Option> = None;
+ for (col_name, reqs) in expected_params {
+ let options = reqs.map(|(descending, nulls_first)| SortOptions {
+ descending,
+ nulls_first,
+ });
+ let expr = col(col_name, &schema)?;
+ let res = PhysicalSortRequirement { expr, options };
+ if let Some(expected) = &mut expected {
+ expected.push(res);
+ } else {
+ expected = Some(vec![res]);
+ }
+ }
+ assert_eq!(calc_requirements(&partitionbys, &orderbys), expected);
+ }
+ Ok(())
+ }
+
#[tokio::test]
async fn window_function_with_udaf() -> Result<()> {
#[derive(Debug)]
@@ -269,7 +368,6 @@ mod tests {
input,
schema.clone(),
vec![],
- None,
)?);
let result: Vec = collect(window_exec, task_ctx).await?;
@@ -323,7 +421,6 @@ mod tests {
input,
schema.clone(),
vec![],
- None,
)?);
let result: Vec = collect(window_exec, task_ctx).await?;
@@ -371,7 +468,6 @@ mod tests {
blocking_exec,
schema,
vec![],
- None,
)?);
let fut = collect(window_agg_exec, task_ctx);
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index a667f0a3c2168..29d5c4dc8f3db 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -24,6 +24,7 @@ use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
+use crate::physical_plan::windows::calc_requirements;
use crate::physical_plan::{
ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
@@ -39,6 +40,7 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::DataFusionError;
+use datafusion_physical_expr::PhysicalSortRequirement;
use futures::stream::Stream;
use futures::{ready, StreamExt};
use log::debug;
@@ -61,8 +63,6 @@ pub struct WindowAggExec {
input_schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec>,
- /// Sort Keys
- pub sort_keys: Option>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
@@ -74,7 +74,6 @@ impl WindowAggExec {
input: Arc,
input_schema: SchemaRef,
partition_keys: Vec>,
- sort_keys: Option>,
) -> Result {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
@@ -85,7 +84,6 @@ impl WindowAggExec {
schema,
input_schema,
partition_keys,
- sort_keys,
metrics: ExecutionPlanMetricsSet::new(),
})
}
@@ -114,7 +112,7 @@ impl WindowAggExec {
let mut result = vec![];
// All window exprs have the same partition by, so we just use the first one:
let partition_by = self.window_expr()[0].partition_by();
- let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]);
+ let sort_keys = self.input.output_ordering().unwrap_or(&[]);
for item in partition_by {
if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
result.push(a.clone());
@@ -172,9 +170,11 @@ impl ExecutionPlan for WindowAggExec {
vec![true]
}
- fn required_input_ordering(&self) -> Vec> {
- let sort_keys = self.sort_keys.as_deref();
- vec![sort_keys]
+ fn required_input_ordering(&self) -> Vec >> {
+ let partition_bys = self.window_expr()[0].partition_by();
+ let order_keys = self.window_expr()[0].order_by();
+ let requirements = calc_requirements(partition_bys, order_keys);
+ vec![requirements]
}
fn required_input_distribution(&self) -> Vec {
@@ -200,7 +200,6 @@ impl ExecutionPlan for WindowAggExec {
children[0].clone(),
self.input_schema.clone(),
self.partition_keys.clone(),
- self.sort_keys.clone(),
)?))
}
diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index 28924768489d5..b6017f826c769 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -670,7 +670,7 @@ async fn sort_on_window_null_string() -> Result<()> {
])
.unwrap();
- let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
+ let ctx = SessionContext::with_config(SessionConfig::new().with_target_partitions(1));
ctx.register_batch("test", batch)?;
let sql =
@@ -689,7 +689,8 @@ async fn sort_on_window_null_string() -> Result<()> {
];
assert_batches_eq!(expected, &actual);
- let sql = "SELECT d2, row_number() OVER (partition by d2) as rn1 FROM test";
+ let sql =
+ "SELECT d2, row_number() OVER (partition by d2) as rn1 FROM test ORDER BY d2 asc";
let actual = execute_to_batches(&ctx, sql).await;
// NULLS LAST
let expected = vec![
@@ -704,7 +705,7 @@ async fn sort_on_window_null_string() -> Result<()> {
assert_batches_eq!(expected, &actual);
let sql =
- "SELECT d2, row_number() OVER (partition by d2 order by d2 desc) as rn1 FROM test";
+ "SELECT d2, row_number() OVER (partition by d2 order by d2 desc) as rn1 FROM test ORDER BY d2 desc";
let actual = execute_to_batches(&ctx, sql).await;
// NULLS FIRST
diff --git a/datafusion/core/tests/window_fuzz.rs b/datafusion/core/tests/window_fuzz.rs
index a71aab280ea19..179bada53cc61 100644
--- a/datafusion/core/tests/window_fuzz.rs
+++ b/datafusion/core/tests/window_fuzz.rs
@@ -330,7 +330,9 @@ async fn run_window_test(
let concat_input_record = concat_batches(&schema, &input1).unwrap();
let exec1 = Arc::new(
- MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None).unwrap(),
+ MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None)
+ .unwrap()
+ .with_sort_information(sort_keys.clone()),
);
let usual_window_exec = Arc::new(
WindowAggExec::try_new(
@@ -347,12 +349,14 @@ async fn run_window_test(
exec1,
schema.clone(),
vec![],
- Some(sort_keys.clone()),
)
.unwrap(),
);
- let exec2 =
- Arc::new(MemoryExec::try_new(&[input1.clone()], schema.clone(), None).unwrap());
+ let exec2 = Arc::new(
+ MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
+ .unwrap()
+ .with_sort_information(sort_keys),
+ );
let running_window_exec = Arc::new(
BoundedWindowAggExec::try_new(
vec![create_window_expr(
@@ -368,7 +372,6 @@ async fn run_window_test(
exec2,
schema.clone(),
vec![],
- Some(sort_keys),
)
.unwrap(),
);
diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs
index 56ca0824f46b3..a1698fe072b40 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -53,7 +53,9 @@ pub use equivalence::EquivalentClass;
pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef};
pub use planner::create_physical_expr;
pub use scalar_function::ScalarFunctionExpr;
-pub use sort_expr::PhysicalSortExpr;
+pub use sort_expr::{
+ make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement,
+};
pub use utils::{
expr_list_eq_any_order, expr_list_eq_strict_order,
normalize_expr_with_equivalence_properties, normalize_out_expr_with_alias_schema,
diff --git a/datafusion/physical-expr/src/sort_expr.rs b/datafusion/physical-expr/src/sort_expr.rs
index f8172dabf65aa..08bd394e6d117 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -41,14 +41,7 @@ impl PartialEq for PhysicalSortExpr {
impl std::fmt::Display for PhysicalSortExpr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- let opts_string = match (self.options.descending, self.options.nulls_first) {
- (true, true) => "DESC",
- (true, false) => "DESC NULLS LAST",
- (false, true) => "ASC",
- (false, false) => "ASC NULLS LAST",
- };
-
- write!(f, "{} {}", self.expr, opts_string)
+ write!(f, "{} {}", self.expr, to_str(&self.options))
}
}
@@ -69,4 +62,73 @@ impl PhysicalSortExpr {
options: Some(self.options),
})
}
+
+ /// Check whether sort expression satisfies `PhysicalSortRequirement`.
+ // If sort options is Some in `PhysicalSortRequirement`, `expr` and `options` field are compared for equality.
+ // If sort options is None in `PhysicalSortRequirement`, only `expr` is compared for equality.
+ pub fn satisfy(&self, requirement: &PhysicalSortRequirement) -> bool {
+ self.expr.eq(&requirement.expr)
+ && requirement
+ .options
+ .map_or(true, |opts| self.options == opts)
+ }
+}
+
+/// Represents sort requirement associated with a plan
+#[derive(Clone, Debug)]
+pub struct PhysicalSortRequirement {
+ /// Physical expression representing the column to sort
+ pub expr: Arc,
+ /// Option to specify how the given column should be sorted.
+ /// If unspecified, there is no constraint on sort options.
+ pub options: Option,
+}
+
+impl From for PhysicalSortRequirement {
+ fn from(value: PhysicalSortExpr) -> Self {
+ Self {
+ expr: value.expr,
+ options: Some(value.options),
+ }
+ }
+}
+
+impl PartialEq for PhysicalSortRequirement {
+ fn eq(&self, other: &PhysicalSortRequirement) -> bool {
+ self.options == other.options && self.expr.eq(&other.expr)
+ }
+}
+
+impl std::fmt::Display for PhysicalSortRequirement {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ let opts_string = self.options.as_ref().map_or("NA", to_str);
+ write!(f, "{} {}", self.expr, opts_string)
+ }
+}
+
+impl PhysicalSortRequirement {
+ /// Returns whether this requirement is equal or more specific than `other`.
+ pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
+ self.expr.eq(&other.expr)
+ && other.options.map_or(true, |other_opts| {
+ self.options.map_or(false, |opts| opts == other_opts)
+ })
+ }
+}
+
+pub fn make_sort_requirements_from_exprs(
+ ordering: &[PhysicalSortExpr],
+) -> Vec {
+ ordering.iter().map(|e| e.clone().into()).collect()
+}
+
+/// Returns the SQL string representation of the given [SortOptions] object.
+#[inline]
+fn to_str(options: &SortOptions) -> &str {
+ match (options.descending, options.nulls_first) {
+ (true, true) => "DESC",
+ (true, false) => "DESC NULLS LAST",
+ (false, true) => "ASC",
+ (false, false) => "ASC NULLS LAST",
+ }
}
diff --git a/datafusion/physical-expr/src/utils.rs b/datafusion/physical-expr/src/utils.rs
index 7c8c94c7d454c..cd4ac6ff3d02c 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -17,11 +17,14 @@
use crate::equivalence::EquivalentClass;
use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
-use crate::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr};
+use crate::{
+ EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
+};
use arrow::datatypes::SchemaRef;
use datafusion_common::Result;
use datafusion_expr::Operator;
+use arrow_schema::SortOptions;
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
};
@@ -199,6 +202,24 @@ pub fn normalize_sort_expr_with_equivalence_properties(
}
}
+pub fn normalize_sort_requirement_with_equivalence_properties(
+ sort_requirement: PhysicalSortRequirement,
+ eq_properties: &[EquivalentClass],
+) -> PhysicalSortRequirement {
+ let normalized_expr = normalize_expr_with_equivalence_properties(
+ sort_requirement.expr.clone(),
+ eq_properties,
+ );
+ if sort_requirement.expr.ne(&normalized_expr) {
+ PhysicalSortRequirement {
+ expr: normalized_expr,
+ options: sort_requirement.options,
+ }
+ } else {
+ sort_requirement
+ }
+}
+
/// Checks whether given ordering requirements are satisfied by provided [PhysicalSortExpr]s.
pub fn ordering_satisfy EquivalenceProperties>(
provided: Option<&[PhysicalSortExpr]>,
@@ -224,31 +245,102 @@ pub fn ordering_satisfy_concrete EquivalenceProperties>(
} else if required
.iter()
.zip(provided.iter())
- .all(|(order1, order2)| order1.eq(order2))
+ .all(|(req, given)| req.eq(given))
{
true
} else if let eq_classes @ [_, ..] = equal_properties().classes() {
- let normalized_required_exprs = required
+ required
.iter()
.map(|e| {
normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
})
- .collect::>();
- let normalized_provided_exprs = provided
+ .zip(provided.iter().map(|e| {
+ normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
+ }))
+ .all(|(req, given)| req.eq(&given))
+ } else {
+ false
+ }
+}
+
+/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
+/// provided [`PhysicalSortExpr`]s.
+pub fn ordering_satisfy_requirement EquivalenceProperties>(
+ provided: Option<&[PhysicalSortExpr]>,
+ required: Option<&[PhysicalSortRequirement]>,
+ equal_properties: F,
+) -> bool {
+ match (provided, required) {
+ (_, None) => true,
+ (None, Some(_)) => false,
+ (Some(provided), Some(required)) => {
+ ordering_satisfy_requirement_concrete(provided, required, equal_properties)
+ }
+ }
+}
+
+/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
+/// provided [`PhysicalSortExpr`]s.
+pub fn ordering_satisfy_requirement_concrete EquivalenceProperties>(
+ provided: &[PhysicalSortExpr],
+ required: &[PhysicalSortRequirement],
+ equal_properties: F,
+) -> bool {
+ if required.len() > provided.len() {
+ false
+ } else if required
+ .iter()
+ .zip(provided.iter())
+ .all(|(req, given)| given.satisfy(req))
+ {
+ true
+ } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+ required
.iter()
.map(|e| {
- normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
+ normalize_sort_requirement_with_equivalence_properties(
+ e.clone(),
+ eq_classes,
+ )
})
- .collect::>();
- normalized_required_exprs
- .iter()
- .zip(normalized_provided_exprs.iter())
- .all(|(order1, order2)| order1.eq(order2))
+ .zip(provided.iter().map(|e| {
+ normalize_sort_expr_with_equivalence_properties(e.clone(), eq_classes)
+ }))
+ .all(|(req, given)| given.satisfy(&req))
} else {
false
}
}
+/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
+/// for each entry in the input. If required ordering is None for an entry
+/// default ordering `ASC, NULLS LAST` if given.
+pub fn make_sort_exprs_from_requirements(
+ required: &[PhysicalSortRequirement],
+) -> Vec {
+ required
+ .iter()
+ .map(|requirement| {
+ if let Some(options) = requirement.options {
+ PhysicalSortExpr {
+ expr: requirement.expr.clone(),
+ options,
+ }
+ } else {
+ PhysicalSortExpr {
+ expr: requirement.expr.clone(),
+ options: SortOptions {
+ // By default, create sort key with ASC is true and NULLS LAST to be consistent with
+ // PostgreSQL's rule: https://www.postgresql.org/docs/current/queries-order.html
+ descending: false,
+ nulls_first: false,
+ },
+ }
+ }
+ })
+ .collect()
+}
+
#[derive(Clone, Debug)]
pub struct ExprTreeNode {
expr: Arc,
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 92986b0b39b26..7711b3c9617ae 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -334,7 +334,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
input,
Arc::new((&input_schema).try_into()?),
vec![],
- None,
)?))
}
PhysicalPlanType::Aggregate(hash_agg) => {