
Conversation

@zhongyu09
Contributor

What changes were proposed in this pull request?

This PR is the same as #30998, but with a better UT.
In AdaptiveSparkPlanExec.getFinalPhysicalPlan, when newStages are generated, sort the new stages by class type to make sure BroadcastQueryStage instances precede the others.
This partial fix only guarantees that the materialization of BroadcastQueryStage starts before the other stages; because the collect job for broadcasting is submitted in another thread, the issue is not completely solved.
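For illustration, here is a minimal sketch of the reordering idea (not necessarily the exact code in this PR; the helper name reorderNewStages is invented for this sketch, and it assumes the AQE stage classes QueryStageExec / BroadcastQueryStageExec):

```scala
// Sketch: materialize broadcast stages before the other newly created stages.
def reorderNewStages(newStages: Seq[QueryStageExec]): Seq[QueryStageExec] = {
  // Broadcast stages go first so their collect jobs are submitted before the
  // resource-hungry shuffle map stages.
  val (broadcastStages, otherStages) =
    newStages.partition(_.isInstanceOf[BroadcastQueryStageExec])
  broadcastStages ++ otherStages
}
```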

Why are the changes needed?

When AQE is enabled, getFinalPhysicalPlan traverses the physical plan bottom-up, creates query stages for the parts that can be materialized via createQueryStages, and materializes those newly created query stages by submitting map stages or broadcast jobs. When a ShuffleQueryStage is materialized before a BroadcastQueryStage, the map stage (job) and the broadcast job are submitted almost at the same time, but the map stage holds all the computing resources. If the map stage runs slowly (when a lot of data needs to be processed and resources are limited), the broadcast job cannot be started (and finished) before spark.sql.broadcastTimeout, causing the whole job to fail (introduced in SPARK-31475).
The workaround of increasing spark.sql.broadcastTimeout is neither reasonable nor graceful, because the data to broadcast is very small.

The order of calling materialize determines the order in which tasks are scheduled under normal circumstances, but the guarantee is not strict, since the broadcast job and the shuffle map job are submitted from different threads:

  1. For the broadcast job, doPrepare() is called in the main thread, and the real materialization then starts in the "broadcast-exchange-0" thread pool, where getByteArrayRdd().collect() is called to submit the collect job.
  2. For the shuffle map job, ShuffleExchangeExec.mapOutputStatisticsFuture() calls sparkContext.submitMapStage() directly in the main thread to submit the map stage.

Step 1 is triggered before step 2, so in normal cases the broadcast job is submitted first.
However, we cannot control how fast the two threads run, so the "broadcast-exchange-0" thread could run a little slower than the main thread, resulting in the map stage being submitted first. So there is still a risk that the shuffle map job is scheduled before the broadcast job.
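To make the race concrete, here is a hypothetical, self-contained Scala sketch (plain Futures, no Spark APIs; all names are invented for this illustration) showing why calling the broadcast materialization first does not strictly guarantee that its job reaches the scheduler first:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object MaterializationRace {
  // Stands in for the "broadcast-exchange-0" thread pool.
  private val broadcastPool = ExecutionContext.fromExecutorService(
    Executors.newFixedThreadPool(1))

  def materializeBroadcast(): Future[Unit] = Future {
    // Stand-in for getByteArrayRdd().collect() submitting the collect job.
    println("broadcast collect job submitted")
  }(broadcastPool)

  def materializeShuffle(): Unit = {
    // Stand-in for sparkContext.submitMapStage(...) on the caller's thread.
    println("shuffle map stage submitted")
  }

  def main(args: Array[String]): Unit = {
    materializeBroadcast() // called first, but runs on another thread...
    materializeShuffle()   // ...so this may still reach the scheduler first
  }
}
```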

Since completely fixing the issue is complex and might introduce major changes, we need more time to follow up. This partial fix is better than doing nothing, and it resolves most cases in SPARK-33933.

Does this PR introduce any user-facing change?

NO

How was this patch tested?

Add UT

@github-actions github-actions bot added the SQL label Jan 21, 2021
@zhongyu09 zhongyu09 changed the title [SPARK-33933][SQL] Materialize BroadcastQueryStage first to avoid broadcast timeout in AQE [SPARK-33933][SQL] Materialize BroadcastQueryStage first to try to avoid broadcast timeout in AQE Jan 21, 2021
@cloud-fan
Contributor

ok to test

@SparkQA

SparkQA commented Jan 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38894/

@SparkQA

SparkQA commented Jan 21, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38894/

@SparkQA

SparkQA commented Jan 21, 2021

Test build #134308 has finished for PR 31269 at commit 5699cf4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhongyu09
Contributor Author

retest this please

@SparkQA

SparkQA commented Jan 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38902/

@SparkQA

SparkQA commented Jan 21, 2021

Test build #134315 has finished for PR 31269 at commit 08edec5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 21, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38902/

@zhongyu09
Contributor Author

> Test build #134315 has finished for PR 31269 at commit 08edec5.
>
>   • This patch fails Spark unit tests.
>   • This patch merges cleanly.
>   • This patch adds no public classes.

It seems the test failure is unrelated this time.

@zhongyu09
Contributor Author

retest this please

@SparkQA

SparkQA commented Jan 21, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38922/

@SparkQA

SparkQA commented Jan 21, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38922/

@SparkQA

SparkQA commented Jan 21, 2021

Test build #134335 has finished for PR 31269 at commit d0b8ee3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@zhongyu09
Contributor Author

> thanks, merging to master!

#31307 merges to branch-3.0.
Do we need an extra PR to merge to branch-3.1?

@cloud-fan
Contributor

It's more of an improvement, so usually we don't backport.

@HyukjinKwon
Member

Isn't it a bug though? spark.sql.broadcastTimeout wasn't respected properly and took the shuffle time into account.

@zhongyu09
Contributor Author

I think it's a (partial) bug fix rather than an improvement.
It's fine with me that spark.sql.broadcastTimeout takes the shuffle/collect time into account, but it shouldn't take the scheduling time into account.

@cloud-fan
Contributor

It's only a problem when the cluster doesn't have sufficient resources, which makes the scheduling time very long. Usually it's fine to ignore the scheduling time.

@zhongyu09
Contributor Author

But with the trend toward cost saving, situations with insufficient resources will become more and more common.

skestle pushed a commit to skestle/spark that referenced this pull request Feb 3, 2021
…oid broadcast timeout in AQE

Closes apache#31269 from zhongyu09/aqe-broadcast-partial-fix.

Authored-by: Yu Zhong <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
@zhongyu09
Contributor Author

Hi @cloud-fan @HyukjinKwon @viirya, I have some follow-up work to resolve the issue completely. Is it better to reuse the JIRA SPARK-33933 or to create a new JIRA?

@cloud-fan
Contributor

@zhongyu09 please open a new JIRA, thanks!

domybest11 pushed a commit to domybest11/spark that referenced this pull request Jun 15, 2022
…oid broadcast timeout in AQE

Closes apache#31269 from zhongyu09/aqe-broadcast-partial-fix.

Authored-by: Yu Zhong <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>