From c2e652e48c82aedb20c289cbf5e7a7e279aa436e Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 17:39:17 -0700 Subject: [PATCH 01/10] test: demonstrate bug where the SanityCheckPlan does not know to ignore constant columns in sort --- datafusion/sqllogictest/test_files/order.slt | 33 ++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 7bb872e5a48f..3ee19abfbb9e 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1196,3 +1196,36 @@ physical_plan 02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true + + +# Test: inputs into union with different orderings +query TT +explain select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1 +union all +select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2 +order by d, c, a, a0, b +limit 2; +---- +logical_plan +01)Projection: t1.b, t1.c, t1.a, t1.a0 +02)--Sort: t1.d ASC NULLS LAST, t1.c ASC NULLS LAST, t1.a ASC NULLS LAST, t1.a0 ASC NULLS LAST, t1.b ASC NULLS LAST, fetch=2 +03)----Union +04)------SubqueryAlias: t1 +05)--------Projection: ordered_table.b, ordered_table.c, ordered_table.a, Int32(NULL) AS a0, ordered_table.d +06)----------TableScan: ordered_table projection=[a, b, c, d] +07)------SubqueryAlias: t2 +08)--------Projection: ordered_table.b, ordered_table.c, Int32(NULL) AS a, ordered_table.a0, ordered_table.d +09)----------TableScan: ordered_table projection=[a0, b, c, d] + +# Test: run the query from above +# TODO: query fails since the constant columns t1.a0 and t2.a are not in the ORDER BY subquery, +# and SanityCheckPlan does not allow this. +statement error DataFusion error: SanityCheckPlan +select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1 +union all +select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2 +order by d, c, a, a0, b +limit 2; + +statement ok +drop table ordered_table; From a50edc35f0905ddce5a60951964db3bbae94a6a7 Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 20:14:00 -0700 Subject: [PATCH 02/10] fix: for the UnionExec, the sanity check should enforce restrictions based upon the Union's parent vs the Union's input --- .../src/physical_optimizer/sanity_checker.rs | 34 +++++++++++++------ 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs index e392105fbcb7..8a0d736950e1 100644 --- a/datafusion/core/src/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs @@ -31,6 +31,7 @@ use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; +use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; use datafusion_physical_optimizer::PhysicalOptimizerRule; @@ -121,21 +122,34 @@ pub fn check_plan_sanity( check_finiteness_requirements(plan.clone(), optimizer_options)?; for ((idx, child), sort_req, dist_req) in izip!( - plan.children().iter().enumerate(), + plan.children().into_iter().enumerate(), plan.required_input_ordering().iter(), plan.required_input_distribution().iter() ) { let child_eq_props = child.equivalence_properties(); if let Some(sort_req) = sort_req { - if !child_eq_props.ordering_satisfy_requirement(sort_req) { - let plan_str = get_plan_string(&plan); - return plan_err!( - "Plan: {:?} does not satisfy order requirements: {:?}. Child-{} order: {:?}", - plan_str, - sort_req, - idx, - child_eq_props.oeq_class - ); + // The `EquivalenceProperties::ordering_satisfy_requirement` compares the oeq_class + // orderings, minus their constants, to the requirement. + // + // For the UnionExec, it has the oeq_class orderings from it's children but does not + // have the same constants. As such, the sort requirements cannot be fulfilled + // without examination of the union's children with both the orderings & constants. + let children = match child.as_any().downcast_ref::() { + Some(union) => union.children(), + _ => vec![child], + }; + for child in children { + let child_eq_props = child.equivalence_properties(); + if !child_eq_props.ordering_satisfy_requirement(sort_req) { + let plan_str = get_plan_string(&plan); + return plan_err!( + "Plan: {:?} does not satisfy order requirements: {:?}. Child-{} order: {:?}", + plan_str, + sort_req, + idx, + child_eq_props.oeq_class + ); + } } } From 782e18daedaf65cd21f56b6a420c1a69b439accd Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 9 Sep 2024 20:14:33 -0700 Subject: [PATCH 03/10] test: update test once bug fix is complete --- datafusion/sqllogictest/test_files/order.slt | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 3ee19abfbb9e..671c18674acc 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1216,16 +1216,27 @@ logical_plan 07)------SubqueryAlias: t2 08)--------Projection: ordered_table.b, ordered_table.c, Int32(NULL) AS a, ordered_table.a0, ordered_table.d 09)----------TableScan: ordered_table projection=[a0, b, c, d] +physical_plan +01)ProjectionExec: expr=[b@0 as b, c@1 as c, a@2 as a, a0@3 as a0] +02)--SortPreservingMergeExec: [d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], fetch=2 +03)----UnionExec +04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d] +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true +07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] +08)--------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d] +09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true # Test: run the query from above -# TODO: query fails since the constant columns t1.a0 and t2.a are not in the ORDER BY subquery, -# and SanityCheckPlan does not allow this. -statement error DataFusion error: SanityCheckPlan +query IIII select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1 union all select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2 order by d, c, a, a0, b limit 2; +---- +0 0 0 NULL +0 0 NULL 1 statement ok drop table ordered_table; From 6e7356b1b94a109f5823cf2e3962d805f820f019 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 10 Sep 2024 12:38:33 -0700 Subject: [PATCH 04/10] Revert "fix: for the UnionExec, the sanity check should enforce restrictions based upon the Union's parent vs the Union's input" This reverts commit a50edc35f0905ddce5a60951964db3bbae94a6a7. --- .../src/physical_optimizer/sanity_checker.rs | 34 ++++++------------- 1 file changed, 10 insertions(+), 24 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sanity_checker.rs b/datafusion/core/src/physical_optimizer/sanity_checker.rs index 8a0d736950e1..e392105fbcb7 100644 --- a/datafusion/core/src/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/src/physical_optimizer/sanity_checker.rs @@ -31,7 +31,6 @@ use datafusion_common::plan_err; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; -use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; use datafusion_physical_optimizer::PhysicalOptimizerRule; @@ -122,34 +121,21 @@ pub fn check_plan_sanity( check_finiteness_requirements(plan.clone(), optimizer_options)?; for ((idx, child), sort_req, dist_req) in izip!( - plan.children().into_iter().enumerate(), + plan.children().iter().enumerate(), plan.required_input_ordering().iter(), plan.required_input_distribution().iter() ) { let child_eq_props = child.equivalence_properties(); if let Some(sort_req) = sort_req { - // The `EquivalenceProperties::ordering_satisfy_requirement` compares the oeq_class - // orderings, minus their constants, to the requirement. - // - // For the UnionExec, it has the oeq_class orderings from it's children but does not - // have the same constants. As such, the sort requirements cannot be fulfilled - // without examination of the union's children with both the orderings & constants. - let children = match child.as_any().downcast_ref::() { - Some(union) => union.children(), - _ => vec![child], - }; - for child in children { - let child_eq_props = child.equivalence_properties(); - if !child_eq_props.ordering_satisfy_requirement(sort_req) { - let plan_str = get_plan_string(&plan); - return plan_err!( - "Plan: {:?} does not satisfy order requirements: {:?}. Child-{} order: {:?}", - plan_str, - sort_req, - idx, - child_eq_props.oeq_class - ); - } + if !child_eq_props.ordering_satisfy_requirement(sort_req) { + let plan_str = get_plan_string(&plan); + return plan_err!( + "Plan: {:?} does not satisfy order requirements: {:?}. Child-{} order: {:?}", + plan_str, + sort_req, + idx, + child_eq_props.oeq_class + ); } } From 0f317a64a748c5f08cb2041a73031c54720f3dae Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 10 Sep 2024 12:52:56 -0700 Subject: [PATCH 05/10] chore: update across_partitions docs to match actual usage --- datafusion/physical-expr/src/equivalence/class.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 0296b7a247d6..3fc7b0feb8a2 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -41,7 +41,8 @@ use datafusion_common::JoinType; /// /// - `across_partitions`: A boolean flag indicating whether the constant expression is /// valid across partitions. If set to `true`, the constant expression has same value for all partitions. -/// If set to `false`, the constant expression may have different values for different partitions. +/// If set to `false`, the constant expression may have different constant values for different partitions +/// or only be constant within one of the partitions. /// /// # Example /// From 4bd4db0ee39824427d0366a978e7888299bd0453 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 10 Sep 2024 14:43:54 -0700 Subject: [PATCH 06/10] fix: add constants from either side to the UnionExec constants caveat: this has an unintended side effect, as the EnforceSorting removes the sort_expr from one input/side of the UnionExec (where it's not constant) --- .../physical-expr/src/equivalence/class.rs | 32 +++++++++++++++++++ .../src/equivalence/properties.rs | 16 ++-------- datafusion/sqllogictest/test_files/order.slt | 4 +-- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 3fc7b0feb8a2..066e9077361f 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +use std::hash::{Hash, Hasher}; +use std::ops::RangeFull; use std::sync::Arc; use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; @@ -27,6 +29,7 @@ use crate::{ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; +use indexmap::IndexSet; #[derive(Debug, Clone)] /// A structure representing a expression known to be constant in a physical execution plan. @@ -124,6 +127,35 @@ pub fn const_exprs_contains( .any(|const_expr| const_expr.expr.eq(expr)) } +impl Eq for ConstExpr {} + +impl PartialEq for ConstExpr { + fn eq(&self, other: &Self) -> bool { + self.expr.eq(other.expr()) + } +} + +impl Hash for ConstExpr { + fn hash(&self, state: &mut H) { + self.expr.hash(state); + } +} + +/// Concats two slices of `const_exprs, removing duplicates and +/// maintaining the order. +/// +/// Equality based upon the expression. `across_partitions` will +/// always be false as we do not validate the same constant value +/// on both sides. +pub fn concat_const_exprs(lhs: &[ConstExpr], rhs: &[ConstExpr]) -> Vec { + IndexSet::<&ConstExpr>::from_iter(lhs.iter().chain(rhs.iter())) + .drain(RangeFull) + .map(|constant_expr| { + ConstExpr::new(Arc::clone(&constant_expr.expr)).with_across_partitions(false) + }) + .collect() +} + /// An `EquivalenceClass` is a set of [`Arc`]s that are known /// to have the same value for all tuples in a relation. These are generated by /// equality predicates (e.g. `a = b`), typically equi-join conditions and diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index a5d54ee56cff..d53342bcf40a 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -18,6 +18,7 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; +use super::class::concat_const_exprs; use super::ordering::collapse_lex_ordering; use crate::equivalence::class::const_exprs_contains; use crate::equivalence::{ @@ -1539,19 +1540,8 @@ fn calculate_union_binary( } // First, calculate valid constants for the union. A quantity is constant - // after the union if it is constant in both sides. - let constants = lhs - .constants() - .iter() - .filter(|const_expr| const_exprs_contains(rhs.constants(), const_expr.expr())) - .map(|const_expr| { - // TODO: When both sides' constants are valid across partitions, - // the union's constant should also be valid if values are - // the same. However, we do not have the capability to - // check this yet. - ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false) - }) - .collect(); + // after the union if it is constant on one of the sides. + let constants = concat_const_exprs(lhs.constants(), rhs.constants()); // Next, calculate valid orderings for the union by searching for prefixes // in both sides. diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 671c18674acc..42b55427fe26 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1220,10 +1220,10 @@ physical_plan 01)ProjectionExec: expr=[b@0 as b, c@1 as c, a@2 as a, a0@3 as a0] 02)--SortPreservingMergeExec: [d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], fetch=2 03)----UnionExec -04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] +04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] 05)--------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d] 06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true -07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] +07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] 08)--------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d] 09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true From 6dafd1a9f3d444f3bd72887de24bf9b78f94c256 Mon Sep 17 00:00:00 2001 From: wiedld Date: Sat, 14 Sep 2024 11:03:53 -0700 Subject: [PATCH 07/10] Revert "chore: update across_partitions docs to match actual usage" This reverts commit 0f317a64a748c5f08cb2041a73031c54720f3dae. --- datafusion/physical-expr/src/equivalence/class.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 066e9077361f..c0a80b28a374 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -44,8 +44,7 @@ use indexmap::IndexSet; /// /// - `across_partitions`: A boolean flag indicating whether the constant expression is /// valid across partitions. If set to `true`, the constant expression has same value for all partitions. -/// If set to `false`, the constant expression may have different constant values for different partitions -/// or only be constant within one of the partitions. +/// If set to `false`, the constant expression may have different values for different partitions. /// /// # Example /// From 08ac804689d3bf1297d8cc52935f9e15d9e52b27 Mon Sep 17 00:00:00 2001 From: wiedld Date: Sat, 14 Sep 2024 11:04:07 -0700 Subject: [PATCH 08/10] Revert "fix: add constants from either side to the UnionExec constants" This reverts commit 4bd4db0ee39824427d0366a978e7888299bd0453. --- .../physical-expr/src/equivalence/class.rs | 32 ------------------- .../src/equivalence/properties.rs | 16 ++++++++-- datafusion/sqllogictest/test_files/order.slt | 4 +-- 3 files changed, 15 insertions(+), 37 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index c0a80b28a374..0296b7a247d6 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::hash::{Hash, Hasher}; -use std::ops::RangeFull; use std::sync::Arc; use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; @@ -29,7 +27,6 @@ use crate::{ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; -use indexmap::IndexSet; #[derive(Debug, Clone)] /// A structure representing a expression known to be constant in a physical execution plan. @@ -126,35 +123,6 @@ pub fn const_exprs_contains( .any(|const_expr| const_expr.expr.eq(expr)) } -impl Eq for ConstExpr {} - -impl PartialEq for ConstExpr { - fn eq(&self, other: &Self) -> bool { - self.expr.eq(other.expr()) - } -} - -impl Hash for ConstExpr { - fn hash(&self, state: &mut H) { - self.expr.hash(state); - } -} - -/// Concats two slices of `const_exprs, removing duplicates and -/// maintaining the order. -/// -/// Equality based upon the expression. `across_partitions` will -/// always be false as we do not validate the same constant value -/// on both sides. -pub fn concat_const_exprs(lhs: &[ConstExpr], rhs: &[ConstExpr]) -> Vec { - IndexSet::<&ConstExpr>::from_iter(lhs.iter().chain(rhs.iter())) - .drain(RangeFull) - .map(|constant_expr| { - ConstExpr::new(Arc::clone(&constant_expr.expr)).with_across_partitions(false) - }) - .collect() -} - /// An `EquivalenceClass` is a set of [`Arc`]s that are known /// to have the same value for all tuples in a relation. These are generated by /// equality predicates (e.g. `a = b`), typically equi-join conditions and diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index d53342bcf40a..a5d54ee56cff 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -18,7 +18,6 @@ use std::hash::{Hash, Hasher}; use std::sync::Arc; -use super::class::concat_const_exprs; use super::ordering::collapse_lex_ordering; use crate::equivalence::class::const_exprs_contains; use crate::equivalence::{ @@ -1540,8 +1539,19 @@ fn calculate_union_binary( } // First, calculate valid constants for the union. A quantity is constant - // after the union if it is constant on one of the sides. - let constants = concat_const_exprs(lhs.constants(), rhs.constants()); + // after the union if it is constant in both sides. + let constants = lhs + .constants() + .iter() + .filter(|const_expr| const_exprs_contains(rhs.constants(), const_expr.expr())) + .map(|const_expr| { + // TODO: When both sides' constants are valid across partitions, + // the union's constant should also be valid if values are + // the same. However, we do not have the capability to + // check this yet. + ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false) + }) + .collect(); // Next, calculate valid orderings for the union by searching for prefixes // in both sides. diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 42b55427fe26..671c18674acc 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1220,10 +1220,10 @@ physical_plan 01)ProjectionExec: expr=[b@0 as b, c@1 as c, a@2 as a, a0@3 as a0] 02)--SortPreservingMergeExec: [d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], fetch=2 03)----UnionExec -04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] +04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] 05)--------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d] 06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true -07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] +07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] 08)--------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d] 09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true From ad4a321bebde9d364f5aca5bd6dd4eb7f247a2ff Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 18 Sep 2024 20:50:41 -0700 Subject: [PATCH 09/10] fix: create a new sort order representing the UNIONed output --- .../src/equivalence/properties.rs | 125 ++++++++++++++---- 1 file changed, 102 insertions(+), 23 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index a5d54ee56cff..3d99db028f28 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1524,6 +1524,102 @@ impl Hash for ExprWrapper { } } +/// Take sort orderings for unioned sides of equal length, and return the unioned ordering. +/// +/// Example: +/// child1 = order by a0, b, c +/// child2 = order by a, b, c +/// => union's joint order is a0, a, b, c. +fn calculate_joint_ordering( + lhs: &EquivalenceProperties, + rhs: &EquivalenceProperties, +) -> LexOrdering { + let mut union_ordering = vec![]; + for ordering in lhs + .normalized_oeq_class() + .orderings + .iter() + .chain(rhs.normalized_oeq_class().orderings.iter()) + { + if union_ordering.is_empty() { + union_ordering = ordering.clone(); + continue; + } + + if !union_ordering.len().eq(&ordering.len()) { + break; + } + + let mut unioned = union_ordering.into_iter().peekable(); + let mut curr = ordering.iter().peekable(); + let mut new_union = vec![]; + loop { + match (curr.next(), unioned.next()) { + (None, None) => break, + (None, Some(u)) => { + new_union.push(u.clone()); + continue; + } + (Some(c), None) => { + new_union.push(c.clone()); + continue; + } + (Some(c), Some(u)) => { + if c.eq(&u) { + new_union.push(c.clone()); + continue; + } else if c.expr.eq(&u.expr) { + // options are different => negates each other + continue; + } else { + new_union.push(u.clone()); + new_union.push(c.clone()); + continue; + } + } + } + } + union_ordering = new_union; + } + collapse_lex_ordering(union_ordering) +} + +/// Take sort orderings for unioned sides return the shorten, novel sort order. +/// +/// Example: +/// child1 = order by a, b +/// child2 = order by a1, b1, c1 +/// => union's prefixed order is a, b. +fn calculate_prefix_ordering( + lhs: &EquivalenceProperties, + rhs: &EquivalenceProperties, +) -> Vec { + // Calculate valid orderings for the union by searching for prefixes + // in both sides. + let mut orderings = vec![]; + for mut ordering in lhs.normalized_oeq_class().orderings { + // Progressively shorten the ordering to search for a satisfied prefix: + while !rhs.ordering_satisfy(&ordering) { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + for mut ordering in rhs.normalized_oeq_class().orderings { + // Progressively shorten the ordering to search for a satisfied prefix: + while !lhs.ordering_satisfy(&ordering) { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + orderings +} + /// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties` /// of `lhs` and `rhs` according to the schema of `lhs`. fn calculate_union_binary( @@ -1553,32 +1649,15 @@ fn calculate_union_binary( }) .collect(); - // Next, calculate valid orderings for the union by searching for prefixes - // in both sides. - let mut orderings = vec![]; - for mut ordering in lhs.normalized_oeq_class().orderings { - // Progressively shorten the ordering to search for a satisfied prefix: - while !rhs.ordering_satisfy(&ordering) { - ordering.pop(); - } - // There is a non-trivial satisfied prefix, add it as a valid ordering: - if !ordering.is_empty() { - orderings.push(ordering); - } - } - for mut ordering in rhs.normalized_oeq_class().orderings { - // Progressively shorten the ordering to search for a satisfied prefix: - while !lhs.ordering_satisfy(&ordering) { - ordering.pop(); - } - // There is a non-trivial satisfied prefix, add it as a valid ordering: - if !ordering.is_empty() { - orderings.push(ordering); - } - } + // Create a unioned ordering. + let mut orderings = calculate_prefix_ordering(&lhs, &rhs); + let union_ordering = calculate_joint_ordering(&lhs, &rhs); + orderings.push(union_ordering); + let mut eq_properties = EquivalenceProperties::new(lhs.schema); eq_properties.constants = constants; eq_properties.add_new_orderings(orderings); + Ok(eq_properties) } From 53e0c5649d87a9b0971823de15c06c0432e404ec Mon Sep 17 00:00:00 2001 From: wiedld Date: Wed, 18 Sep 2024 22:09:36 -0700 Subject: [PATCH 10/10] test: update tests to reflect a unioned sort order added to UNION node --- datafusion/physical-expr/src/equivalence/properties.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index 3d99db028f28..24f1f92e7ad4 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -2724,8 +2724,8 @@ mod tests { Arc::clone(&schema3), ), ], - // Expected - vec![vec!["a", "b"]], + // Expected: union sort orders + vec![vec!["a", "b", "c"]], ), // --------- TEST CASE 2 ---------- ( @@ -2799,8 +2799,8 @@ mod tests { Arc::clone(&schema3), ), ], - // Expected - vec![], + // Expected: union sort orders + vec![vec!["a", "b", "c"]], ), // --------- TEST CASE 5 ---------- (