Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimizer Sanity Checker #24

Conversation

yfy-
Copy link

@yfy- yfy- commented Jun 9, 2024

Rationale for this change

We extend the existing PipelineChecker with additional checks of order and distribution requirements. In a physical plan each plan should be pipeline friendly (previously checked by PipelineChecker) and each child plan's output order and partitioning should satisfy the parent's respective requirements. We combine these checks into the SanityCheckPlan proposed with this PR.

With the SanityCheckPlan, as the optimizer steps change and grow, we ensure that the order and distribution requirements of the final phsyical plan are always satisfied so that it can yield correct results.

What changes are included in this PR?

  1. The SanityCheckPlan step as the last step of the physical plan optimizations.
  2. SanityCheckPlan contains the former PipelineChecker, that's why PipelineChecker is deleted.

Are these changes tested?

We use the existing test cases of the former PipelineChecker. In addition following cases are added:

Using BoundedWindowAggExec

2 cases: First with the child order requirement is satisfied and another one that is not satisifed.

Using GlobalLimitExec

2 cases: First with the child distribution requirement is satisfied and another one that is not satisifed.

Using LocalLimitExec

We check when there are no requirements at all our check passes.

Using SortMergeJoinExec

3 cases: First case with both children satisify both requirements. Second case, where the second child does not satisfy the order requirement. Finally, a case where the second child does not satisfy distribution requirements.

Only includes sort reqs, docs will be added.
@github-actions github-actions bot added the core label Jun 9, 2024
let child_sort_req = child_sort_req.as_ref().unwrap();
if !child
.equivalence_properties()
.ordering_satisfy_requirement(&child_sort_req)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is the correct API (in response to your email).

let bw = bounded_window_exec("c9", sort_exprs, sort);
let sanity_checker = SanityCheckPlan::new();
let opts = ConfigOptions::default();
assert_eq!(sanity_checker.optimize(bw, &opts).is_ok(), true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally you can assert plan of the bw to make test more explicit. By this way plan will be more visible: Somethinkg like below:

let expected_plan = vec![
 // Fill Plan
];
let actual_plan = get_plan_string(&bw);
assert_eq(expected_plan, actual_plan)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added plan checks in the below commit:

aea95c6

nulls_first: false,
},
)];
let bw = bounded_window_exec("c9", sort_exprs, source);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing applies here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added plan checks in the below commit:

aea95c6

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// SanityChecker rule checks whether the order and the
// partition requirements of each execution plan is
// satisfied by its children or not.
Arc::new(SanityCheckPlan::new()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can discuss if this rule can be merged with PipelineChecker.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my understanding, in addition to doing is_pipeline_friendly, PipelineChecker also checks a condition around SymmetricHashJoin operation. Since PipelineChecker is more enchanced (to check pipeline friendliness) should we remove the pipeline friendliness check from SanityChecker? Or to simplify, I can merge PipelineChecker into SanityChecker (and remove PipelineChecker).

Copy link
Collaborator

@mustafasrepo mustafasrepo Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, we should either

  • remove pipeline friendliness check from SanityChecker (as this is done in another rule already)
  • or merge PipelineChecker into SanityChecker .

However, merging these two rules seems more appropriate. Hence, we should follow this approach.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are merged in the below commit:

d89b1c9

plan: Arc<dyn ExecutionPlan>,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
if !plan.execution_mode().pipeline_friendly() {
return plan_err!("Plan {:?} is not pipeline friendly.", plan);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pipeline friendly means the plan can be executed in bounded memory even if the source is unbounded, or the plan has a bounded source. If we want to apply this rule before PipelineChecker, we need to remove this check, or alternatively we can move this rule after PipelineChecker.

@yfy-
Copy link
Author

yfy- commented Jun 13, 2024

@mustafasrepo, about the testing strategy: So far I have picked one scenario for order requirements (using BoundedWindowAggExec), one scenario for partition requirements (GlobalLimitExec) and a scenario when there are no requirements at all (LocalLimitExec).

Should we extend this to cover all ExecutionPlan's that have some requirement, sort and/or distribution? I can add test cases for the following as well:

  • AggregateExec
  • RecursiveQueryExec
  • SymmetricHashJoinExec
  • NestedLoopJoinExec
  • SortMergeJoinExec
  • CrossJoinExec
  • HashJoinExec
  • DataSinkExec
  • WindowAggExec
  • SortExec
  • PartialSortExec
  • SortPreservingMergeExec

@mustafasrepo
Copy link
Collaborator

@mustafasrepo, about the testing strategy: So far I have picked one scenario for order requirements (using BoundedWindowAggExec), one scenario for partition requirements (GlobalLimitExec) and a scenario when there are no requirements at all (LocalLimitExec).

Should we extend this to cover all ExecutionPlan's that have some requirement, sort and/or distribution? I can add test cases for the following as well:

  • AggregateExec
  • RecursiveQueryExec
  • SymmetricHashJoinExec
  • NestedLoopJoinExec
  • SortMergeJoinExec
  • CrossJoinExec
  • HashJoinExec
  • DataSinkExec
  • WindowAggExec
  • SortExec
  • PartialSortExec
  • SortPreservingMergeExec

I think, existing test coverage is good. We do not need to use all operators in the tests. However, all the operators in the tests are single child operators. We can also add a test case for an operator with multiple children (such as SortMergeJoinExec, it has both input distribution requirement and order requirement. It will allow us to test rule with full coverage.) Once we add these test cases. I think test coverage will be in very good shape, and will be sufficient.

@mustafasrepo
Copy link
Collaborator

Also, I ran the CI tests. It seems that we have some clippy errors and test errors (I think they are related to pipeline error raised in the Sanity Checker Rule. Since error message is changed expected error message isn't same anymore. Once you merge PipelineChecker rule and Sanity Checker rule these errors will be fixed as far as I can tell).
Make sure to run command
ulimit -S -n 2048 cargo test (for mac)
cargo test (for other systems)
in the .git main directory to see whether all tests pass. This test may take 3-5 minutes.

Also, CI tests has additional check whether code follows guidelines. For this check, we have the script bash dev/rust_lint.sh
This script checks the style of the codebase. It also guides you, how to conform to the guide. Once bash dev/rust_lint.sh, and ulimit -S -n 2048 cargo test checks complete successfully. CI tests will pass 99% of the time.

If you encounter any issues during these stages, please notify me. I can help you out.

@yfy-
Copy link
Author

yfy- commented Jun 17, 2024

@mustafasrepo, about the testing strategy: So far I have picked one scenario for order requirements (using BoundedWindowAggExec), one scenario for partition requirements (GlobalLimitExec) and a scenario when there are no requirements at all (LocalLimitExec).
Should we extend this to cover all ExecutionPlan's that have some requirement, sort and/or distribution? I can add test cases for the following as well:

  • AggregateExec
  • RecursiveQueryExec
  • SymmetricHashJoinExec
  • NestedLoopJoinExec
  • SortMergeJoinExec
  • CrossJoinExec
  • HashJoinExec
  • DataSinkExec
  • WindowAggExec
  • SortExec
  • PartialSortExec
  • SortPreservingMergeExec

I think, existing test coverage is good. We do not need to use all operators in the tests. However, all the operators in the tests are single child operators. We can also add a test case for an operator with multiple children (such as SortMergeJoinExec, it has both input distribution requirement and order requirement. It will allow us to test rule with full coverage.) Once we add these test cases. I think test coverage will be in very good shape, and will be sufficient.

I have added test using SortMergeJoinExec in the below commit:

49aba4b

@yfy-
Copy link
Author

yfy- commented Jun 17, 2024

Also, I ran the CI tests. It seems that we have some clippy errors and test errors (I think they are related to pipeline error raised in the Sanity Checker Rule. Since error message is changed expected error message isn't same anymore. Once you merge PipelineChecker rule and Sanity Checker rule these errors will be fixed as far as I can tell). Make sure to run command ulimit -S -n 2048 cargo test (for mac) cargo test (for other systems) in the .git main directory to see whether all tests pass. This test may take 3-5 minutes.

Also, CI tests has additional check whether code follows guidelines. For this check, we have the script bash dev/rust_lint.sh This script checks the style of the codebase. It also guides you, how to conform to the guide. Once bash dev/rust_lint.sh, and ulimit -S -n 2048 cargo test checks complete successfully. CI tests will pass 99% of the time.

If you encounter any issues during these stages, please notify me. I can help you out.

Thanks @mustafasrepo, I have fixed clippy errors and some other test cases. However, there is one test case that does not finish: fifo::unix_test::unbounded_file_with_symmetric_join. The query defined there gets stuck at physical plan optimization stage and it hits the check where we look at the distribution requirements in SanityCheckPlan. When I remove the distribution requirements check the tests execute successfully. When I examined that physical plan I observed SymmetricHashJoinExec requires its 2 children to be hash partitioned on join columns, but its 2 children have UndefinedDistribution. Somehow, SanityCheckPlan fails and it causes the query optimization to execute indefinitely (at least on my machine). I am further examining why this would happen.

@mustafasrepo
Copy link
Collaborator

Thanks @mustafasrepo, I have fixed clippy errors and some other test cases. However, there is one test case that does not finish: fifo::unix_test::unbounded_file_with_symmetric_join. The query defined there gets stuck at physical plan optimization stage and it hits the check where we look at the distribution requirements in SanityCheckPlan. When I remove the distribution requirements check the tests execute successfully. When I examined that physical plan I observed SymmetricHashJoinExec requires its 2 children to be hash partitioned on join columns, but its 2 children have UndefinedDistribution. Somehow, SanityCheckPlan fails and it causes the query optimization to execute indefinitely (at least on my machine). I am further examining why this would happen.

These problems do not seem to be related with your work. Hence, I also debugged them. It seems that we have two problems

  • fifo::unix_test::unbounded_file_with_symmetric_join test stucks when error is returned. (it has some tasks running. those tasks couldn't be joined when error is returned. Then test doesn't finish.)
  • Partitioning UnknownDistribution(1) doesn't satisfy requirement HashRequirement(vec![expr1, ..]). This requirement is actually satisfied (The root cause of the error).

I sent a commit to fix these problems. Also there are some other tests failing with this check. After examining them, I have found that rule actually help us find invalid plans. In the commit I also fixed them. However, there are still some clippy errors (If you do not have those errors. They come with latest rust updates. You can do rustup update or corresponding command in your local to bring latest stable changes.) Also as far as I remember those clippy errors are already resolved in the upstream. Hence I recommend you to merge your branch with apache_main before resolving any clippy issues. They might already be solved.

@@ -675,7 +675,7 @@ impl ExecutionPlan for AggregateExec {
vec![Distribution::UnspecifiedDistribution]
}
AggregateMode::FinalPartitioned | AggregateMode::SinglePartitioned => {
vec![Distribution::HashPartitioned(self.output_group_expr())]
vec![Distribution::HashPartitioned(self.group_by.input_exprs())]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have required input expressions. Rule helped us to catch this bug.

Arc::new(MemTable::try_new(Arc::new(table.schema.clone()), vec![])?),
Arc::new(MemTable::try_new(
Arc::new(table.schema.clone()),
vec![vec![]],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the source, we were generating empty partitions (0 partition). This is invalid. Rule helped us to catch this bug.

@yfy-
Copy link
Author

yfy- commented Jun 21, 2024

Thanks @mustafasrepo, I have fixed clippy errors and some other test cases. However, there is one test case that does not finish: fifo::unix_test::unbounded_file_with_symmetric_join. The query defined there gets stuck at physical plan optimization stage and it hits the check where we look at the distribution requirements in SanityCheckPlan. When I remove the distribution requirements check the tests execute successfully. When I examined that physical plan I observed SymmetricHashJoinExec requires its 2 children to be hash partitioned on join columns, but its 2 children have UndefinedDistribution. Somehow, SanityCheckPlan fails and it causes the query optimization to execute indefinitely (at least on my machine). I am further examining why this would happen.

These problems do not seem to be related with your work. Hence, I also debugged them. It seems that we have two problems

  • fifo::unix_test::unbounded_file_with_symmetric_join test stucks when error is returned. (it has some tasks running. those tasks couldn't be joined when error is returned. Then test doesn't finish.)
  • Partitioning UnknownDistribution(1) doesn't satisfy requirement HashRequirement(vec![expr1, ..]). This requirement is actually satisfied (The root cause of the error).

I sent a commit to fix these problems. Also there are some other tests failing with this check. After examining them, I have found that rule actually help us find invalid plans. In the commit I also fixed them. However, there are still some clippy errors (If you do not have those errors. They come with latest rust updates. You can do rustup update or corresponding command in your local to bring latest stable changes.) Also as far as I remember those clippy errors are already resolved in the upstream. Hence I recommend you to merge your branch with apache_main before resolving any clippy issues. They might already be solved.

I have merged apache_main to the branch and also ran rustup update, now clippy errors seem to be gone. However, there are other test cases that are failing. These come from bin/sqllogictests.rs. From what I could get there is a problem with queries like:

SELECT 1 num UNION ALL SELECT 2 num ORDER BY num;

This yields the physical plan:

SortPreservingMergeExec: [num@0 ASC NULLS LAST]
  UnionExec
    ProjectionExec: expr=[1 as num]
      PlaceholderRowExec
    ProjectionExec: expr=[2 as num]
      PlaceholderRowExec

Order requirement for the SortPreservingMergeExec is not satisifed.

@mustafasrepo
Copy link
Collaborator

I have merged apache_main to the branch and also ran rustup update, now clippy errors seem to be gone. However, there are other test cases that are failing.

@yfy- It seems that this failure is another case this check helped us catch a bug. I will fix this problem. I will let you know once this problem is solved.

@mustafasrepo
Copy link
Collaborator

@yfy- I resolved the problems mentioned above in the PR. We may either merge that PR into your PR, or continue as separate PRs. (We will decide after internal discussion.) With the changes in that PR, all test pass in my local. Hence, for now we can call this PR complete in terms of functionality. However, I will do one last review of this PR tomorrow to make sure we are not missing anything. Kind regards

@mustafasrepo
Copy link
Collaborator

mustafasrepo commented Jun 27, 2024

@yfy- this PR is ready to go upstream. 🎉 🎉 🚀

@mustafasrepo mustafasrepo mentioned this pull request Jun 28, 2024
Copy link
Collaborator

@ozankabak ozankabak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some minor comments to address, but LGTM in general. Great work @yfy-, please coordinate with @mustafasrepo to send/merge upstream after addressing the comments.

datafusion/core/src/physical_optimizer/optimizer.rs Outdated Show resolved Hide resolved
datafusion/physical-expr/src/equivalence/properties.rs Outdated Show resolved Hide resolved
datafusion/physical-plan/src/union.rs Show resolved Hide resolved
datafusion/core/src/physical_optimizer/sanity_checker.rs Outdated Show resolved Hide resolved
datafusion/core/src/physical_optimizer/sanity_checker.rs Outdated Show resolved Hide resolved
@mustafasrepo mustafasrepo marked this pull request as ready for review July 1, 2024 10:35
@mustafasrepo
Copy link
Collaborator

@yfy- final reviews are addressed now. You can open this PR to upstream repo 🚀 🚀 .

@mustafasrepo mustafasrepo changed the base branch from apache_main to feature/optimizer_sanity_checker July 1, 2024 15:06
@mustafasrepo mustafasrepo merged commit 0c2eb45 into synnada-ai:feature/optimizer_sanity_checker Jul 1, 2024
24 checks passed
mustafasrepo pushed a commit that referenced this pull request Aug 9, 2024
…calculations, limit/order/distinct (apache#11756)

* Fix unparser derived table with columns include calculations, limit/order/distinct (#24)

* compare format output to make sure the two level of projects match

* add method to find inner projection that could be nested under limit/order/distinct

* use format! for matching in unparser sort optimization too

* refactor

* use to_string and also put comments in

* clippy

* fix unparser derived table contains cast (#25)

* fix unparser derived table contains cast

* remove dbg
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants