diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5018eb38d91c0..3c112429c2a1b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -218,6 +218,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private var _jars: Seq[String] = _ private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ + private var _blacklistTracker: Option[BlacklistTracker] = _ /* ------------------------------------------------------------------------------------- * | Accessors and public fields. These provide access to the internal state of the | @@ -327,6 +328,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] def cleaner: Option[ContextCleaner] = _cleaner + private[spark] def blacklistTracker: Option[BlacklistTracker] = _blacklistTracker + private[spark] var checkpointDir: Option[String] = None // Thread Local variable that can be used by users to pass information down the stack @@ -534,6 +537,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } _executorAllocationManager.foreach(_.start()) + // By default blacklistTracker is enabled. + _blacklistTracker = if (_conf.getBoolean("spark.scheduler.blacklist.enabled", true)) { + Some(new BlacklistTracker(_conf)) + } else { + None + } + _blacklistTracker.foreach(_.start()) + _cleaner = if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) @@ -1766,6 +1777,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli Utils.tryLogNonFatalError { _executorAllocationManager.foreach(_.stop()) } + Utils.tryLogNonFatalError { + _blacklistTracker.foreach(_.stop()) + } + if (_listenerBusStarted) { Utils.tryLogNonFatalError { listenerBus.stop() diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala new file mode 100644 index 0000000000000..b16fb642e65f0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable + +import org.apache.spark.SparkConf +import org.apache.spark.util.Clock + +/** + * The interface to determine executor blacklist and node blacklist. 
+ * + * TODO notes on thread-safety + */ +private[scheduler] trait BlacklistStrategy { + /** Define a time interval to expire failure information of executors */ + val expireTimeInMilliseconds: Long + + /** Return executors in blacklist which are related to given stage and partition */ + def getExecutorBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + atomTask: StageAndPartition, + clock: Clock): Set[String] + + /** Return all nodes in blacklist */ + def getNodeBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + clock: Clock): Set[String] + + /** + * Return all nodes in blacklist for specified stage. By default it returns the same result as + * getNodeBlacklist. It could be override in strategy implementation. + */ + def getNodeBlacklistForStage( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + stageId: Int, + clock: Clock): Set[String] = getNodeBlacklist(executorIdToFailureStatus, clock) + + /** + * Choose which executors should be removed from blacklist. Return true if any executors are + * removed from the blacklist, false otherwise. The default implementation removes executors from + * the blacklist after [[expireTimeInMilliseconds]] + */ + def expireExecutorsInBlackList( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + clock: Clock): Boolean = { + + val now = clock.getTimeMillis() + val expiredKey = executorIdToFailureStatus.filter { + case (executorid, failureStatus) => + (now - failureStatus.updatedTime) >= expireTimeInMilliseconds + }.keySet + + if (expiredKey.isEmpty) { + false + } else { + executorIdToFailureStatus --= expiredKey + true + } + } +} + +/** + * This strategy is applied to keep the same semantics as standard behavior before spark 1.6. + * + * If an executor failed running "task A", then we think this executor is blacked for "task A", + * but at the same time. it is still healthy for other task. Node blacklist is always empty. + */ +private[scheduler] class SingleTaskStrategy( + val expireTimeInMilliseconds: Long) extends BlacklistStrategy { + var executorBlacklistCallCount = 0L + def getExecutorBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + atomTask: StageAndPartition, + clock: Clock): Set[String] = { + executorBlacklistCallCount += 1 + executorIdToFailureStatus.filter{ + case (_, failureStatus) => failureStatus.numFailuresPerTask.keySet.contains(atomTask) && + clock.getTimeMillis() - failureStatus.updatedTime < expireTimeInMilliseconds + }.keys.toSet + } + + def getNodeBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + clock: Clock): Set[String] = Set.empty[String] +} + +/** + * Comparing to SingleTaskStrategy, it supports node blacklist. With this strategy, once more than + * one executor failed running for specific stage, we put all executors on the same node into + * blacklist. So all tasks from the same stage will not be allocated to that node. + */ +private[scheduler] class AdvancedSingleTaskStrategy( + expireTimeInMilliseconds: Long) extends SingleTaskStrategy(expireTimeInMilliseconds) { + + var nodeBlacklistCallCount = 0L + override def getNodeBlacklistForStage( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + stageId: Int, + clock: Clock): Set[String] = { + nodeBlacklistCallCount += 1 + // when there is one bad node (or executor), this is really slow. We pile up a ton of + // task failures, and we've got to iterate through failure data for each task. 
Furthermore, + // since we don't actively blacklist the bad node / executor, we just keep assigning it more + // tasks that fail. And after each failure, we invalidate our cache, which means we need + // to call this again. + // This can be particularly painful when the failures are fast, since its likely the only + // executor with free slots is the one which just failed some tasks, which just keep going ... + val nodes = executorIdToFailureStatus.filter{ + case (_, failureStatus) => + failureStatus.numFailuresPerTask.keySet.map(_.stageId).contains(stageId) && + clock.getTimeMillis() - failureStatus.updatedTime < expireTimeInMilliseconds + }.values.map(_.host) + getDuplicateElem(nodes, 1) + } + + override def getNodeBlacklist( + executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], + clock: Clock): Set[String] = { + // resolve a nodes sequence from failure status. + val nodes = executorIdToFailureStatus.values.map(_.host) + getDuplicateElem(nodes, 1) + } + + // A help function to find hosts which have more than "depTimes" executors on it in blacklist + private def getDuplicateElem(ndoes: Iterable[String], dupTimes: Int): Set[String] = { + ndoes.groupBy(identity).mapValues(_.size) // resolve map (nodeName => occurred times) + .filter(ele => ele._2 > dupTimes) // return nodes which occurred more than dupTimes. + .keys.toSet + } +} + +/** + * Create BlacklistStrategy instance according to SparkConf + */ +private[scheduler] object BlacklistStrategy { + def apply(sparkConf: SparkConf): BlacklistStrategy = { + val timeout = sparkConf.getTimeAsMs("spark.scheduler.blacklist.timeout", + sparkConf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L).toString() + "ms") + + sparkConf.getBoolean("spark.scheduler.blacklist.advancedStrategy", false) match { + case false => new SingleTaskStrategy(timeout) + case true => new AdvancedSingleTaskStrategy(timeout) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala new file mode 100644 index 0000000000000..4ca0713880a64 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.Clock +import org.apache.spark.util.SystemClock +import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.Utils + +/** + * BlacklistTracker is designed to track problematic executors and nodes. 
It belongs to + * SparkContext as an centralized and unified collection for all tasks with same SparkContext. + * So that a new TaskSet could be benefit from previous experiences of other TaskSets. + * + * Once task finished, the callback method in TaskSetManager should update + * executorIdToFailureStatus Map. + */ +private[spark] class BlacklistTracker( + sparkConf: SparkConf, + clock: Clock = new SystemClock()) extends BlacklistCache with Logging { + + // maintain a ExecutorId --> FailureStatus HashMap + private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] = mutable.HashMap() + // Apply Strategy pattern here to change different blacklist detection logic + private val strategy = BlacklistStrategy(sparkConf) + + + // A daemon thread to expire blacklist executor periodically + private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor( + "spark-scheduler-blacklist-expire-timer") + + private val recoverPeriod = sparkConf.getTimeAsSeconds( + "spark.scheduler.blacklist.recoverPeriod", "60s") + + def start(): Unit = { + val scheduleTask = new Runnable() { + override def run(): Unit = { + Utils.logUncaughtExceptions(expireExecutorsInBlackList()) + } + } + scheduler.scheduleAtFixedRate(scheduleTask, 0L, recoverPeriod, TimeUnit.SECONDS) + } + + def stop(): Unit = { + scheduler.shutdown() + scheduler.awaitTermination(10, TimeUnit.SECONDS) + logInfo(s"Executor Blacklist callcount =" + + s" ${strategy.asInstanceOf[SingleTaskStrategy].executorBlacklistCallCount}") + strategy match { + case as: AdvancedSingleTaskStrategy => + logInfo(s"Node Blacklist callcount =" + + s" ${as.nodeBlacklistCallCount}") + case _ => // no op + } + } + + // The actual implementation is delegated to strategy + private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized { + val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock) + logInfo(s"Checked for expired blacklist: ${updated}") + if (updated) { + invalidateCache() + } + } + + // The actual implementation is delegated to strategy + def executorBlacklist( + sched: TaskSchedulerImpl, + stageId: Int, + partition: Int): Set[String] = synchronized { + // note that this is NOT only called from the dag scheduler event loop + val atomTask = StageAndPartition(stageId, partition) + if (!isBlacklistExecutorCacheValid) { + reEvaluateExecutorBlacklistAndUpdateCache(sched, atomTask, clock) + } else { +// getExecutorBlacklistFromCache(atomTask).getOrElse(Set.empty[String]) + getExecutorBlacklistFromCache(atomTask).getOrElse { + // TODO Why is this necessary? (its because we clear the entire map on an invalidate, + // and lazily rebuild it) + reEvaluateExecutorBlacklistAndUpdateCache(sched, atomTask, clock) + } + } + } + + // The actual implementation is delegated to strategy + def nodeBlacklist(): Set[String] = synchronized { + if (isBlacklistNodeCacheValid) { + getNodeBlacklistFromCache + } else { + val nodes = strategy.getNodeBlacklist(executorIdToFailureStatus, clock) + updateBlacklistNodeCache(nodes) + nodes + } + } + + // The actual implementation is delegated to strategy + def nodeBlacklistForStage(stageId: Int): Set[String] = synchronized { + // TODO here and elsewhere -- we invalidate the cache way too often. In general, we should + // be able to do an in-place update of the caches. 
(a) this is slow and (b) it makes + // it really hard to track when the blacklist actually changes (would be *really* nice to + // log a msg about node level blacklisting at least) + if (isBlacklistNodeForStageCacheValid) { + getNodeBlacklistForStageFromCache(stageId).getOrElse( + reEvaluateNodeBlacklistForStageAndUpdateCache(stageId)) + } else { + reEvaluateNodeBlacklistForStageAndUpdateCache(stageId) + } + } + + def taskSucceeded( + stageId: Int, + partition: Int, + info: TaskInfo): Unit = synchronized { + // when an executor successfully completes any task, we remove it from the blacklist + // for *all* tasks + removeFailedExecutorsForTaskId(info.executorId, stageId, partition) + } + + def taskFailed( + stageId: Int, + partition: Int, + info: TaskInfo): Unit = synchronized { + // If the task failed, update latest failure time and failedTaskIds + val atomTask = StageAndPartition(stageId, partition) + val executorId = info.executorId + executorIdToFailureStatus.get(executorId) match { + case Some(failureStatus) => + failureStatus.updatedTime = clock.getTimeMillis() + val failedTimes = failureStatus.numFailuresPerTask.getOrElse(atomTask, 0) + 1 + failureStatus.numFailuresPerTask(atomTask) = failedTimes + case None => + val failedTasks = mutable.HashMap(atomTask -> 1) + val failureStatus = new FailureStatus( + clock.getTimeMillis(), info.host, failedTasks) + executorIdToFailureStatus(executorId) = failureStatus + } + invalidateCache() + } + + /** remove the executorId from executorIdToFailureStatus */ + def removeFailedExecutors(executorId: String) : Unit = synchronized { + executorIdToFailureStatus.remove(executorId) + invalidateCache() + } + + /** + * remove the failure record related to given taskId from executorIdToFailureStatus. If the + * number of records of given executorId becomes 0, remove the completed executorId. + */ + def removeFailedExecutorsForTaskId( + executorId: String, + stageId: Int, + partition: Int) : Unit = synchronized { + val atomTask = StageAndPartition(stageId, partition) + executorIdToFailureStatus.get(executorId).map{ fs => + fs.numFailuresPerTask.remove(atomTask) + if (fs.numFailuresPerTask.isEmpty) { + executorIdToFailureStatus.remove(executorId) + } + invalidateCache() + } + } + + def isExecutorBlacklisted( + executorId: String, + sched: TaskSchedulerImpl, + stageId: Int, + partition: Int) : Boolean = { + + executorBlacklist(sched, stageId, partition).contains(executorId) + } + + // If the node is in blacklist, all executors allocated on that node will + // also be put into executor blacklist. 
+ private def executorsOnBlacklistedNode( + sched: TaskSchedulerImpl, + atomTask: StageAndPartition): Set[String] = { + nodeBlacklistForStage(atomTask.stageId).flatMap(sched.getExecutorsAliveOnHost(_) + .getOrElse(Set.empty[String])) + } + + private def reEvaluateExecutorBlacklistAndUpdateCache( + sched: TaskSchedulerImpl, + atomTask: StageAndPartition, + clock: Clock): Set[String] = { + // TODO some kind of logging when the blacklist is *updated* + val executors = executorsOnBlacklistedNode(sched, atomTask) ++ + strategy.getExecutorBlacklist(executorIdToFailureStatus, atomTask, clock) + updateBlacklistExecutorCache(atomTask, executors) + executors + } + + private def reEvaluateNodeBlacklistForStageAndUpdateCache(stageId: Int): Set[String] = { + val nodes = strategy.getNodeBlacklistForStage(executorIdToFailureStatus, stageId, clock) + updateBlacklistNodeForStageCache(stageId, nodes) +// updateBlacklistNodeCache(nodes) + nodes + } +} + +/** + * Hide cache details in this trait to make code clean and avoid operation mistake + */ +private[scheduler] trait BlacklistCache extends Logging { + // local cache to minimize the the work when query blacklisted executor and node + private val blacklistExecutorCache = mutable.HashMap.empty[StageAndPartition, Set[String]] + private val blacklistNodeCache = mutable.Set.empty[String] + private val blacklistNodeForStageCache = mutable.HashMap.empty[Int, Set[String]] + + // The flag to mark if cache is valid, it will be set to false when executorIdToFailureStatus be + // updated and it will be set to true, when called executorBlacklist and nodeBlacklist. + private var _isBlacklistExecutorCacheValid : Boolean = false + private var _isBlacklistNodeCacheValid: Boolean = false + private var _isBlacklistNodeForStageCacheValid: Boolean = false + + private val cacheLock = new Object() + + protected def isBlacklistExecutorCacheValid : Boolean = _isBlacklistExecutorCacheValid + protected def isBlacklistNodeCacheValid: Boolean = _isBlacklistNodeCacheValid + protected def isBlacklistNodeForStageCacheValid: Boolean = _isBlacklistNodeForStageCacheValid + + protected def updateBlacklistExecutorCache( + atomTask: StageAndPartition, + blacklistExecutor: Set[String]): Unit = cacheLock.synchronized { + if (!_isBlacklistExecutorCacheValid) { + blacklistExecutorCache.clear() + } + blacklistExecutorCache.update(atomTask, blacklistExecutor) + _isBlacklistExecutorCacheValid = true + } + + protected def updateBlacklistNodeCache( + blacklistNode: Set[String]): Unit = cacheLock.synchronized { + if (!_isBlacklistNodeCacheValid) { + blacklistNodeCache.clear() + } + blacklistNodeCache ++= blacklistNode + _isBlacklistNodeCacheValid = true + } + + protected def updateBlacklistNodeForStageCache( + stageId: Int, + blacklistNode: Set[String]): Unit = cacheLock.synchronized { + // TODO this needs to actually get called, and add unit test + val wasBlacklisted = blacklistNodeForStageCache.getOrElse(stageId, Set.empty[String]) + if (wasBlacklisted != blacklistNode) { + logInfo(s"Updating node blacklist for Stage ${stageId} to ${blacklistNode}") + } + if (!_isBlacklistNodeForStageCacheValid) { + blacklistNodeForStageCache.clear() + } + blacklistNodeForStageCache.update(stageId, blacklistNode) + _isBlacklistNodeForStageCacheValid = true + } + + protected def invalidateCache(): Unit = cacheLock.synchronized { + logInfo("invalidating blacklist cache") + _isBlacklistExecutorCacheValid = false + _isBlacklistNodeCacheValid = false + _isBlacklistNodeForStageCacheValid = false + } + + protected def 
getExecutorBlacklistFromCache( + atomTask: StageAndPartition): Option[Set[String]] = { + blacklistExecutorCache.get(atomTask) + } + + protected def getNodeBlacklistFromCache: Set[String] = blacklistNodeCache.toSet + + protected def getNodeBlacklistForStageFromCache(stageId: Int): Option[Set[String]] = + blacklistNodeForStageCache.get(stageId) +} + +/** + * A class to record details of failure. + * + * @param initialTime the time when failure status be created + * @param host the node name which running executor on + * @param numFailuresPerTask all tasks failed on the executor (key is StageAndPartition, value + * is the number of failures of this task) + */ +private[scheduler] final class FailureStatus( + initialTime: Long, + val host: String, + val numFailuresPerTask: mutable.HashMap[StageAndPartition, Int]) { + + var updatedTime = initialTime + def totalNumFailures : Int = numFailuresPerTask.values.sum +} + +private[scheduler] case class StageAndPartition(val stageId: Int, val partition: Int) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 01e85ca405587..d4cf641cfc407 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -249,10 +249,16 @@ private[spark] class TaskSchedulerImpl( availableCpus: Array[Int], tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = { var launchedTask = false + // TODO unit test, and also add executor-stage filtering as well + // This is an optimization -- the taskSet might contain a very long list of pending tasks. + // Rather than wasting time checking the offer against each task, and then realizing the + // executor is blacklisted, just filter out the bad executor immediately. + val nodeBlacklist = taskSet.blacklistTracker.map{_.nodeBlacklistForStage(taskSet.stageId)} + .getOrElse(Set()) for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { + if (!nodeBlacklist(host) && availableCpus(i) >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 08d33f688a160..1a716ff901d86 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -58,15 +58,6 @@ private[spark] class TaskSetManager( val conf = sched.sc.conf - /* - * Sometimes if an executor is dead or in an otherwise invalid state, the driver - * does not realize right away leading to repeated task failures. If enabled, - * this temporarily prevents a task from re-launching on an executor where - * it just failed. 
- */ - private val EXECUTOR_TASK_BLACKLIST_TIMEOUT = - conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L) - // Quantile of tasks at which to start speculation val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75) val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5) @@ -83,8 +74,6 @@ private[spark] class TaskSetManager( val copiesRunning = new Array[Int](numTasks) val successful = new Array[Boolean](numTasks) private val numFailures = new Array[Int](numTasks) - // key is taskId, value is a Map of executor id to when it failed - private val failedExecutors = new HashMap[Int, HashMap[String, Long]]() val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) var tasksSuccessful = 0 @@ -250,7 +239,8 @@ private[spark] class TaskSetManager( while (indexOffset > 0) { indexOffset -= 1 val index = list(indexOffset) - if (!executorIsBlacklisted(execId, index)) { + if (!blacklistTracker.map(_.isExecutorBlacklisted(execId, sched, stageId, index)) + .getOrElse(false)) { // This should almost always be list.trimEnd(1) to remove tail list.remove(indexOffset) if (copiesRunning(index) == 0 && !successful(index)) { @@ -266,19 +256,11 @@ private[spark] class TaskSetManager( taskAttempts(taskIndex).exists(_.host == host) } - /** - * Is this re-execution of a failed task on an executor it already failed in before - * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ? - */ - private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = { - if (failedExecutors.contains(taskId)) { - val failed = failedExecutors.get(taskId).get - - return failed.contains(execId) && - clock.getTimeMillis() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT - } + var blacklistTracker = sched.sc.blacklistTracker - false + /** VisibleForTesting */ + private[scheduler] def setBlacklistTracker (tracker: BlacklistTracker) = { + blacklistTracker = Some(tracker) } /** @@ -293,7 +275,9 @@ private[spark] class TaskSetManager( speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set def canRunOnHost(index: Int): Boolean = - !hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index) + !hasAttemptOnHost(index, host) && + !blacklistTracker.map(_.isExecutorBlacklisted(execId, sched, stageId, index)) + .getOrElse(false) if (!speculatableTasks.isEmpty) { // Check for process-local tasks; note that tasks can be process-local @@ -478,8 +462,8 @@ private[spark] class TaskSetManager( // a good proxy to task serialization time. 
// val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" - logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," + - s" $taskLocality, ${serializedTask.limit} bytes)") + logInfo(s"Starting $taskName (TID $taskId, $host, exec ${info.executorId}, " + + s"partition ${task.partitionId},$taskLocality, ${serializedTask.limit} bytes)") sched.dagScheduler.taskStarted(task, info) return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, @@ -619,8 +603,9 @@ private[spark] class TaskSetManager( sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) if (!successful(index)) { tasksSuccessful += 1 - logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format( - info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks)) + logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s / exec %s (%d/%d)".format( + info.id, taskSet.id, info.taskId, info.duration, info.host, info.executorId, + tasksSuccessful, numTasks)) // Mark successful and stop if all the tasks have succeeded. successful(index) = true if (tasksSuccessful == numTasks) { @@ -630,7 +615,10 @@ private[spark] class TaskSetManager( logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id + " because task " + index + " has already completed successfully") } - failedExecutors.remove(index) + + blacklistTracker.foreach{ + _.taskSucceeded(stageId, tasks(index).partitionId, info) + } maybeFinishTaskSet() } @@ -648,8 +636,8 @@ private[spark] class TaskSetManager( val index = info.index copiesRunning(index) -= 1 var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty - val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " + - reason.asInstanceOf[TaskFailedReason].toErrorString + val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}," + + s" exec ${info.executorId}): ${reason.asInstanceOf[TaskFailedReason].toErrorString}" val failureException: Option[Throwable] = reason match { case fetchFailed: FetchFailed => logWarning(failureReason) @@ -712,9 +700,12 @@ private[spark] class TaskSetManager( logError("Unknown TaskEndReason: " + e) None } + // always add to failed executors - failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()). - put(info.executorId, clock.getTimeMillis()) + blacklistTracker.foreach { + _.taskFailed(stageId, tasks(index).partitionId, info) + } + sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info) if (successful(index)) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 46a829114ec86..79e4cc2fdf719 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -94,7 +94,8 @@ private[spark] object CoarseGrainedClusterMessages { case class RequestExecutors( requestedTotal: Int, localityAwareTasks: Int, - hostToLocalTaskCount: Map[String, Int]) + hostToLocalTaskCount: Map[String, Int], + nodeBlacklist: Set[String]) extends CoarseGrainedClusterMessage // Check if an executor was force-killed but for a reason unrelated to the running tasks. 
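Reviewer note (not part of the patch): `RequestExecutors` now carries a `nodeBlacklist` field, but no backend in this diff is shown populating it yet. Below is a minimal sketch of how that could look using only the APIs introduced above; the helper name and its placement are hypothetical.

```scala
import org.apache.spark.scheduler.BlacklistTracker
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors

// Hypothetical helper: attach the tracker's current node blacklist to an executor request,
// so the cluster manager can avoid allocating fresh executors on known-bad hosts.
// An absent tracker (blacklisting disabled) degrades to an empty blacklist.
def buildRequestExecutors(
    tracker: Option[BlacklistTracker],
    requestedTotal: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int]): RequestExecutors = {
  val nodeBlacklist: Set[String] = tracker.map(_.nodeBlacklist()).getOrElse(Set.empty[String])
  RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, nodeBlacklist)
}
```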
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0fea9c123bcfb..26bcce20ffe58 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -298,6 +298,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp scheduler.sc.env.blockManager.master.removeExecutor(executorId) logInfo(s"Asked to remove non-existent executor $executorId") } + + // Remove disconnected executor from blacklistTracker to keep consistency + scheduler.sc.blacklistTracker.foreach(_.removeFailedExecutors(executorId)) } /** diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 81b94b57219db..787047c9ddab7 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -272,7 +272,7 @@ private class FakeSchedulerBackend( protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { clusterManagerEndpoint.askWithRetry[Boolean]( - RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)) + RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty[String])) } protected override def doKillExecutors(executorIds: Seq[String]): Boolean = { @@ -291,7 +291,7 @@ private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoin def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RequestExecutors(requestedTotal, _, _) => + case RequestExecutors(requestedTotal, _, _, _) => targetNumExecutors = requestedTotal context.reply(true) case KillExecutors(executorIds) => diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala new file mode 100644 index 0000000000000..72f854941406f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfter +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.ExceptionFailure +import org.apache.spark.SparkConf +import org.apache.spark.SparkFunSuite +import org.apache.spark.Success +import org.apache.spark.TaskEndReason +import org.apache.spark.util.ManualClock + +class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfter with MockitoSugar { + + val FAILURE: TaskEndReason = new ExceptionFailure( + "Fake", + "fake failure", + Array.empty[StackTraceElement], + "fake stack trace", + None) + + val stage1 = 1 + val stage2 = 2 + + val partition1 = 1 + val partition2 = 2 + val partition3 = 3 + + // Variable name can indicate basic information of taskInfo + // hostA: executor 1, 2, 4 + // hostB: executor 3 + // The format is "taskInfo_executorId_hostName" + val taskInfo_1_hostA = new TaskInfo(1L, 1, 1, 0L, "1", "hostA", TaskLocality.ANY, false) + val taskInfo_2_hostA = new TaskInfo(2L, 1, 1, 0L, "2", "hostA", TaskLocality.ANY, false) + val taskInfo_3_hostB = new TaskInfo(3L, 3, 1, 0L, "3", "hostB", TaskLocality.ANY, false) + + val clock = new ManualClock(0) + + test ("expireExecutorsInBlacklist works") { + // expire time is set to 6s + val conf = new SparkConf().setAppName("test").setMaster("local") + .set("spark.ui.enabled", "false") + .set("spark.scheduler.executorTaskBlacklistTime", "6000") + + val scheduler = mock[TaskSchedulerImpl] + + val tracker = new BlacklistTracker(conf, clock) + // Executor 1 into blacklist at Time 00:00:00 + tracker.taskFailed(stage1, partition1, taskInfo_1_hostA) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1")) + + clock.setTime(2000) + tracker.expireExecutorsInBlackList() + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1")) + // Executor 1 failed again at Time 00::00:02 + tracker.taskFailed(stage1, partition1, taskInfo_1_hostA) + + clock.setTime(3000) + // Executor 2 failed at Time 00:00:03 + tracker.taskFailed(stage1, partition1, taskInfo_2_hostA) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2")) + + clock.setTime(6000) + tracker.expireExecutorsInBlackList() + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2")) + + clock.setTime(8000) + tracker.expireExecutorsInBlackList() + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2")) + + clock.setTime(10000) + tracker.expireExecutorsInBlackList() + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set()) + } + + test("blacklist feature is off by default") { + val conf = new SparkConf().setAppName("test").setMaster("local") + .set("spark.ui.enabled", "false") + val scheduler = mock[TaskSchedulerImpl] + + val tracker = new BlacklistTracker(conf, clock) + tracker.taskFailed(stage1, partition1, taskInfo_1_hostA) + tracker.taskFailed(stage1, partition1, taskInfo_2_hostA) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set()) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set()) + + tracker.taskFailed(stage1, partition3, taskInfo_3_hostB) + assert(tracker.executorBlacklist(scheduler, stage1, 3) === Set()) + assert(tracker.nodeBlacklist() === Set()) + } + + test("SingleTask strategy works") { + val conf = new SparkConf().setAppName("test").setMaster("local") + .set("spark.ui.enabled", "false") + .set("spark.scheduler.executorTaskBlacklistTime", "1000") + val scheduler = mock[TaskSchedulerImpl] + + // Task 1 failed on both executor 1 and executor 2 + val 
tracker = new BlacklistTracker(conf, clock) + tracker.taskFailed(stage1, partition1, taskInfo_1_hostA) + tracker.taskFailed(stage1, partition1, taskInfo_2_hostA) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2")) + assert(tracker.executorBlacklist(scheduler, stage1, 2) === Set()) + + // Task 1 succeeded on executor 1, so we remove executor 1 from blacklist + tracker.taskSucceeded(stage1, partition1, taskInfo_1_hostA) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2")) + assert(tracker.nodeBlacklist() === Set()) + + // Task 2 succeed on executor 3, no effect on blacklist for Task 1 + tracker.taskSucceeded(stage1, partition3, taskInfo_3_hostB) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2")) + + tracker.taskFailed(stage1, partition3, taskInfo_3_hostB) + assert(tracker.executorBlacklist(scheduler, stage1, 3) === Set("3")) + assert(tracker.nodeBlacklist() === Set()) + + tracker.taskSucceeded(stage1, partition1, taskInfo_2_hostA) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set()) + + // Task 2 on Stage 2 failed on Executor 2 + tracker.taskFailed(stage2, partition2, taskInfo_2_hostA) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set()) + assert(tracker.executorBlacklist(scheduler, stage2, 1) === Set()) + assert(tracker.executorBlacklist(scheduler, stage1, 2) === Set()) + assert(tracker.executorBlacklist(scheduler, stage2, 2) === Set("2")) + } + + test("AdvencedSingleTask strategy works") { + val conf = new SparkConf().setAppName("test").setMaster("local") + .set("spark.ui.enabled", "false") + .set("spark.scheduler.blacklist.advancedStrategy", "true") + .set("spark.scheduler.executorTaskBlacklistTime", "1000") + val scheduler = mock[TaskSchedulerImpl] + when(scheduler.getExecutorsAliveOnHost("hostA")).thenReturn(Some(Set("1", "2", "4"))) + + // Task 1 failed on both executor 1 + val tracker = new BlacklistTracker(conf, clock) + tracker.taskFailed(stage1, partition1, taskInfo_1_hostA) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1")) + assert(tracker.executorBlacklist(scheduler, stage1, 2) === Set()) + assert(tracker.nodeBlacklist() === Set()) + + // Task 1 failed on both executor 2 + tracker.taskFailed(stage1, partition1, taskInfo_2_hostA) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("1", "2", "4")) + assert(tracker.executorBlacklist(scheduler, stage1, 2) === Set("1", "2", "4")) + assert(tracker.executorBlacklist(scheduler, stage2, 1) === Set()) + assert(tracker.nodeBlacklistForStage(stage1) === Set("hostA")) + assert(tracker.nodeBlacklistForStage(stage2) === Set()) + assert(tracker.nodeBlacklist() === Set("hostA")) + + // Task 1 succeeded on executor 1, so we remove executor 1 from blacklist + tracker.taskSucceeded(stage1, partition1, taskInfo_1_hostA) + assert(tracker.executorBlacklist(scheduler, stage1, 1) === Set("2")) + assert(tracker.nodeBlacklistForStage(stage1) === Set()) + assert(tracker.nodeBlacklist() === Set()) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index 02aa5caa731ff..718963da7dc17 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -131,8 +131,13 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa // When a job fails, we terminate before waiting for 
all the task end events to come in, // so there might still be a running task set. So we only check these conditions // when the job succeeds - assert(taskScheduler.runningTaskSets.isEmpty) - assert(!backend.hasTasks) + if (taskScheduler.runningTaskSets.nonEmpty) { + fail(s"taskScheduler still has running taskSets: ${taskScheduler.runningTaskSets}") + } + if (backend.hasTasks) { + fail(s"backend still has tasks. Waiting to run: ${backend.assignedTasksWaitingToRun}; " + + s"running : ${backend.runningTasks}") + } } assert(scheduler.activeJobs.isEmpty) } @@ -264,9 +269,9 @@ private[spark] abstract class MockBackend( } // protected by this - private val assignedTasksWaitingToRun = new ArrayBuffer[TaskDescription](10000) + val assignedTasksWaitingToRun = new ArrayBuffer[TaskDescription](10000) // protected by this - private val runningTasks = ArrayBuffer[TaskDescription]() + val runningTasks = ArrayBuffer[TaskDescription]() def hasTasks: Boolean = synchronized { assignedTasksWaitingToRun.nonEmpty || runningTasks.nonEmpty @@ -413,15 +418,20 @@ private class MockExternalClusterManager extends ExternalClusterManager { /** TaskSchedulerImpl that just tracks a tiny bit more state to enable checks in tests. */ class TestTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { /** Set of TaskSets the DAGScheduler has requested executed. */ + // protected by this val runningTaskSets = HashSet[TaskSet]() override def submitTasks(taskSet: TaskSet): Unit = { - runningTaskSets += taskSet + synchronized { + runningTaskSets += taskSet + } super.submitTasks(taskSet) } override def taskSetFinished(manager: TaskSetManager): Unit = { - runningTaskSets -= manager.taskSet + synchronized { + runningTaskSets -= manager.taskSet + } super.taskSetFinished(manager) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 9b7b945bf3677..4ae3047f41785 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -407,7 +407,11 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // affinity to exec1 on host1 - which we will fail. val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1"))) val clock = new ManualClock + + // spy taskSetManager to set Manual clock for BlacklistTracker val manager = new TaskSetManager(sched, taskSet, 4, clock) + val tracker = new BlacklistTracker(conf, clock) + manager.setBlacklistTracker(tracker) { val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) @@ -462,6 +466,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // After reschedule delay, scheduling on exec1 should be possible. clock.advance(rescheduleDelay) + tracker.expireExecutorsInBlackList() { val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) diff --git a/docs/configuration.md b/docs/configuration.md index d6471a8cc7873..871d7af2573ba 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1100,6 +1100,34 @@ Apart from these, the following properties are also available, and may be useful The interval length for the scheduler to revive the worker resource offers to run tasks. +
+<tr>
+  <td><code>spark.scheduler.blacklist.enabled</code></td>
+  <td>true</td>
+  <td>
+    If set to "true", track failed executors and nodes in a blacklist that is shared by all
+    TaskSets of the SparkContext.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.scheduler.blacklist.timeout</code></td>
+  <td>spark.scheduler.executorTaskBlacklistTime</td>
+  <td>
+    How long an executor's failure information is kept before it expires and the executor is
+    removed from the blacklist. Defaults to the value of
+    <code>spark.scheduler.executorTaskBlacklistTime</code> (0 ms).
+  </td>
+</tr>
+<tr>
+  <td><code>spark.scheduler.blacklist.recoverPeriod</code></td>
+  <td>60s</td>
+  <td>
+    How often the blacklist tracker checks for, and removes, expired blacklist entries.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.scheduler.blacklist.advancedStrategy</code></td>
+  <td>false</td>
+  <td>
+    If set to "true", use the advanced blacklist strategy, which additionally blacklists an
+    entire node for a stage once more than one executor on that node has failed tasks of that
+    stage.
+  </td>
+</tr>
 <tr>
   <td><code>spark.speculation</code></td>
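Usage note (not part of the patch): a minimal sketch of how an application would opt in to the settings documented above, assuming the defaults shown in the diff (blacklisting enabled by default, single-task strategy unless the advanced strategy is turned on). The application and class names are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BlacklistConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("blacklist-example")
      .setMaster("local[4]")
      .set("spark.scheduler.blacklist.enabled", "true")          // on by default
      .set("spark.scheduler.blacklist.timeout", "10min")         // failure info expires after 10 minutes
      .set("spark.scheduler.blacklist.recoverPeriod", "60s")     // expiry check period (default 60s)
      .set("spark.scheduler.blacklist.advancedStrategy", "true") // also blacklist whole nodes per stage

    val sc = new SparkContext(conf)
    // The BlacklistTracker is created and stopped by SparkContext itself and is consulted from
    // TaskSchedulerImpl / TaskSetManager, so no further user code is required.
    sc.stop()
  }
}
```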