Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@ private[spark] case class Heartbeat(
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)

private[spark] case class RegisterTaskScheduler(scheduler: TaskScheduler)

private[spark] case object ExpireDeadHosts

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
* Lives in the driver to receive heartbeats from executors..
*/
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
private[spark] class HeartbeatReceiver(sc: SparkContext)
extends Actor with ActorLogReceive with Logging {

private var scheduler: TaskScheduler = null

// executor ID -> timestamp of when the last heartbeat from this executor was received
private val executorLastSeen = new mutable.HashMap[String, Long]

Expand All @@ -71,12 +75,22 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
}

override def receiveWithLogging: PartialFunction[Any, Unit] = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
sender ! response
case RegisterTaskScheduler(scheduler) =>
this.scheduler = scheduler
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
if (scheduler != null) {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
executorLastSeen(executorId) = System.currentTimeMillis()
sender ! response
} else {
// Because Executor will sleep several seconds then send the first "Heartbeat", this case
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sleep several seconds before sending

// rarely happens. However, if it really happens, log it and ask the executor to register
// itself again.
logWarning(s"Dropping $heartbeat because TaskScheduler has not been ready yet")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is not ready yet

sender ! HeartbeatResponse(reregisterBlockManager = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the need for reregistering vs. just ignoring the heartbeat?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reregistering: Because Executor uses askWithReply to send Heartbeat, I want to send something back to Executor to avoid timeout.

}
case ExpireDeadHosts =>
expireDeadHosts()
}
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val sparkUser = Utils.getCurrentUserName()
executorEnvs("SPARK_USER") = sparkUser

// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, mind adding (SPARK-6640) at the end?

private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(this)), "HeartbeatReceiver")

// Create and start the scheduler
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")

heartbeatReceiver ! RegisterTaskScheduler(taskScheduler)

@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)
Expand Down