Skip to content

Commit d7c250d

Browse files
committed
Fix the race condition of creating HeartbeatReceiver and retrieving HeartbeatReceiver
1 parent 305abe1 commit d7c250d

File tree

2 files changed

+29
-9
lines changed

2 files changed

+29
-9
lines changed

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,20 @@ private[spark] case class Heartbeat(
3737
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
3838
blockManagerId: BlockManagerId)
3939

40+
private[spark] case class RegisterTaskScheduler(scheduler: TaskScheduler)
41+
4042
private[spark] case object ExpireDeadHosts
4143

4244
private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
4345

4446
/**
4547
* Lives in the driver to receive heartbeats from executors..
4648
*/
47-
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
49+
private[spark] class HeartbeatReceiver(sc: SparkContext)
4850
extends Actor with ActorLogReceive with Logging {
4951

52+
private var scheduler: TaskScheduler = null
53+
5054
// executor ID -> timestamp of when the last heartbeat from this executor was received
5155
private val executorLastSeen = new mutable.HashMap[String, Long]
5256

@@ -71,12 +75,22 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
7175
}
7276

7377
override def receiveWithLogging: PartialFunction[Any, Unit] = {
74-
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
75-
val unknownExecutor = !scheduler.executorHeartbeatReceived(
76-
executorId, taskMetrics, blockManagerId)
77-
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
78-
executorLastSeen(executorId) = System.currentTimeMillis()
79-
sender ! response
78+
case RegisterTaskScheduler(scheduler) =>
79+
this.scheduler = scheduler
80+
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
81+
if (scheduler == null) {
82+
val unknownExecutor = !scheduler.executorHeartbeatReceived(
83+
executorId, taskMetrics, blockManagerId)
84+
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
85+
executorLastSeen(executorId) = System.currentTimeMillis()
86+
sender ! response
87+
} else {
88+
// Because Executor will sleep several seconds then send the first "Heartbeat", this case
89+
// rarely happens. However, if it really happens, log it and ask the executor to register
90+
// itself again.
91+
logWarning(s"Dropping $heartbeat because TaskScheduler has not been ready yet")
92+
sender ! HeartbeatResponse(reregisterBlockManager = true)
93+
}
8094
case ExpireDeadHosts =>
8195
expireDeadHosts()
8296
}

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -356,11 +356,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
356356
val sparkUser = Utils.getCurrentUserName()
357357
executorEnvs("SPARK_USER") = sparkUser
358358

359+
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
360+
// retrieve "HeartbeatReceiver" in the constructor.
361+
private val heartbeatReceiver = env.actorSystem.actorOf(
362+
Props(new HeartbeatReceiver(this)), "HeartbeatReceiver")
363+
359364
// Create and start the scheduler
360365
private[spark] var (schedulerBackend, taskScheduler) =
361366
SparkContext.createTaskScheduler(this, master)
362-
private val heartbeatReceiver = env.actorSystem.actorOf(
363-
Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
367+
368+
heartbeatReceiver ! RegisterTaskScheduler(taskScheduler)
369+
364370
@volatile private[spark] var dagScheduler: DAGScheduler = _
365371
try {
366372
dagScheduler = new DAGScheduler(this)

0 commit comments

Comments
 (0)