Skip to content

Conversation

@mingmwang
Copy link
Contributor

@mingmwang mingmwang commented Feb 15, 2023

Which issue does this PR close?

Closes #5289.

Rationale for this change

Reimplement the EnforceSorting rule in a Top-Down approach to add/remove Sort when unnecessary
The new implementation does not need to keep the lineage information and record the position SortExec in another tree structure. The Sort removing and adding is driven by the properties and requirements.

What changes are included in this PR?

  • Sort removing, Sort is removed in two cases:
  1. The Sort is redundant, the input of the Sort already has a finer ordering than this Sort enforces.
  2. The Sort does not impact the final result ordering, some operators like RepartitionExec can not maintain the input ordering so that the Sort in its descendants can be removed
  • Sort adding, Sort is added to satisfy the ordering requirements
    Ordering requirements is pushed down and propagated from the top node to its children and descendants. The basic process is:
  1. The parent requirements are already satisfied, do not add Sort, generate new requirements if the current node itself has sort requirements to its input(required_input_ordering). For UnionExec, also generated new requirements based on UnionExec's output ordering properties to keep the main input ordering semantics correct and trim the unnecessary sort columns.
  2. The parent requirements are not satisfied and can not push down, add Sort.
  3. The parent requirements are not satisfied, the current node itself does not have its own sort requirements to its input, push down the sort requirements.
  4. The parent requirements are not satisfied, the current node(like SortMergeJoinExec, WindowAggExec, SortPreservingMergeExec, etc) itself has its own sort requirements to its input. Check the compatibility of the parent requirements with its own sort requirements.
    a) If the required input ordering is more specific, do not push down the parent requirements, keep everything unchanged.
    b) If the the parent requirements are more specific, push down the parent requirements
    c) If they are not compatible, add Sort, generate new requirements from required input ordering.
  • Global Sort optimization
    This is achieved by the combination of GlobalSortSelection, EnforceDistribution and EnforceSorting.

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate physical-expr Changes to the physical-expr crates sql SQL Planner sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate labels Feb 15, 2023
@mingmwang
Copy link
Contributor Author

@mustafasrepo @ozankabak @yahoNanJing
Please help to take a look.

@ozankabak
Copy link
Contributor

Thank you, we will digest this in the next several days and leave comments as we make progress.

@ozankabak
Copy link
Contributor

ozankabak commented Feb 16, 2023

The PR is very large in scope. It changes parts of the old code (and certainly makes some changes to its tests), and also adds new code (and new tests). It would be much easier to review this if it were broken down to two PRs, where the first one only replicates the current functionality, has no functionality regressions, and does not change any tests at all; with the second PR adding new functionality. Right now, the new rule is significantly longer than the old rule (which is bad), but it offers more functionality (which is great). So is switching from bottom-up to top-down a good change or a bad change? We can't tell easily.

