@@ -121,4 +121,8 @@ private[spark] trait TaskScheduler {
*/
def applicationAttemptId(): Option[String]

/**
* Mark an executor as pending removal.
*/
def markExecutorPendingToRemove(executorId: String): Unit
}
@@ -164,6 +164,8 @@ private[spark] class TaskSchedulerImpl(
// in turn is used to decide when we can attain data locality on a given host
protected val hostToExecutors = new HashMap[String, HashSet[String]]

protected val availableHostToExecutors = new HashMap[String, HashSet[String]]

protected val hostsByRack = new HashMap[String, HashSet[String]]

protected val executorIdToHost = new HashMap[String, String]
@@ -495,8 +497,12 @@ private[spark] class TaskSchedulerImpl(
if (!hostToExecutors.contains(o.host)) {
hostToExecutors(o.host) = new HashSet[String]()
}
if (!availableHostToExecutors.contains(o.host)) {
availableHostToExecutors(o.host) = new HashSet[String]()
}
if (!executorIdToRunningTaskIds.contains(o.executorId)) {
hostToExecutors(o.host) += o.executorId
availableHostToExecutors(o.host) += o.executorId
executorAdded(o.executorId, o.host)
executorIdToHost(o.executorId) = o.host
executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
@@ -589,7 +595,7 @@ private[spark] class TaskSchedulerImpl(
}

if (!launchedAnyTask) {
taskSet.getCompletelyExcludedTaskIfAny(hostToExecutors).foreach { taskIndex =>
taskSet.getCompletelyExcludedTaskIfAny(availableHostToExecutors).foreach { taskIndex =>

Review comment: availableHostToExecutors or hostToAvailableExecutors?

Review comment: I think it would be simpler to just keep a hash set updated for killed executors and filter them out of executorIdToHost.
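A minimal sketch of what that alternative could look like, assuming the surrounding TaskSchedulerImpl members (executorIdToHost, the TaskScheduler trait); the executorsPendingRemoval set and the derived view are hypothetical names, not part of this patch:

```scala
import scala.collection.mutable

// Hypothetical alternative: track only the killed/pending-removal executors
// in one set, instead of maintaining a second host -> executors map.
private val executorsPendingRemoval = new mutable.HashSet[String]

override def markExecutorPendingToRemove(executorId: String): Unit = synchronized {
  executorsPendingRemoval += executorId
}

// Availability then reduces to a set lookup against executorIdToHost...
def isExecutorAvailable(execId: String): Boolean = synchronized {
  executorIdToHost.contains(execId) && !executorsPendingRemoval.contains(execId)
}

// ...and the host -> available executors view is derived on demand.
private def availableHostToExecutors: Map[String, Set[String]] = synchronized {
  executorIdToHost.toSeq
    .filterNot { case (execId, _) => executorsPendingRemoval.contains(execId) }
    .groupBy { case (_, host) => host }
    .map { case (host, pairs) => host -> pairs.map(_._1).toSet }
}
```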

// If the taskSet is unschedulable we try to find an existing idle excluded
// executor and kill the idle executor and kick off an abortTimer which if it doesn't
// schedule a task within the timeout will abort the taskSet if we were unable to
@@ -605,7 +611,8 @@ private[spark] class TaskSchedulerImpl(
// If there are no idle executors and dynamic allocation is enabled, then we would
// notify ExecutorAllocationManager to allocate more executors to schedule the
// unschedulable tasks else we will abort immediately.
executorIdToRunningTaskIds.find(x => !isExecutorBusy(x._1)) match {
executorIdToRunningTaskIds.find(
x => isExecutorAvailable(x._1) && !isExecutorBusy(x._1)) match {
Contributor: Could we just extend "busy" to include those we're planning on removing?

Contributor Author: It would be better if we could use CoarseGrainedSchedulerBackend's executorsPendingToRemove directly, but it seems we can't?
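For reference, a minimal sketch of what extending the busy check might look like, assuming the hypothetical executorsPendingRemoval set from the sketch above:

```scala
// Hypothetical: fold "pending removal" into the existing busy predicate, so
// this call site would not need a separate isExecutorAvailable check.
def isExecutorBusy(execId: String): Boolean = synchronized {
  executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty) ||
    executorsPendingRemoval.contains(execId) // hypothetical set, see above
}
```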

case Some((executorId, _)) =>
if (!unschedulableTaskSetToExpiryTime.contains(taskSet)) {
healthTrackerOpt.foreach(blt => blt.killExcludedIdleExecutor(executorId))
@@ -967,6 +974,7 @@ private[spark] class TaskSchedulerImpl(
if (executorIdToHost.contains(executorId)) {
executorsPendingDecommission(executorId) =
ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.workerHost)
removeAvailableExecutor(executorId)
}
}
rootPool.executorDecommission(executorId)
@@ -1084,6 +1092,7 @@ private[spark] class TaskSchedulerImpl(
}
}

removeAvailableExecutor(executorId)
executorsPendingDecommission.remove(executorId)
.foreach(executorsRemovedByDecom.put(executorId, _))

@@ -1120,6 +1129,11 @@ private[spark] class TaskSchedulerImpl(
executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
}

def isExecutorAvailable(execId: String): Boolean = synchronized {
executorIdToHost.get(execId)
.exists(availableHostToExecutors.get(_).exists(_.contains(execId)))
}

// exposed for test
protected final def isExecutorDecommissioned(execId: String): Boolean =
getExecutorDecommissionState(execId).isDefined
@@ -1190,6 +1204,20 @@ private[spark] class TaskSchedulerImpl(
manager
}
}

override def markExecutorPendingToRemove(executorId: String): Unit = synchronized {
removeAvailableExecutor(executorId)
}

private def removeAvailableExecutor(executorId: String): Unit = {
executorIdToHost.get(executorId).foreach { host =>
val execs = availableHostToExecutors.getOrElse(host, new HashSet)
execs -= executorId
if (execs.isEmpty) {
availableHostToExecutors -= host
}
}
}
}


@@ -1311,4 +1339,5 @@ private[spark] object TaskSchedulerImpl {
}
}


}
@@ -975,7 +975,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val executorsToKill = knownExecutors
.filter { id => !executorsPendingToRemove.contains(id) }
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
executorsToKill.foreach { id =>
  scheduler.markExecutorPendingToRemove(id)
  executorsPendingToRemove(id) = !countFailures
}

Review comment: What about decommissioned executors?
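A possible answer, sketched under the assumption that already-decommissioning executors should be skipped here; it reuses the scheduler's existing getExecutorDecommissionState and is not part of this patch:

```scala
// Sketch only: additionally skip executors that are already decommissioning,
// since executorDecommission() has already removed them from the available set.
val executorsToKill = knownExecutors
  .filter { id => !executorsPendingToRemove.contains(id) }
  .filter { id => scheduler.getExecutorDecommissionState(id).isEmpty }
  .filter { id => force || !scheduler.isExecutorBusy(id) }
```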

logInfo(log"Actual list of executor(s) to be killed is " +
log"${MDC(LogKeys.EXECUTOR_IDS, executorsToKill.mkString(", "))}")
@@ -244,6 +244,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
executorId: String): Option[ExecutorDecommissionState] = {
executorsPendingDecommission.get(executorId)
}
override def markExecutorPendingToRemove(executorId: String): Unit = {}
}

/**
@@ -965,6 +966,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
decommissionInfo: ExecutorDecommissionInfo): Unit = {}
override def getExecutorDecommissionState(
executorId: String): Option[ExecutorDecommissionState] = None
override def markExecutorPendingToRemove(executorId: String): Unit = {}
}
val noKillScheduler = new DAGScheduler(
sc,
@@ -105,4 +105,5 @@ private class DummyTaskScheduler extends TaskScheduler {
decommissionInfo: ExecutorDecommissionInfo): Unit = {}
override def getExecutorDecommissionState(
executorId: String): Option[ExecutorDecommissionState] = None
override def markExecutorPendingToRemove(executorId: String): Unit = {}
}