@@ -145,6 +145,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           // Ignoring the task kill since the executor is not registered.
           logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
         }
+
+      case RemoveExecutor(executorId, reason) =>
+        removeExecutor(executorId, reason)
Member commented:
Why remove executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))?
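For context, a hypothetical sketch of where the quoted line is presumed to live - the existing RemoveExecutor case in DriverEndpoint.receiveAndReply, which is not shown in this hunk:

      // Hypothetical reconstruction; the exact pre-change code is not part of this diff.
      case RemoveExecutor(executorId, reason) =>
        // Ask the executor's own endpoint to shut down, then clean up driver-side state.
        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
        removeExecutor(executorId, reason)
        context.reply(true)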

     }

     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -386,14 +389,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
    * be called in the yarn-client mode when AM re-registers after a failure.
    * */
-  protected def reset(): Unit = synchronized {
-    numPendingExecutors = 0
-    executorsPendingToRemove.clear()
+  protected def reset(): Unit = {
+    val executors = synchronized {
+      numPendingExecutors = 0
+      executorsPendingToRemove.clear()
+      Set() ++ executorDataMap.keys
+    }

     // Remove all the lingering executors that should be removed but not yet. The reason might be
     // because (1) disconnected event is not yet received; (2) executors die silently.
-    executorDataMap.toMap.foreach { case (eid, _) =>
-      driverEndpoint.askWithRetry[Boolean](
+    executors.foreach { eid =>
+      driverEndpoint.send(
Member commented:
When changed to send, we don't do retry anymore. It may become less tolerant.

Contributor commented:
This will only be invoked in yarn-client mode when the AM has failed and some lingering executors remain; as far as I know, that situation should not normally happen, so I think it should be OK to call send.

Member commented:
This call actually just sends the message to the same process, so a retry is not needed.

Contributor commented:
Also, IIUC driverEndpoint is an in-process endpoint, so it should be safe.
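To illustrate the distinction being discussed - a minimal sketch, assuming the RpcEndpointRef API of this Spark version, with reason standing in for the SlaveLost(...) value used above and acked being just an illustrative name:

    // One-way, fire-and-forget: dispatched to the endpoint's receive(); no reply, no retries.
    driverEndpoint.send(RemoveExecutor(eid, reason))

    // Request-reply: dispatched to receiveAndReply(); blocks for a Boolean answer and
    // retries the ask a configurable number of times on failure.
    val acked: Boolean = driverEndpoint.askWithRetry[Boolean](RemoveExecutor(eid, reason))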

Contributor commented:
This synchronizes on the entire method, which is not required; the synchronized block should be restricted to the part that actually needs it: copying out the set of executors to remove.

The snippet I posted earlier does the same - can you please modify it accordingly?
If we had coded it that way initially, this bug wouldn't have existed to begin with (with or without send/askWithRetry).
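The locking shape being asked for, pulled together in one place (a sketch assembled from the new lines of this hunk, not a separate implementation):

    protected def reset(): Unit = {
      // Hold the backend lock only long enough to reset counters and snapshot the
      // executor ids; the messaging below happens outside the lock.
      val executors = synchronized {
        numPendingExecutors = 0
        executorsPendingToRemove.clear()
        Set() ++ executorDataMap.keys
      }
      executors.foreach { eid =>
        driverEndpoint.send(
          RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
      }
    }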

Member (@viirya, Oct 19, 2016) commented:
Once we change this to send, don't we need to move the handling of RemoveExecutor from receiveAndReply to receive in DriverEndpoint?

Member commented:
Good catch. I think here we can just call removeExecutor(...).
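A sketch of what that resolution looks like in DriverEndpoint, assuming Spark's usual RpcEndpoint dispatch (send() messages go to receive(), ask-style messages go to receiveAndReply()):

    override def receive: PartialFunction[Any, Unit] = {
      // Messages delivered via send() land here; a one-way RemoveExecutor can simply
      // perform the driver-side cleanup, with no reply to produce.
      case RemoveExecutor(executorId, reason) =>
        removeExecutor(executorId, reason)
      // ... other one-way messages
    }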

         RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
     }
   }