@@ -23,7 +23,8 @@ import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.collection.mutable
import scala.concurrent.ExecutionContext
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import org.apache.spark.SparkException
@@ -190,7 +191,36 @@ case class AdaptiveSparkPlanExec(
executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))

// Start materialization of all new stages and fail fast if any stages failed eagerly
result.newStages.foreach { stage =>

// SPARK-33933: we should materialize broadcast stages first and wait for their
// materialization to finish before materializing other stages, to avoid waiting
// for broadcast tasks to be scheduled, which can lead to broadcast timeout.
val broadcastMaterializationFutures = result.newStages
  .filter(_.isInstanceOf[BroadcastQueryStageExec])
  .map { stage =>
    var future: Future[Any] = null
@LuciferYang (Contributor), Jan 13, 2021:
indent: line 201 ~ 216

@zhongyu09 (Contributor, author), Jan 14, 2021:
I am not sure lines 201~215 should have 2 more spaces of indentation. The behavior is just the same as lines 225~236 (the old code).

Contributor:
should be :)

    try {
      future = stage.materialize()
      future.onComplete { res =>
        if (res.isSuccess) {
          events.offer(StageSuccess(stage, res.get))
        } else {
          events.offer(StageFailure(stage, res.failed.get))
        }
      }(AdaptiveSparkPlanExec.executionContext)
    } catch {
      case e: Throwable =>
        cleanUpAndThrowException(Seq(e), Some(stage.id))
    }
    future
  }

// Wait for the materialization of all broadcast stages to finish
broadcastMaterializationFutures.foreach(ThreadUtils.awaitReady(_, Duration.Inf))
@LuciferYang (Contributor), Jan 13, 2021:
Is it necessary to wait until all BroadcastQueryStageExec stages are materialized? This may cause a waste of resources, as @cloud-fan said.

@zhongyu09 (author):
Indeed, there will be a little waste of resources. This is the same behavior as non-AQE. Given how lightweight broadcast is, it should not cost too much time, a few seconds in the normal case, so I think that's acceptable. If we do not wait, there is still a probability that the situation in #30998 will occur and cause a broadcast timeout.
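For illustration only, here is a minimal standalone sketch (not code from this PR) of how the blocking wait could instead be expressed by chaining the non-broadcast submissions onto the broadcast futures; materializeBroadcastStages and materializeOtherStages are hypothetical stand-ins for the stage submissions above:

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object NonBlockingOrderingSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-ins for submitting the two groups of query stages.
  def materializeBroadcastStages(): Seq[Future[Any]] =
    Seq(Future { Thread.sleep(100); "broadcast-exchange-0" })
  def materializeOtherStages(): Unit =
    println("submitting non-broadcast stages")

  def main(args: Array[String]): Unit = {
    // Broadcast stages are still submitted first, but the driver thread is not
    // parked in a blocking wait while they run; the remaining submissions fire
    // once every broadcast future has completed.
    val done = Future.sequence(materializeBroadcastStages()).map(_ => materializeOtherStages())
    Await.ready(done, Duration.Inf) // keep the demo JVM alive until the chain finishes
  }
}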

Contributor:
@zhongyu09 It might be better to provide a benchmark comparing the performance before and after this change.

@zhongyu09 (author):
Yes. Do we have a benchmark testing framework?

Contributor:
A micro-benchmark can be based on BenchmarkBase or SqlBasedBenchmark, like DataSourceReadBenchmark. But for this scenario, I would prefer a description of the test process and a comparison of the benchmark numbers; some screenshots may be needed.
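For reference, a rough standalone timing sketch of the kind of before/after comparison being suggested (it does not use BenchmarkBase or SqlBasedBenchmark; the query mirrors the slow-map-task reproduction in this PR, and the master, parallelism and sleep values are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object BroadcastTimeoutBench {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    // Slow map tasks on the fact side, tiny dimension table on the build side.
    val fact = spark.sparkContext.parallelize(0 until 100, 100)
      .flatMap { x => Thread.sleep(50); (0 until 100).map(_ => (x % 26, x % 10)) }
      .toDF("index", "pv")
    val dim = (0 until 26).map(x => (x, ('a' + x).toChar.toString)).toDF("index", "name")

    // Run once on each branch and compare the printed wall-clock times.
    val start = System.nanoTime()
    fact.groupBy("index")
      .agg(sum($"pv").as("pv"))
      .join(dim, Seq("index"))
      .collect()
    println(s"elapsed: ${(System.nanoTime() - start) / 1e9} s")

    spark.stop()
  }
}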

@zhongyu09 (author):
I am fine with a partial fix like #30998. I wonder whether it is too heavy to add a new event just for a UT. I would prefer to fix the problem without a perf regression, but we can also let the partial fix go in first.

Contributor:
We can also log the stage submission and then write a test to verify the log.

@zhongyu09 (author):
That's an idea. I will have a look at how to do this. Do we have any UT that verifies the log?

Contributor:
Yeah, a lot, e.g. the "test log level" test in AdaptiveQueryExecSuite.
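For reference, one possible shape of such a log-based check, sketched with a plain log4j 1.x appender rather than any existing test helper; the logger name and the asserted message are hypothetical:

import scala.collection.mutable
import org.apache.log4j.{AppenderSkeleton, Logger}
import org.apache.log4j.spi.LoggingEvent

// A tiny appender that keeps rendered messages in memory so a test can assert
// on what was logged during stage submission.
class CapturingAppender extends AppenderSkeleton {
  val messages = mutable.ArrayBuffer[String]()
  override def append(event: LoggingEvent): Unit = messages += event.getRenderedMessage
  override def close(): Unit = {}
  override def requiresLayout(): Boolean = false
}

object LogCaptureDemo {
  def main(args: Array[String]): Unit = {
    val appender = new CapturingAppender
    val logger = Logger.getLogger("demo.stage.submission") // hypothetical logger name
    logger.addAppender(appender)
    try {
      // In the real test this would be the AQE query execution; here a stand-in
      // message takes the place of the stage-submission log line.
      logger.info("Submitting shuffle query stage 1")
      assert(appender.messages.exists(_.contains("Submitting shuffle query stage")))
    } finally {
      logger.removeAppender(appender)
    }
  }
}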

@zhongyu09 (author):
I put up a partial fix as discussed in #31269. cc @viirya


// Start materialization of non-broadcast stages
result.newStages.filter(!_.isInstanceOf[BroadcastQueryStageExec])
  .foreach { stage =>
    try {
      stage.materialize().onComplete { res =>
        if (res.isSuccess) {
@@ -22,7 +22,7 @@ import java.net.URI

import org.apache.log4j.Level

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart, SparkListenerStageSubmitted, StageInfo}
import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
@@ -1460,4 +1460,42 @@ class AdaptiveQueryExecSuite
}
}
}

test("SPARK-33933: AQE broadcast should not timeout with slow map tasks") {

val broadcastTimeoutInSec = 2
val shuffleMapTaskParallsm = 100

val input = spark.sparkContext.parallelize(Range(0, 100), shuffleMapTaskParallsm)
.flatMap(x => {
Thread.sleep(50)
for (i <- Range(0, 100)) yield (x % 26, x % 10)
}).toDF("index", "pv")
val dim = Range(0, 26)
.map(x => (x, ('a' + x).toChar.toString))
.toDF("index", "name")
.coalesce(1)
val testDf = input.groupBy("index")
.agg(sum($"pv").alias("pv"))
.join(dim, Seq("index"))

val stageInfos = scala.collection.mutable.ArrayBuffer[StageInfo]()
val listener = new SparkListener {
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
stageInfos += stageSubmitted.stageInfo
}
}
spark.sparkContext.addSparkListener(listener)

withSQLConf(SQLConf.BROADCAST_TIMEOUT.key -> broadcastTimeoutInSec.toString,
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val result = testDf.collect()
assert(result.length == 26)
val sortedStageInfos = stageInfos.sortBy(_.submissionTime)
assert(sortedStageInfos.size > 2)
assert(sortedStageInfos(0).numTasks == 1)
assert(sortedStageInfos(1).numTasks == shuffleMapTaskParallsm)
}

}
}