Conversation

@mustafasrepo
Contributor

Which issue does this PR close?

N/A

Rationale for this change

As some of you may know, we have been (and still are) collaborating with @mingmwang on a top-down refactoring of the EnforceSorting rule. During our quest to find the simplest implementation that still offers full functionality, we observed that bottom-up and top-down approaches have their pros and cons. A bottom-up approach is better at avoiding pipeline breakages, while a top-down approach results in simpler code and can perform certain push-down operations its bottom-up sibling cannot.

It is still not entirely clear whether we can achieve full functionality in a pure top-down approach, but in the interim, we can combine the two approaches to offer full functionality AND implement a test suite to guide our future efforts.

What changes are included in this PR?

This PR implements a hybrid top-down/bottom-up sort optimization rule by combining the existing bottom-up approach with the top-down approach implemented by @mingmwang. With this hybrid rule, all test plans either improve or stay the same. We are also checking in a much larger test suite that verifies a wide range of improvements, such as union pushdowns. We think the new test suite will serve as a very useful baseline for future improvements to the EnforceSorting rule (including our ongoing collaboration on determining whether a pure top-down approach is possible).
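
For context, here is a minimal conceptual sketch of the hybrid structure; the helper functions are hypothetical stand-ins for the actual bottom-up and top-down passes in sort_enforcement.rs and sort_pushdown.rs, not the real function names:

use std::sync::Arc;
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

// Placeholder for the bottom-up pass (adds/removes sorts so that every
// ordering requirement in the plan is satisfied).
fn ensure_sorting_bottom_up(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    Ok(plan)
}

// Placeholder for the top-down pass (pushes the remaining sorts toward the
// leaves when doing so lets us sort smaller intermediate results).
fn push_down_sorts_top_down(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    Ok(plan)
}

// The hybrid rule: first run the bottom-up pass, then the top-down pass.
fn hybrid_sort_optimization(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    let plan = ensure_sorting_bottom_up(plan)?;
    push_down_sorts_top_down(plan)
}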

Are these changes tested?

New unit tests are added. Existing tests also pass.

Are there any user-facing changes?

No.

@ozankabak
Contributor

ozankabak commented Mar 28, 2023

No worries, take your time. I will send the type aliases in a follow-on PR. @mustafasrepo, can you make a new PR for (1)? Let's get that merged and this PR will be (2) -- and decrease in size.

We will also try the strategy you suggested for the upcoming feature PRs.

@mustafasrepo
Contributor Author

I suggest we split this PR into two

  1. The changes to the type signature of ExecutionPlan, which, while large, are mostly mechanical
  2. The algorithm changes to datafusion/core/src/physical_optimizer/sort_enforcement.rs and datafusion/core/src/physical_optimizer/sort_pushdown.rs

I think we can get 1 in sooner -- I think after @mingmwang 's review of the algorithm we'll be ready for 2.

I have moved the changes related to the signature of ExecutionPlan to another PR. The new PR can be found in #5772. Thanks @alamb for the suggestion.

@mingmwang
Contributor

@mustafasrepo
Could you please add the global sort flag back in the plan Display? I think it is useful.

@mustafasrepo
Contributor Author

@mustafasrepo Could you please add the global sort flag back in the plan Display? I think it is useful.

Since existing tests also change with this modification, it increases the diff a bit. If it is OK with you, I can file another PR for this change, like we did in #5772. I guess it will be merged faster that way as well. What do you think?

@ozankabak
Contributor

ozankabak commented Mar 29, 2023

@mustafasrepo Could you please add the global sort flag back in the plan Display? I think it is useful.

Yes, let's do it in a follow-on PR after this, making only that change (with no other test changes) for ease of review and to avoid simple mistakes.

@mingmwang
Contributor

@mustafasrepo Could you please add the global sort flag back in the plan Display? I think it is useful.

Yes, let's do it in a follow-on PR after this, making only that change (with no other test changes) for ease of review and to avoid simple mistakes.

Agree.

# Conflicts:
#	datafusion/core/src/physical_optimizer/sort_enforcement.rs
#	datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
#	datafusion/core/src/physical_plan/planner.rs
#	datafusion/core/tests/window_fuzz.rs
#	datafusion/physical-expr/src/sort_expr.rs
#	datafusion/physical-expr/src/utils.rs
@ozankabak
Contributor

@alamb or @mingmwang: Given that the API PR has merged, let us know when you are comfortable going ahead with this. After this merges, we will create two follow-ons: (1) fixing the type aliases, (2) printing global sorting information.

@alamb
Contributor

alamb commented Apr 1, 2023

Thanks for the ping @ozankabak -- I will try and find time to review this PR carefully tomorrow

Contributor

@alamb alamb left a comment

I went through the plan changes carefully -- they look reasonable to me. It feels like it would be good to make a physical "analyzer" pass (analogous to what is being done for logical plans) to separate "update the plan to be correct" from "make the plan faster".

Thank you for sticking with this one @mustafasrepo -- it looks like a real improvement to me (and the new tests are 👌 )

fn maintains_input_order(&self) -> Vec<bool> {
match self.join_type {
JoinType::Inner => vec![true, true],
JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![true, false],
Contributor

The false here is because NULLs can be injected into the stream arbitrarily, messing up the sort order, right?

Contributor Author

Exactly.

Contributor

👍

return None;
}
let is_spm = is_sort_preserving_merge(plan);
let is_union = plan.as_any().is::<UnionExec>();
Contributor

I like the idea of separating the "enforce sort requirements" step from the "optimize sort requirements" step, which I think this PR improves.

"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" SortExec: expr=[nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" UnionExec",
Contributor

This plan seems to have gone "in reverse" (as in the test says that the sort should be removed) -- is that ok?

Contributor

Yes, all looks good here. We move sorts down whenever doing so can result in sorting smaller datasets. In this case, having the sort above UnionExec or below doesn't matter in that regard, and the default tendency is to push sorts down in such cases.
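
As an illustrative sketch of the general push-down direction described here (not taken from the actual test file), a plan like

"SortPreservingMergeExec: [nullable_col@0 ASC]",
"  SortExec: expr=[nullable_col@0 ASC]",
"    UnionExec",
"      ParquetExec: ...",
"      ParquetExec: ...",

can be rewritten so that each (smaller) union input is sorted separately and the sorted streams are merged above the union:

"SortPreservingMergeExec: [nullable_col@0 ASC]",
"  UnionExec",
"    SortExec: expr=[nullable_col@0 ASC]",
"      ParquetExec: ...",
"    SortExec: expr=[nullable_col@0 ASC]",
"      ParquetExec: ...",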

let physical_plan =
bounded_window_exec("non_nullable_col", sort_exprs1, window_agg2);

let expected_input = vec![
Contributor

All of these tests must have taken a substantial amount of time to write and to keep the expected output updated.

One thing we started doing in IOx was to use https://insta.rs/ (where basically you can run cargo insta review and the tool will update the inlined expected output for you). I don't have time to lead another refactoring project now (I am still trying to finish up sqllogictests), but using insta might help accelerate velocity.
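
For reference, a minimal sketch of what an insta-based plan test might look like; the optimized_plan_string helper is hypothetical, standing in for however a test renders its optimized plan to text:

#[test]
fn union_sort_pushdown_snapshot() {
    let plan_text = optimized_plan_string();
    // Running `cargo insta review` lets you inspect and accept snapshot
    // changes instead of hand-editing the expected plan strings.
    insta::assert_snapshot!(plan_text);
}

// Hypothetical helper: build a physical plan, run the EnforceSorting rule,
// and render the optimized plan as a displayable string.
fn optimized_plan_string() -> String {
    String::new()
}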

Contributor

Thanks for the pointer, we will take a look at it 👍

@mingmwang
Contributor

mingmwang commented Apr 2, 2023

@mustafasrepo @ozankabak
My original plan was to try to leverage the top-down process to achieve two goals:

  • Simplify the optimization process, making it easy to understand
  • Do all the sort optimizations in one pass

But when I started the implementation and testing, I realized that it is quite hard to achieve the two goals at the same time. I think the new implementation is a hybrid of the bottom-up and top-down approaches, but it is more complex.
I accept that in the current phase we combine the two approaches to offer full functionality. But in the future I think we should still pursue the goal of offering full functionality in a pure top-down approach.

@ozankabak
Contributor

But in the future I think we should still pursue the goal of offering full functionality in a pure top-down approach.

Agreed, let's keep on collaborating to figure out how we can accomplish that. Now that we have extensive tests, it should be easier to try our hand at doing that.

Comment on lines 77 to 83

/// Checks whether the given executor is a limit;
/// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`].
pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
}

Contributor

I would suggest avoiding the word executor; maybe you can use input plan or input operator instead.
Executor usually refers to different things: for example, in Flink it is the TaskManager, and in Spark or Ballista it is the Worker that runs the tasks (physical plan fragments).

@mingmwang
Contributor

Overall, this PR LGTM.
@alamb @ozankabak Could you please leave it for one more day before merging this?

@ozankabak
Contributor

Sure! Thanks for reviewing.

Comment on lines +360 to +368
/// This function maps a requirement defined on the output of a ProjectionExec
/// back to a requirement on the executor that is its input.
// Specifically, `ProjectionExec` changes the indices of `Column`s in the schema of its input executor.
// This function rewrites a requirement given in terms of the ProjectionExec's schema into a requirement
// in terms of the schema of the ProjectionExec's input executor.
// For instance, Column{"a", 0} would turn into Column{"a", 1}. Please note that this function assumes
// that Column names are unique. If we have a requirement such as Column{"a", 0}, Column{"a", 1},
// this function will produce an incorrect result (it will emit only a single Column).
pub fn map_columns_before_projection(
Contributor

Nice comment !
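
As a concrete (hypothetical) illustration of the index remapping this comment describes, assuming Column::new(name, index) from the physical-expr crate and made-up schemas:

use datafusion::physical_plan::expressions::Column;

// ProjectionExec input schema:  [b, a, c] -> "a" lives at index 1
// ProjectionExec output schema: [a, c]    -> "a" lives at index 0
fn example_mapping() -> (Column, Column) {
    // A requirement expressed against the projection's output schema ...
    let requirement_on_output = Column::new("a", 0);
    // ... maps back to this column in the projection's input schema.
    let requirement_on_input = Column::new("a", 1);
    (requirement_on_output, requirement_on_input)
}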

Ok(VisitRecursion::Continue)
}

fn map_children<F>(mut self, transform: F) -> Result<Self>
Contributor

The original self in map_children is immutable; why was it changed to be mutable here?

fn map_children<F>(self, transform: F) -> Result<Self>

Contributor

Using a mut binding enables us to reuse the same object with a simple attribute assignment without creating a new object.
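
For what it's worth, a minimal self-contained sketch of the pattern (the struct and field names are illustrative, not the actual SortPushDown definition):

// Illustrative tree node with children of its own type.
struct Node {
    children: Vec<Node>,
    // ... other fields carried along unchanged
}

impl Node {
    fn map_children<F>(mut self, transform: F) -> Result<Self, String>
    where
        F: FnMut(Node) -> Result<Node, String>,
    {
        // Taking `mut self` lets us reassign a single field and return the
        // same object, instead of destructuring and rebuilding a new one.
        self.children = std::mem::take(&mut self.children)
            .into_iter()
            .map(transform)
            .collect::<Result<Vec<_>, String>>()?;
        Ok(self)
    }
}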

Comment on lines 141 to 155
// Since new_plan is a SortExec, we can safely get the 0th index.
let child = &new_plan.children()[0];
if let Some(adjusted) =
pushdown_requirement_to_children(child, required_ordering.as_deref())?
{
// Can push down requirements
Ok(Transformed::Yes(SortPushDown {
plan: child.clone(),
required_ordering,
adjusted_request_ordering: adjusted,
}))
} else {
// Can not push down requirements
Ok(Transformed::Yes(SortPushDown::init(new_plan)))
}
Contributor

I have one question about this logic: since the parent requirements are already satisfied by modifying the SortExec, why do you need to push down the requirements here?

Contributor Author

Actually, you are right. No need for it. Changed it to None.

{
// If the current plan is a leaf node or can not maintain any of the input ordering, can not pushed down requirements.
// For RepartitionExec, we always choose to not push down the sort requirements even the RepartitionExec(input_partition=1) could maintain input ordering.
// For RepartitionExec, we always choose to not push down the sort requirements even the RepartitionExec(input_partition=1) could maintain input ordering.
Contributor

Duplicate comment here.

// For RepartitionExec, we always choose to not push down the sort requirements even the RepartitionExec(input_partition=1) could maintain input ordering

plan: &Arc<dyn ExecutionPlan>,
parent_required: Option<&[PhysicalSortRequirement]>,
) -> Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>> {
const ERR_MSG: &str = "Expects parent requirement to contain something";
Contributor

This structure is not easy to read and understand. I think we may need some future refactoring on this.

Result<Option<Vec<Option<Vec<PhysicalSortRequirement>>>>>

Contributor

This will be somewhat simplified when we use the type aliases discussed above. But I agree that we will probably simplify/refactor this code over time.
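
A hedged sketch of the kind of aliases being discussed (names are illustrative, not necessarily what ends up being merged; assumes PhysicalSortRequirement is in scope):

// One list of sort requirements.
type SortRequirements = Vec<PhysicalSortRequirement>;
// One optional requirement list per child of a plan node.
type ChildrenRequirements = Vec<Option<SortRequirements>>;

// The return type above then reads as:
// Result<Option<ChildrenRequirements>>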

@mingmwang
Contributor

@ozankabak @alamb
Please go ahead and merge this PR.

@ozankabak
Contributor

@alamb, we addressed the reviews and this is good to go from our perspective.

@alamb
Contributor

alamb commented Apr 4, 2023

Thanks again everyone!
