-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-44040][SQL] Fix compute stats when AggregateExec node above QueryStageExec #41576
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thank you for pinging me, @wangyum . |
| } | ||
|
|
||
| override def computeStats(): Statistics = { | ||
| // TODO this is not accurate when there is other physical nodes above QueryStageExec. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the PR description, we have still issues for the other physical nodes?
Fix data issue. OptimizeOneRowPlan will use stats to remove Aggregate:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've checked other physical nodes and none of them seem to have this issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then, shall we remove this comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think keep it, new physical nodes may be added in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comment should be still valid for aggregation with non-empty query stage as this pr only changes the empty case.
ulysses-you
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you @wangyum for the fix, looks correct to me
|
@dongjoon-hyun @HyukjinKwon Let's merge this fix for Spark 3.4.1 release? |
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Thank you, @wangyum .
|
Feel free to merge this PR, @wangyum . |
…eryStageExec ### What changes were proposed in this pull request? This PR fixes compute stats when `BaseAggregateExec` nodes above `QueryStageExec`. For aggregation, when the number of shuffle output rows is 0, the final result may be 1. For example: ```sql SELECT count(*) FROM tbl WHERE false; ``` The number of shuffle output rows is 0, and the final result is 1. Please see the [UI](https://github.com/apache/spark/assets/5399861/9d9ad999-b3a9-433e-9caf-c0b931423891). ### Why are the changes needed? Fix data issue. `OptimizeOneRowPlan` will use stats to remove `Aggregate`: ``` === Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan === !Aggregate [id#5L], [id#5L] Project [id#5L] +- Union false, false +- Union false, false :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #41576 from wangyum/SPARK-44040. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]> (cherry picked from commit 55ba63c) Signed-off-by: Yuming Wang <[email protected]>
…eryStageExec ### What changes were proposed in this pull request? This PR fixes compute stats when `BaseAggregateExec` nodes above `QueryStageExec`. For aggregation, when the number of shuffle output rows is 0, the final result may be 1. For example: ```sql SELECT count(*) FROM tbl WHERE false; ``` The number of shuffle output rows is 0, and the final result is 1. Please see the [UI](https://github.com/apache/spark/assets/5399861/9d9ad999-b3a9-433e-9caf-c0b931423891). ### Why are the changes needed? Fix data issue. `OptimizeOneRowPlan` will use stats to remove `Aggregate`: ``` === Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan === !Aggregate [id#5L], [id#5L] Project [id#5L] +- Union false, false +- Union false, false :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #41576 from wangyum/SPARK-44040. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]> (cherry picked from commit 55ba63c) Signed-off-by: Yuming Wang <[email protected]>
|
Merged to master, branch-3.4 and branch-3.3. |
HyukjinKwon
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM2
…eryStageExec ### What changes were proposed in this pull request? This PR fixes compute stats when `BaseAggregateExec` nodes above `QueryStageExec`. For aggregation, when the number of shuffle output rows is 0, the final result may be 1. For example: ```sql SELECT count(*) FROM tbl WHERE false; ``` The number of shuffle output rows is 0, and the final result is 1. Please see the [UI](https://github.com/apache/spark/assets/5399861/9d9ad999-b3a9-433e-9caf-c0b931423891). ### Why are the changes needed? Fix data issue. `OptimizeOneRowPlan` will use stats to remove `Aggregate`: ``` === Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan === !Aggregate [id#5L], [id#5L] Project [id#5L] +- Union false, false +- Union false, false :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes apache#41576 from wangyum/SPARK-44040. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]>
…eryStageExec ### What changes were proposed in this pull request? This PR fixes compute stats when `BaseAggregateExec` nodes above `QueryStageExec`. For aggregation, when the number of shuffle output rows is 0, the final result may be 1. For example: ```sql SELECT count(*) FROM tbl WHERE false; ``` The number of shuffle output rows is 0, and the final result is 1. Please see the [UI](https://github.com/apache/spark/assets/5399861/9d9ad999-b3a9-433e-9caf-c0b931423891). ### Why are the changes needed? Fix data issue. `OptimizeOneRowPlan` will use stats to remove `Aggregate`: ``` === Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan === !Aggregate [id#5L], [id#5L] Project [id#5L] +- Union false, false +- Union false, false :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes apache#41576 from wangyum/SPARK-44040. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]> (cherry picked from commit 55ba63c) Signed-off-by: Yuming Wang <[email protected]>
…eryStageExec ### What changes were proposed in this pull request? This PR fixes compute stats when `BaseAggregateExec` nodes above `QueryStageExec`. For aggregation, when the number of shuffle output rows is 0, the final result may be 1. For example: ```sql SELECT count(*) FROM tbl WHERE false; ``` The number of shuffle output rows is 0, and the final result is 1. Please see the [UI](https://github.com/apache/spark/assets/5399861/9d9ad999-b3a9-433e-9caf-c0b931423891). ### Why are the changes needed? Fix data issue. `OptimizeOneRowPlan` will use stats to remove `Aggregate`: ``` === Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan === !Aggregate [id#5L], [id#5L] Project [id#5L] +- Union false, false +- Union false, false :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes apache#41576 from wangyum/SPARK-44040. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]> (cherry picked from commit 55ba63c) Signed-off-by: Yuming Wang <[email protected]>
…eryStageExec ### What changes were proposed in this pull request? This PR fixes compute stats when `BaseAggregateExec` nodes above `QueryStageExec`. For aggregation, when the number of shuffle output rows is 0, the final result may be 1. For example: ```sql SELECT count(*) FROM tbl WHERE false; ``` The number of shuffle output rows is 0, and the final result is 1. Please see the [UI](https://github.com/apache/spark/assets/5399861/9d9ad999-b3a9-433e-9caf-c0b931423891). ### Why are the changes needed? Fix data issue. `OptimizeOneRowPlan` will use stats to remove `Aggregate`: ``` === Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeOneRowPlan === !Aggregate [id#5L], [id#5L] Project [id#5L] +- Union false, false +- Union false, false :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) :- LogicalQueryStage Aggregate [sum(id#0L) AS id#5L], HashAggregate(keys=[], functions=[sum(id#0L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) +- LogicalQueryStage Aggregate [sum(id#18L) AS id#12L], HashAggregate(keys=[], functions=[sum(id#18L)]) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes apache#41576 from wangyum/SPARK-44040. Authored-by: Yuming Wang <[email protected]> Signed-off-by: Yuming Wang <[email protected]> (cherry picked from commit 55ba63c) Signed-off-by: Yuming Wang <[email protected]>
What changes were proposed in this pull request?
This PR fixes compute stats when
BaseAggregateExecnodes aboveQueryStageExec.For aggregation, when the number of shuffle output rows is 0, the final result may be 1. For example:
The number of shuffle output rows is 0, and the final result is 1. Please see the UI.
Why are the changes needed?
Fix data issue.
OptimizeOneRowPlanwill use stats to removeAggregate:Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test.