Now, let me share my (very) preliminary impression so far after a cursory look: I see that it has better handling of sort preserving merges, smarter push-down of sorts under unions, and adds support for sort merge joins. These are the good bits. The cons are that it seems to lose partition awareness (though I'm not sure about this yet) and it seems to regress on some cases where it was doing better before. I think at least some of these are due to the presumption that there is a global output ordering to preserve, and I am not sure I agree with that.

Anyway, we will disentangle and review in detail, but I want to give you a heads up that this will take some time. We will need to analyze every case carefully, go back to the old version of the code (and tests), compare and contrast etc. Before we form an idea on the merits of bottom-up vs. top-down, our goal will be to create two functionally equal implementations passing exactly the same test suite. Without that, it is not possible to objectively decide.

Whatever the result on bottom-up vs. top-down is, I think this exercise will end up making the rule better, so that's great 🚀 I will keep you posted as we make progress in the upcoming days.

@mingmwang
Copy link
Contributor Author

mingmwang commented Feb 16, 2023

The PR is very large in scope. It changes parts of the old code (and certainly makes some changes to its tests), and also adds new code (and new tests). It would be much easier to review this if it were broken down to two PRs, where the first one only replicates the current functionality, has no functionality regressions, and does not change any tests at all; with the second PR adding new functionality. Right now, the new rule is significantly longer than the old rule (which is bad), but it offers more functionality (which is great). So is switching from bottom-up to top-down a good change or a bad change? We can't tell easily.

Now, let me share my (very) preliminary impression so far after a cursory look: I see that it has better handling of sort preserving merges, smarter push-down of sorts under unions, and adds support for sort merge joins. These are the good bits. The cons are that it seems to lose partition awareness (though I'm not sure about this yet) and it seems to regress on some cases where it was doing better before. I think at least some of these are due to the presumption that there is a global output ordering to preserve, and I am not sure I agree with that.

Anyway, we will disentangle and review in detail, but I want to give you a heads up that this will take some time. We will need to analyze every case carefully, go back to the old version of the code (and tests), compare and contrast etc. Before we form an idea on the merits of bottom-up vs. top-down, our goal will be to create two functionally equal implementations passing exactly the same test suite. Without that, it is not possible to objectively decide.

Whatever the result on bottom-up vs. top-down is, I think this exercise will end up making the rule better, so that's great 🚀 I will keep you posted as we make progress in the upcoming days.

Sure, please take your time and I will add more comments to the code to explain the rule process.
The new rule looks significantly longer than the original one is because of handling the propagating of sort requirements down. But I think the sort removing/adding procedure is very clear and the property/requirement driven framework is more powerful. It can effectively handling below case and figure out an optimal plan without adding Sort and then removing Sort back and forth.

Required('a', 'b', 'c')
   Required('a', 'b')
      Required('a')

We can leverage the same framework to do more advanced optimizations like re-ordering(PostgreSQL has this optimization) the multiple window execs in the plan tree and further reduce the number of Sorts. Generally I think the Top-Down based approach is more easy and straightforward to collect and propagate necessary properties and find the global optimal plan.

Required/Order by ('a', 'b', 'c')
   WindowExec1 Required('a', 'b', 'c')
      WindowExec2 Required('x', 'y', 'z')
          Order by ('x', 'y', 'z')

Some UT results are changed. Yes, I think the major arguing point is whether we should preserve output ordering during optimization process or we can trim the unnecessary sort columns.
As I know, SparkSQL preserves the output ordering(SparkSQL does not do very sophisticated sort optimizations), PostgreSQL sometimes preserves the output ordering but sometime not(I guess this is decided by the top/parent operators, if they are ordering sensitive, but I'm not sure).
For DataFusion, my preference is since we alway define the maintains_input_order() method for physical plan nodes, if it is true, we should preserve output ordering and should not trim or reverse output ordering, otherwise maintains_input_order() is meaningless and very confusing.

There are some other UT result changes, I am not sure whether they are due to the original bug or the new rule introduced regression, need to double confirm with you and check carefully, especially this one test_window_agg_complex_plan.

@mingmwang
Copy link
Contributor Author

Regarding the WindowExec/Window expression reverse, do we support the reverse for all the built-in window functions?
For example for ROW_NUMBER() OVER , I think we should not allow the reverse, but I do not find a place to check or defining the allowed function list.

@ozankabak
Copy link
Contributor

Not all built-in window functions are reversible. There is an indicator in the API called get_reverse_expr in the WindowExpr trait, which returns None if there is no equivalent reverse. For built-ins, this function calls reverse_expr, whose default value is None. Functions like LEAD and LAG override this to indicate reversibility, but ROW_NUMBER doesn't.

@mingmwang
Copy link
Contributor Author

Some future work:

  • Support reordering multiple window expressions.
  • required_input_ordering() should remove the duplicate sort keys and remove the equal columns in the same EquivalenceProperties
  • Introducing FD(functional dependencies) to further avoid the unnecessary Sorts and Repartitions

@ozankabak
Copy link
Contributor

We started the work to get the two approaches to a comparable state, @mustafasrepo is actively working on it. We will post more updates as we make progress.

Comment on lines 430 to 435
if prop.sort_options.is_some() {
PhysicalSortExpr {
expr: prop.expr.clone(),
options: prop.sort_options.unwrap(),
}
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use If let Some(sort_options) = prop.sort_options idiom here. This would remove .unwrap()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

let impact_result_ordering = plan.output_ordering().is_some()
|| plan.output_partitioning().partition_count() <= 1
|| plan.as_any().downcast_ref::<GlobalLimitExec>().is_some()
|| plan.as_any().downcast_ref::<LocalLimitExec>().is_some();
Copy link
Contributor

@mustafasrepo mustafasrepo Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Patterns in the form: plan.as_any().downcast_ref::<ExecName>().is_some() can be converted to plan.as_any().is::<ExecName>().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

Comment on lines 151 to 159
.plan
.as_any()
.downcast_ref::<GlobalLimitExec>()
.is_some()
|| self
.plan
.as_any()
.downcast_ref::<LocalLimitExec>()
.is_some()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment above applies here. Also you can construct a util function is_limit to check for Exec is GlobalLimitExec or LocalLimitExec. This util can be used for the check at the PlanWithSortRequirements::init.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines 269 to 270
// SortPreservingMergeExec + SortExec(local/global) is the same as the global SortExec
// Remove unnecessary SortPreservingMergeExec + SortExec(local/global)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of SortPreservingMergeExec + SortExec(local/global), you can use SortExec(local/global) -> SortPreservingMergeExec. Second reflects hierarchy better. I think.

Comment on lines 315 to 320
} else if plan.as_any().downcast_ref::<WindowAggExec>().is_some()
|| plan
.as_any()
.downcast_ref::<BoundedWindowAggExec>()
.is_some()
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can write a util function is_window to check for whether executor is WindowAggExec or BoundedWindowAggExec. There are couple of place this check is done. I think it will be handy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Comment on lines 791 to 796
let flags = window_plan
.children()
.into_iter()
.map(|child| {
// If the child is leaf node, check the output ordering
if child.children().is_empty()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this state we know that executor is window. It has exactly one child. Hence I think we can remove this iteration. We can just use body of map, where child is window_plan.children()[0].clone().

Comment on lines 819 to 827
if requirements_compatible(
top_requirement,
child.required_input_ordering()[0].as_deref(),
|| child.equivalence_properties(),
) || requirements_compatible(
child.required_input_ordering()[0].as_deref(),
top_requirement,
|| child.equivalence_properties(),
) || requirements_compatible(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pattern is used a lot. I think you can construct a util to check whether one of the requirements satisfy other (where function does this check). That would be better I think.

Comment on lines 906 to 908
fn extract_window_info_from_plan(
plan: &Arc<dyn ExecutionPlan>,
) -> Option<WindowExecInfo> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function can return Result<WindowExecInfo>. This change would remove some the unwraps.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

Comment on lines 74 to 76
if requirement.sort_options.is_some() {
self.options == requirement.sort_options.unwrap()
&& self.expr.eq(&requirement.expr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use If let Some() idiom here. this would remove .unwrap() in the body.

@mustafasrepo
Copy link
Contributor

@mingmwang I went through this PR in detail. Thanks for the great work. Let me summarize my observations.

The Top-Down approach is better at pushing down SortExec s if it is helpful. However, the Bottom-up approach is better at producing pipeline-friendly plans when it is possible.

Since some tests improve and some regress with this change. I have constructed a unified test bench for these rules (I use union of the tests for SortEnforcement and TopDownSortEnforcement). The test bench can be found in the PR. PR body also includes the performance comparison of both rules on the unified test bench. I think we can reach to a rule where all tests in the test bench pass. I will try to do so myself. I encourage you to try it also.

In the meantime, This PR has nice changes that do not need to wait for final rule version. Specifically, printing global flag in SortExec and api change
from fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>>
to fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirements>>> don't need to wait. I think you can file another PR with these changes.

@mingmwang
Copy link
Contributor Author

@mingmwang I went through this PR in detail. Thanks for the great work. Let me summarize my observations.

The Top-Down approach is better at pushing down SortExec s if it is helpful. However, the Bottom-up approach is better at producing pipeline-friendly plans when it is possible.

Since some tests improve and some regress with this change. I have constructed a unified test bench for these rules (I use union of the tests for SortEnforcement and TopDownSortEnforcement). The test bench can be found in the PR. PR body also includes the performance comparison of both rules on the unified test bench. I think we can reach to a rule where all tests in the test bench pass. I will try to do so myself. I encourage you to try it also.

In the meantime, This PR has nice changes that do not need to wait for final rule version. Specifically, printing global flag in SortExec and api change from fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> to fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirements>>> don't need to wait. I think you can file another PR with these changes.

Thanks a lot, I will take a closer look at your comparing result tomorrow. I guess most of the failure case in Top-Down rule is because the current implementation try to keep the final output ordering and will not remove all the SortExec very aggressively. I will also resolve all the review comments tomorrow. Regarding split the PR, I do want to split it into two, because in this PR actually besides the new rule implementation and the added new tests, other changes are very tiny.

@mingmwang
Copy link
Contributor Author

Test name TopDown Rule Bottom Up Rule Desc
test_remove_unnecessary_sort4 PASS FAIL Two Down result is better, removed unnecessary SortPreservingMergeExec
test_remove_unnecessary_sort6 PASS FAIL
test_union_inputs_different_sorted2 PASS FAIL Actually the results are the same, just the plan shape is different
test_union_inputs_different_sorted4 PASS FAIL Same as above
test_union_inputs_different_sorted6 PASS FAIL Two Down result is better
test_sort_merge_join_order_by_left PASS FAIL
test_sort_merge_join_order_by_right PASS FAIL
test_not_remove_top_sort_window_multilayer FAIL PASS Two Down rule does not remove the Sort due to preserve ordering(maintains_input_order)
test_multiple_sort_window_exec PASS FAIL
test_window_multi_path_sort FAIL PASS Two Down rule does not remove the Sort due to preserve ordering(maintains_input_order)
test_multilayer_coalesce_partitions FAIL PASS In the Top down rule, global Sort optimization is achieved by the combination of GlobalSortSelection, EnforceDistribution and EnforceSorting now
test_coalesce_propagate FAIL PASS Same as above
test_commutativity FAIL PASS Same as above

@mingmwang
Copy link
Contributor Author

@mustafasrepo @ozankabak
After a closer look at the test comparing result, I think the differences mainly relate to preserve ordering(maintains_input_order). I had highlighted the two test cases.
From my point of view, the rules should not remove those Sorts, the rules should respect the maintains_input_order() and decide what sorts can be removed or not, even sometimes those Sort are better than necessary.

And In the TopDown rule implementation, I can add another configuration flag to enable removing Sorts more aggressively and achieve the same results as the the Bottom-up rule, but we still need to define clearly in which cases Sort can be removed.

@ozankabak
Copy link
Contributor

And In the TopDown rule implementation, I can add another configuration flag to enable removing Sorts more aggressively and achieve the same results as the the Bottom-up rule, but we still need to define clearly in which cases Sort can be removed.

This would be great, thank you. We will perform one more pass of analysis with that and get clarity on what else is left (if any).

Comment on lines 2134 to 2135
"SortPreservingMergeExec: [c1@0 ASC NULLS LAST]",
" SortExec: expr=[c1@0 ASC NULLS LAST], global=false",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that final SortPreservingMergeExec in the plan is redundant. Its input has already single partition. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with you. It is redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still some optimization room in the TopDown rule regarding the window expr ordering and global ordering.

@ozankabak
Copy link
Contributor

FYI @mingmwang, we created an implementation sketch combining bottom-up and top-down approaches, which passes the whole (unified) test set. You can find it here. We created this so that it can give us a baseline/goal in our quest to achieve full functionality with the simplest design possible.

When you add the global ordering/window expr ordering functionality, we will do one more analysis and comparison with that baseline and give more feedback to you.

@mingmwang
Copy link
Contributor Author

FYI @mingmwang, we created an implementation sketch combining bottom-up and top-down approaches, which passes the whole (unified) test set. You can find it here. We created this so that it can give us a baseline/goal in our quest to achieve full functionality with the simplest design possible.

When you add the global ordering/window expr ordering functionality, we will do one more analysis and comparison with that baseline and give more feedback to you.

Sure, I will take a closer look today and tomorrow.

@alamb
Copy link
Contributor

alamb commented Mar 4, 2023

What is the status of this PR (it looks like it may be waiting to address some more feedback). Shall we mark it as "draft" as we work on it (I am trying to make sure all PRs marked as "ready for review" are actually waiting on review)

@ozankabak
Copy link
Contributor

What is the status of this PR (it looks like it may be waiting to address some more feedback). Shall we mark it as "draft" as we work on it (I am trying to make sure all PRs marked as "ready for review" are actually waiting on review)

Sounds good to me, we will let you know when we converge

@alamb alamb marked this pull request as draft March 4, 2023 21:38
@ozankabak
Copy link
Contributor

ozankabak commented Mar 11, 2023

@mingmwang, it seems you are busy these days. I think it might be a good idea to create a PR to get the new/extended test suite and a base (passing) implementation in.

Concurrently, we can continue collaborating on this PR to arrive at a more elegant design that passes the new/extended test suite and replace the base implementation. @mustafasrepo worked out the base impl and test suite already, so it will be easy for us to clean it up and get it in quickly. Sounds good?

@mingmwang
Copy link
Contributor Author

Just close this PR.

@mingmwang mingmwang closed this Apr 11, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-expr Changes to the physical-expr crates sql SQL Planner sqllogictest SQL Logic Tests (.slt) substrait Changes to the substrait crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

TopDown EnforceSorting implementation

4 participants