
Conversation

@zhongyu09 (Contributor) commented Jan 13, 2021

What changes were proposed in this pull request?

In AdaptiveSparkPlanExec.getFinalPhysicalPlan, when newStages are generated, materialize the BroadcastQueryStages first and wait for their materialization to finish before materializing the other (ShuffleQueryStage) stages.
This makes sure the broadcast jobs are scheduled and finished before the map jobs, so we avoid waiting on job scheduling and causing a broadcast timeout. This is the same behavior as non-AQE queries.
Ideally, we would only control the scheduling so that the broadcast jobs come before the map jobs. However, that is difficult to control and would require large changes to spark-core. So the trade-off is to wait for the broadcast jobs to finish before materializing the ShuffleQueryStages.
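
A minimal sketch of the idea (simplified; this is not the exact code in the PR — error handling, event callbacks, and stage-reuse logic are omitted, and materializeWithBroadcastFirst is just an illustrative name):

import scala.concurrent.Future
import scala.concurrent.duration.Duration

import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, QueryStageExec}
import org.apache.spark.util.ThreadUtils

// Materialize broadcast stages first and block until they finish,
// then materialize the remaining (shuffle) stages.
def materializeWithBroadcastFirst(newStages: Seq[QueryStageExec]): Seq[Future[Any]] = {
  val (broadcastStages, otherStages) =
    newStages.partition(_.isInstanceOf[BroadcastQueryStageExec])

  // Broadcast jobs are submitted and awaited before any shuffle map job.
  val broadcastFutures = broadcastStages.map(_.materialize())
  broadcastFutures.foreach(ThreadUtils.awaitReady(_, Duration.Inf))

  // Only now submit the shuffle map stages.
  broadcastFutures ++ otherStages.map(_.materialize())
}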

Consider the following case, where a is a large table and b and c are very small in-memory dimension tables (a config sketch for reproducing this under AQE follows the scenario lists below).

SELECT a.id, a.name, b.name, c.name, count(a.value) 
FROM a 
JOIN b on a.id = b.id 
JOIN c on a.name = c.name 
GROUP BY a.id, a.name

For non-AQE:

  1. run collect on b, then broadcast b
  2. run collect on c, then broadcast c
  3. submit the job, which contains 2 stages

For current AQE:

  1. submit 3 jobs (the shuffle map stage for a, collect and broadcast b, collect and broadcast c) almost at the same time
  2. when all are finished, run the result stage

For AQE with this PR:

  1. submit 2 jobs (collect and broadcast b, collect and broadcast c) at the same time
  2. wait for the broadcasts of b and c to finish
  3. run the shuffle map stage
  4. run the result stage
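
For reference, a config sketch for trying the example above under AQE (a, b and c are assumed to already be registered as temp views; the threshold value is only illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-broadcast-order-example")
  .config("spark.sql.adaptive.enabled", "true")                 // enable AQE
  .config("spark.sql.autoBroadcastJoinThreshold", "10485760")   // 10 MB: b and c stay below this, so they are broadcast
  .getOrCreate()

// a, b and c are assumed to be registered as temp views.
spark.sql(
  """SELECT a.id, a.name, b.name, c.name, count(a.value)
    |FROM a
    |JOIN b ON a.id = b.id
    |JOIN c ON a.name = c.name
    |GROUP BY a.id, a.name""".stripMargin).collect()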

Why are the changes needed?

In non-AQE mode, we always wait for broadcasts to finish before submitting shuffle map tasks.

When AQE is enabled, in getFinalPhysicalPlan, Spark traverses the physical plan bottom-up, creates query stages for the materialized parts via createQueryStages, and materializes those newly created query stages to submit map stages or broadcasts. When a ShuffleQueryStage is materialized before a BroadcastQueryStage, the map stage (job) and the broadcast job are submitted almost at the same time, but the map stage will hold all the computing resources. If the map stage runs slowly (when lots of data needs to be processed and resources are limited), the broadcast job cannot be started (and finished) before spark.sql.broadcastTimeout, which causes the whole job to fail (introduced in SPARK-31475).
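
Roughly, the pre-PR loop in getFinalPhysicalPlan materializes every newly created stage as soon as it is discovered, with no ordering between broadcast and shuffle stages (heavily simplified; the callback body is paraphrased, and materializationExecutionContext is a placeholder for whatever execution context the real code uses):

// Every new stage, shuffle or broadcast, is materialized immediately, so the
// corresponding jobs are submitted almost at the same time and the shuffle
// map stage can grab all executors before the broadcast collect job starts.
result.newStages.foreach { stage =>
  stage.materialize().onComplete { res =>
    // report success or failure back to the main planning loop
  }(materializationExecutionContext)
}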

The workaround of increasing spark.sql.broadcastTimeout is neither reasonable nor graceful, because the data to broadcast is very small.
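
For reference, the workaround in question is just raising the SQL broadcast timeout, e.g.:

// Workaround this PR argues against: raise the broadcast timeout
// (300 seconds by default) instead of fixing the scheduling order.
spark.conf.set("spark.sql.broadcastTimeout", "1200")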

#30998 gave a solution by sorting the new stages by class type to make sure the calls to materialize() for BroadcastQueryStage precede the others. However, that solution is not perfect, and it was reverted because of a flaky UT. The order of calling materialize() can guarantee the order in which tasks are scheduled under normal circumstances, but the guarantee is not strict, since the broadcast job and the shuffle map job are submitted from different threads:

  1. for the broadcast job, doPrepare() is called in the main thread, and the real materialization then starts in the "broadcast-exchange-0" thread pool, which calls getByteArrayRdd().collect() to submit the collect job
  2. for the shuffle map job, ShuffleExchangeExec.mapOutputStatisticsFuture() is called, which calls sparkContext.submitMapStage() directly in the main thread to submit the map stage

Step 1 is triggered before step 2, so in normal cases the broadcast job is submitted first.
However, we cannot control how fast the two threads run, so the "broadcast-exchange-0" thread could run a little slower than the main thread, resulting in the map stage being submitted first. So there is still a risk that the shuffle map job is scheduled before the broadcast job.
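
A self-contained illustration of that race, outside of Spark (plain Scala, with made-up names): the task handed to a dedicated thread pool is triggered first, but nothing guarantees it runs before the statement executed directly on the calling thread.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object SubmissionRace extends App {
  // Stand-in for the "broadcast-exchange-0" pool.
  val broadcastPool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

  // Step 1: hand the "broadcast" work to the dedicated pool.
  val broadcastJob = Future {
    println(s"broadcast work ran on ${Thread.currentThread().getName}")
  }(broadcastPool)

  // Step 2: do the "submit map stage" work directly on the calling thread.
  println(s"map-stage submission ran on ${Thread.currentThread().getName}")

  // Depending on scheduling, either line may print first.
  Await.ready(broadcastJob, Duration.Inf)
  broadcastPool.shutdown()
}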

Does this PR introduce any user-facing change?

NO

How was this patch tested?

  1. Added a UT
  2. Tested the code in a dev environment; see https://issues.apache.org/jira/browse/SPARK-33933

@cloud-fan (Contributor) commented:
With this PR, I think there will be an AQE perf regression if the cluster has sufficient resources.

@zhongyu09 (Contributor, Author) commented:
Hi @HyukjinKwon @dongjoon-hyun @cloud-fan @viirya @LuciferYang @maryannxue, please help review. Let's have sufficient testing this time.

val broadcastMaterializationFutures = result.newStages
  .filter(_.isInstanceOf[BroadcastQueryStageExec])
  .map { stage =>
    var future: Future[Any] = null
@LuciferYang (Contributor) commented Jan 13, 2021:
indent: line 201 ~216

@zhongyu09 (Contributor, Author) commented Jan 14, 2021:
I am not sure lines 201 ~ 215 should have 2 more spaces of indentation. It just matches the behavior of lines 225 ~ 236 (old code).

Contributor commented:
should be :)

  }

// Wait for the materialization of all broadcast stages to finish
broadcastMaterializationFutures.foreach(ThreadUtils.awaitReady(_, Duration.Inf))
@LuciferYang (Contributor) commented Jan 13, 2021:
Is it necessary to wait until all BroadcastQueryStageExecs are materialized? This may cause a waste of resources, as @cloud-fan said.

@zhongyu09 (Contributor, Author) commented:
Indeed, there will be a little waste of resources. This is the same behavior as non-AQE. Given how lightweight broadcasts are, it should not cost much time, a few seconds in normal cases. I think that's acceptable.
If we don't wait, there is still a probability that the situation in #30998 will occur and cause a broadcast timeout.

Contributor commented:
@zhongyu09 It might be better to provide a benchmark comparing the performance before and after.

@zhongyu09 (Contributor, Author) commented:
Yes, do we have a benchmark testing framework?

Contributor commented:
A micro-benchmark can be based on BenchmarkBase or SqlBasedBenchmark, like DataSourceReadBenchmark. But for this scenario, I'd prefer that you give a description of the test process and a comparison of the benchmark numbers, maybe with some screenshots.
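
As a rough sketch of such a comparison (not based on the benchmark utilities mentioned above; the table names and the AQE on/off toggle are only illustrative — a real before/after comparison would run the same job against builds with and without the patch):

import org.apache.spark.sql.SparkSession

object BroadcastOrderTiming {
  // Time the example query from the PR description once and print the wall-clock duration.
  def timeQuery(spark: SparkSession, label: String): Unit = {
    val start = System.nanoTime()
    spark.sql(
      """SELECT a.id, a.name, b.name, c.name, count(a.value)
        |FROM a JOIN b ON a.id = b.id JOIN c ON a.name = c.name
        |GROUP BY a.id, a.name""".stripMargin).collect()
    println(s"$label: ${(System.nanoTime() - start) / 1e9} s")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("broadcast-order-timing").getOrCreate()
    // a, b and c are assumed to be registered as temp views beforehand.
    spark.conf.set("spark.sql.adaptive.enabled", "false")
    timeQuery(spark, "AQE off")
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    timeQuery(spark, "AQE on")
    spark.stop()
  }
}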

@zhongyu09 (Contributor, Author) commented:
I am fine with a partial fix like #30998. I wonder whether it is too heavy to add a new event just for a UT.
I would prefer to fix the problem without a perf regression, but we can also let the partial fix go first.

Contributor commented:
We can also log the stage submission and then write a test that verifies the log.

@zhongyu09 (Contributor, Author) commented:
That's an idea. I will have a look at how to do this. Do we have any UT that verifies logs?

Contributor commented:
Yeah, a lot, e.g. the "test log level" test in AdaptiveQueryExecSuite.
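
A generic sketch of that kind of log-based check (log4j 1.x API, which Spark used at the time; this is not the exact helper AdaptiveQueryExecSuite relies on): attach an in-memory appender, run the query, then assert on the captured messages, e.g. that the broadcast stage was submitted before the shuffle map stage.

import scala.collection.mutable.ArrayBuffer
import org.apache.log4j.{AppenderSkeleton, Level, Logger}
import org.apache.log4j.spi.LoggingEvent

// Collects every rendered log message so a test can assert on it afterwards.
class CapturingAppender extends AppenderSkeleton {
  val messages = new ArrayBuffer[String]()
  override def append(event: LoggingEvent): Unit = messages += event.getRenderedMessage
  override def close(): Unit = {}
  override def requiresLayout(): Boolean = false
}

// Runs `body` with the appender attached to the root logger and returns the captured messages.
def withCapturedLogs(body: => Unit): Seq[String] = {
  val appender = new CapturingAppender
  val root = Logger.getRootLogger
  val oldLevel = root.getLevel
  root.setLevel(Level.INFO)
  root.addAppender(appender)
  try body finally {
    root.removeAppender(appender)
    root.setLevel(oldLevel)
  }
  appender.messages.toSeq
}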

@zhongyu09 (Contributor, Author) commented:
Put up a partial fix as discussed, in #31269. cc @viirya

@HyukjinKwon (Member) commented:
ok to test

@SparkQA commented Jan 14, 2021:

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38617/

@SparkQA commented Jan 14, 2021:

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38617/

@SparkQA commented Jan 14, 2021:

Test build #134030 has finished for PR 31167 at commit 6bc38f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) commented:
This PR seems to be superseded by the author at #31269

[SPARK-33933][SQL] Materialize BroadcastQueryStage first to try to avoid broadcast timeout in AQE

I'll close this. Please feel free to reopen this if I'm wrong.

@zhongyu09 (Contributor, Author) commented:

Thanks, it is superseded.
