Skip to content

Commit 88504b7

Browse files
zsxwing authored and Andrew Or committed
[SPARK-6640][Core] Fix the race condition of creating HeartbeatReceiver and retrieving HeartbeatReceiver
This PR moves the creation of `HeartbeatReceiver` above the creation of `schedulerBackend` to resolve the race condition between creating `HeartbeatReceiver` and retrieving it.

Author: zsxwing <[email protected]>

Closes #5306 from zsxwing/SPARK-6640 and squashes the following commits:

840399d [zsxwing] Don't send TaskScheduler through Akka
a90616a [zsxwing] Fix docs
dd202c7 [zsxwing] Fix typo
d7c250d [zsxwing] Fix the race condition of creating HeartbeatReceiver and retrieving HeartbeatReceiver
1 parent 2c43ea3 commit 88504b7

File tree

2 files changed

+33
-9
lines changed

2 files changed

+33
-9
lines changed

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,24 @@ private[spark] case class Heartbeat(
3737
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
3838
blockManagerId: BlockManagerId)
3939

40+
/**
41+
* An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
42+
* created.
43+
*/
44+
private[spark] case object TaskSchedulerIsSet
45+
4046
private[spark] case object ExpireDeadHosts
4147

4248
private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
4349

4450
/**
4551
* Lives in the driver to receive heartbeats from executors.
4652
*/
47-
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
53+
private[spark] class HeartbeatReceiver(sc: SparkContext)
4854
extends Actor with ActorLogReceive with Logging {
4955

56+
private var scheduler: TaskScheduler = null
57+
5058
// executor ID -> timestamp of when the last heartbeat from this executor was received
5159
private val executorLastSeen = new mutable.HashMap[String, Long]
5260

@@ -71,12 +79,22 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
7179
}
7280

7381
override def receiveWithLogging: PartialFunction[Any, Unit] = {
74-
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
75-
val unknownExecutor = !scheduler.executorHeartbeatReceived(
76-
executorId, taskMetrics, blockManagerId)
77-
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
78-
executorLastSeen(executorId) = System.currentTimeMillis()
79-
sender ! response
82+
case TaskSchedulerIsSet =>
83+
scheduler = sc.taskScheduler
84+
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
85+
if (scheduler != null) {
86+
val unknownExecutor = !scheduler.executorHeartbeatReceived(
87+
executorId, taskMetrics, blockManagerId)
88+
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
89+
executorLastSeen(executorId) = System.currentTimeMillis()
90+
sender ! response
91+
} else {
92+
// Because Executor sleeps for several seconds before sending its first "Heartbeat", this
93+
// case rarely happens. However, if it does happen, log it and ask the executor to
94+
// re-register its BlockManager.
95+
logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
96+
sender ! HeartbeatResponse(reregisterBlockManager = true)
97+
}
8098
case ExpireDeadHosts =>
8199
expireDeadHosts()
82100
}

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,11 +356,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
356356
val sparkUser = Utils.getCurrentUserName()
357357
executorEnvs("SPARK_USER") = sparkUser
358358

359+
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
360+
// retrieve "HeartbeatReceiver" in its constructor. (SPARK-6640)
361+
private val heartbeatReceiver = env.actorSystem.actorOf(
362+
Props(new HeartbeatReceiver(this)), "HeartbeatReceiver")
363+
359364
// Create and start the scheduler
360365
private[spark] var (schedulerBackend, taskScheduler) =
361366
SparkContext.createTaskScheduler(this, master)
362-
private val heartbeatReceiver = env.actorSystem.actorOf(
363-
Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
367+
368+
heartbeatReceiver ! TaskSchedulerIsSet
369+
364370
@volatile private[spark] var dagScheduler: DAGScheduler = _
365371
try {
366372
dagScheduler = new DAGScheduler(this)

0 commit comments

Comments
 (0)