[SPARK-33933][SQL] Materialize BroadcastQueryStage first to avoid broadcast timeout in AQE #30998
Conversation
cc @maryannxue and @cloud-fan FYI
Hi @cloud-fan @viirya any other concerns? Can you approve the test for this PR (The old PR
ok to test
viirya left a comment:
looks okay.
Kubernetes integration test starting
Test build #133726 has finished for PR 30998 at commit
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #133729 has finished for PR 30998 at commit
retest this please
let's see if the new test is flaky or not.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #133748 has finished for PR 30998 at commit
thanks, merging to master/3.1!
…adcast timeout in AQE

### What changes were proposed in this pull request?
In AdaptiveSparkPlanExec.getFinalPhysicalPlan, when newStages are generated, sort the new stages by class type to make sure BroadcastQueryStage precedes the others. This ensures the broadcast jobs are submitted before the map jobs, avoiding a wait on job scheduling that can cause a broadcast timeout.

### Why are the changes needed?
With AQE enabled, getFinalPhysicalPlan traverses the physical plan bottom up, creates query stages for the materialized parts via createQueryStages, and materializes those newly created query stages to submit map stages or broadcasts. When a ShuffleQueryStage is materialized before a BroadcastQueryStage, the map job and the broadcast job are submitted almost at the same time, but the map job will hold all the computing resources. If the map job runs slowly (when there is a lot of data to process and resources are limited), the broadcast job cannot be started (and finished) before spark.sql.broadcastTimeout, which causes the whole job to fail (introduced in SPARK-31475).
The workaround of increasing spark.sql.broadcastTimeout is neither sensible nor graceful, because the data to broadcast is very small.

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
1. Add a UT.
2. Test the code in a dev environment, as described in https://issues.apache.org/jira/browse/SPARK-33933.

Closes #30998 from zhongyu09/aqe-broadcast.

Authored-by: Yu Zhong <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit d36cdd5)
Signed-off-by: Wenchen Fan <[email protected]>
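To make the reordering idea concrete, here is a minimal, self-contained sketch of sorting the new stages so that broadcast stages are materialized first. It is not the actual AdaptiveSparkPlanExec code; the QueryStage, BroadcastStage, and ShuffleStage types below are hypothetical stand-ins.

```scala
// Hypothetical stand-in types; the real code works on Spark's QueryStageExec subclasses.
sealed trait QueryStage { def materialize(): Unit }

case class BroadcastStage(id: Int) extends QueryStage {
  override def materialize(): Unit = println(s"submit broadcast job for stage $id")
}

case class ShuffleStage(id: Int) extends QueryStage {
  override def materialize(): Unit = println(s"submit shuffle map stage $id")
}

object ReorderSketch extends App {
  val newStages: Seq[QueryStage] = Seq(ShuffleStage(1), BroadcastStage(2), ShuffleStage(3))

  // Sort so that broadcast stages come first, mirroring the PR's
  // "sort the new stages by class type" idea.
  val reordered = newStages.sortBy {
    case _: BroadcastStage => 0
    case _                 => 1
  }

  // Broadcast jobs are now triggered before the map stages.
  reordered.foreach(_.materialize())
}
```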
Thanks @cloud-fan, why not merge to branch 3.0?
@zhongyu09 can you open a backport PR for 3.0? There are many AQE code changes in this release.
Sure, just like the old one #30962, right? I am puzzled why it is not directly merged to branch 3.0, though.
The 3.0 code base is very different from 3.1, and I'm afraid the tests may fail. It's safer to make a PR to make sure all tests pass.
test("SPARK-33933: AQE broadcast should not timeout with slow map tasks") {
It seems that this test case fails fairly frequently:
- https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-3.1-test-maven-hadoop-2.7-jdk-11/95/
- https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-3.2/1854/
- https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.7-jdk-11-scala-2.13/147/#showFailuresLink
- https://amplab.cs.berkeley.edu/jenkins/job/spark-branch-3.1-test-maven-hadoop-3.2/80/
Hmm, so it is still flaky.
I tried the test tens of times and it failed twice. As we discussed, the guarantee is not strict, since the broadcast job and the shuffle map job are submitted from different threads, so there is still a risk that the shuffle map job is scheduled before the broadcast job. I wonder whether we should remove the UT until we resolve the issue thoroughly.
I ran "SPARK-33933: AQE broadcast should not timeout with slow map tasks" locally 5 times and all 5 runs failed as follows:
- SPARK-33933: AQE broadcast should not timeout with slow map tasks *** FAILED ***
1751 was not greater than 2000 (AdaptiveQueryExecSuite.scala:1454)
> Hmm, so it is still flaky.

Yes.
The Spark conf was changed to local[2], so the test runs faster than before.
This shows the test is unreliable...
Checking the Spark job submission order should be easy to do and fast to run, and with a retry it should be unlikely to fail. It's better to check the stage submission order directly, if we can figure out how to do it.
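A rough sketch of what that listener-based check could look like is below; the session setup, the query placeholder, and the way the order is inspected are assumptions for illustration, not the test that was actually added.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}
import org.apache.spark.sql.SparkSession

object StageOrderSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("stage-order-sketch").getOrCreate()

  // Record the id of every stage as it is submitted.
  val submittedStages = ArrayBuffer[Int]()
  spark.sparkContext.addSparkListener(new SparkListener {
    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit =
      submittedStages.synchronized { submittedStages += stageSubmitted.stageInfo.stageId }
  })

  // ... run an AQE query here that triggers both a broadcast job and a shuffle map stage ...

  // Afterwards, inspect the recorded order; mapping stage ids back to the broadcast
  // job vs. the map stage is the part that still needs figuring out.
  println(submittedStages.mkString(", "))
  spark.stop()
}
```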
Yes, I know; the failure reported by @LuciferYang is easy to solve.
But the problem is that the job submission order may not be correct, like in the test in #31084.
@cloud-fan You mean that with a retry the test should be unlikely to fail, and that handles the edge case?
I am trying to create a new PR. Sorry for the inconvenience.
We can get the stage submission time using a SparkListener. But after trying several times, the stage submission time is not stable, so the UT cannot always pass. I suggest removing the UT until we completely solve the issue. #31099
Guys, let me revert this one. It causes test failures too often, and it blocks RC preparation. The flakiness is more obvious when you check the jobs here: https://amplab.cs.berkeley.edu/jenkins/, and this blocks me from checking the test results from PySpark or SparkR, at least for the RC.
+1 for reverting it.
+1
Created another PR: #31167
Thanks @zhongyu09
…oid broadcast timeout in AQE

### What changes were proposed in this pull request?
This PR is the same as #30998, but with a better UT.
In AdaptiveSparkPlanExec.getFinalPhysicalPlan, when newStages are generated, sort the new stages by class type to make sure BroadcastQueryStage precedes the others. This partial fix only guarantees that the materialization of BroadcastQueryStage starts before the others; because the collect job for broadcasting is submitted from another thread, the issue is not completely solved.

### Why are the changes needed?
With AQE enabled, getFinalPhysicalPlan traverses the physical plan bottom up, creates query stages for the materialized parts via createQueryStages, and materializes those newly created query stages to submit map stages or broadcasts. When a ShuffleQueryStage is materialized before a BroadcastQueryStage, the map stage (job) and the broadcast job are submitted almost at the same time, but the map stage will hold all the computing resources. If the map stage runs slowly (when there is a lot of data to process and resources are limited), the broadcast job cannot be started (and finished) before spark.sql.broadcastTimeout, which causes the whole job to fail (introduced in SPARK-31475). The workaround of increasing spark.sql.broadcastTimeout is neither sensible nor graceful, because the data to broadcast is very small.
The order of calling materialize determines the order in which jobs are scheduled under normal circumstances, but the guarantee is not strict, since the broadcast job and the shuffle map job are submitted from different threads:
1. For the broadcast job, doPrepare() is called in the main thread, and then the real materialization starts in the "broadcast-exchange-0" thread pool, where getByteArrayRdd().collect() is called to submit the collect job.
2. For the shuffle map job, ShuffleExchangeExec.mapOutputStatisticsFuture() is called, which calls sparkContext.submitMapStage() directly in the main thread to submit the map stage.
Step 1 is triggered before step 2, so in normal cases the broadcast job will be submitted first. However, we cannot control how fast the two threads run, so the "broadcast-exchange-0" thread could run a little slower than the main thread, resulting in the map stage being submitted first. So there is still a risk that the shuffle map job is scheduled before the broadcast job.
Since completely fixing the issue is complex and might introduce major changes, we need more time to follow up. This partial fix is better than doing nothing; it resolves most of the cases in SPARK-33933.

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
Add a UT.

Closes #31269 from zhongyu09/aqe-broadcast-partial-fix.

Authored-by: Yu Zhong <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
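A small illustrative sketch of the race described above, using plain JVM threading rather than Spark code: the broadcast work is handed off to a separate thread pool while the map-stage submission stays on the calling thread, so which one reaches the scheduler first depends on thread timing.

```scala
import java.util.concurrent.Executors

object SubmissionRaceSketch extends App {
  val broadcastPool = Executors.newSingleThreadExecutor()

  // Triggered first, but runs on the pool thread (like "broadcast-exchange-0" in Spark).
  broadcastPool.submit(new Runnable {
    override def run(): Unit = println("broadcast job submitted")
  })

  // Triggered second, but runs on the main thread (like sparkContext.submitMapStage()).
  println("shuffle map stage submitted")

  broadcastPool.shutdown()
  // Even though the broadcast submission is triggered first, the pool thread may
  // start late, so the map-stage line can print first.
}
```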