diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala index edaeb658d0822..b16fb642e65f0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistStrategy.scala @@ -24,6 +24,8 @@ import org.apache.spark.util.Clock /** * The interface to determine executor blacklist and node blacklist. + * + * TODO notes on thread-safety */ private[scheduler] trait BlacklistStrategy { /** Define a time interval to expire failure information of executors */ @@ -81,10 +83,12 @@ private[scheduler] trait BlacklistStrategy { */ private[scheduler] class SingleTaskStrategy( val expireTimeInMilliseconds: Long) extends BlacklistStrategy { + var executorBlacklistCallCount = 0L def getExecutorBlacklist( executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], atomTask: StageAndPartition, clock: Clock): Set[String] = { + executorBlacklistCallCount += 1 executorIdToFailureStatus.filter{ case (_, failureStatus) => failureStatus.numFailuresPerTask.keySet.contains(atomTask) && clock.getTimeMillis() - failureStatus.updatedTime < expireTimeInMilliseconds @@ -104,10 +108,19 @@ private[scheduler] class SingleTaskStrategy( private[scheduler] class AdvancedSingleTaskStrategy( expireTimeInMilliseconds: Long) extends SingleTaskStrategy(expireTimeInMilliseconds) { + var nodeBlacklistCallCount = 0L override def getNodeBlacklistForStage( executorIdToFailureStatus: mutable.HashMap[String, FailureStatus], stageId: Int, clock: Clock): Set[String] = { + nodeBlacklistCallCount += 1 + // When there is one bad node (or executor), this is really slow. We pile up a ton of + // task failures, and we've got to iterate through failure data for each task. Furthermore, + // since we don't actively blacklist the bad node / executor, we just keep assigning it more + // tasks that fail. And after each failure, we invalidate our cache, which means we need + // to call this again. + // This can be particularly painful when the failures are fast, since it's likely the only + // executor with free slots is the one that just failed some tasks, so the cycle just keeps going.
val nodes = executorIdToFailureStatus.filter{ case (_, failureStatus) => failureStatus.numFailuresPerTask.keySet.map(_.stageId).contains(stageId) && diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala index f2c73710c000c..4ca0713880a64 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging import org.apache.spark.util.Clock import org.apache.spark.util.SystemClock import org.apache.spark.util.ThreadUtils @@ -37,7 +38,7 @@ import org.apache.spark.util.Utils */ private[spark] class BlacklistTracker( sparkConf: SparkConf, - clock: Clock = new SystemClock()) extends BlacklistCache{ + clock: Clock = new SystemClock()) extends BlacklistCache with Logging { // maintain a ExecutorId --> FailureStatus HashMap private val executorIdToFailureStatus: mutable.HashMap[String, FailureStatus] = mutable.HashMap() @@ -64,11 +65,20 @@ private[spark] class BlacklistTracker( def stop(): Unit = { scheduler.shutdown() scheduler.awaitTermination(10, TimeUnit.SECONDS) + logInfo(s"Executor Blacklist callcount =" + + s" ${strategy.asInstanceOf[SingleTaskStrategy].executorBlacklistCallCount}") + strategy match { + case as: AdvancedSingleTaskStrategy => + logInfo(s"Node Blacklist callcount =" + + s" ${as.nodeBlacklistCallCount}") + case _ => // no op + } } // The actual implementation is delegated to strategy private[scheduler] def expireExecutorsInBlackList(): Unit = synchronized { val updated = strategy.expireExecutorsInBlackList(executorIdToFailureStatus, clock) + logInfo(s"Checked for expired blacklist: ${updated}") if (updated) { invalidateCache() } @@ -76,12 +86,18 @@ private[spark] class BlacklistTracker( // The actual implementation is delegated to strategy def executorBlacklist( - sched: TaskSchedulerImpl, stageId: Int, partition: Int): Set[String] = synchronized { + sched: TaskSchedulerImpl, + stageId: Int, + partition: Int): Set[String] = synchronized { + // note that this is NOT only called from the dag scheduler event loop val atomTask = StageAndPartition(stageId, partition) if (!isBlacklistExecutorCacheValid) { reEvaluateExecutorBlacklistAndUpdateCache(sched, atomTask, clock) } else { +// getExecutorBlacklistFromCache(atomTask).getOrElse(Set.empty[String]) getExecutorBlacklistFromCache(atomTask).getOrElse { + // TODO Why is this necessary? (its because we clear the entire map on an invalidate, + // and lazily rebuild it) reEvaluateExecutorBlacklistAndUpdateCache(sched, atomTask, clock) } } @@ -100,6 +116,10 @@ private[spark] class BlacklistTracker( // The actual implementation is delegated to strategy def nodeBlacklistForStage(stageId: Int): Set[String] = synchronized { + // TODO here and elsewhere -- we invalidate the cache way too often. In general, we should + // be able to do an in-place update of the caches. 
(a) this is slow and (b) it makes + // it really hard to track when the blacklist actually changes (would be *really* nice to + // log a msg about node level blacklisting at least) if (isBlacklistNodeForStageCacheValid) { getNodeBlacklistForStageFromCache(stageId).getOrElse( reEvaluateNodeBlacklistForStageAndUpdateCache(stageId)) @@ -176,14 +196,15 @@ private[spark] class BlacklistTracker( private def executorsOnBlacklistedNode( sched: TaskSchedulerImpl, atomTask: StageAndPartition): Set[String] = { - nodeBlacklistForStage(atomTask.stageId).flatMap(sched.getExecutorsAliveOnHost(_) - .getOrElse(Set.empty[String])).toSet + nodeBlacklistForStage(atomTask.stageId).flatMap(sched.getExecutorsAliveOnHost(_) + .getOrElse(Set.empty[String])) } private def reEvaluateExecutorBlacklistAndUpdateCache( sched: TaskSchedulerImpl, atomTask: StageAndPartition, clock: Clock): Set[String] = { + // TODO some kind of logging when the blacklist is *updated* val executors = executorsOnBlacklistedNode(sched, atomTask) ++ strategy.getExecutorBlacklist(executorIdToFailureStatus, atomTask, clock) updateBlacklistExecutorCache(atomTask, executors) @@ -192,7 +213,8 @@ private[spark] class BlacklistTracker( private def reEvaluateNodeBlacklistForStageAndUpdateCache(stageId: Int): Set[String] = { val nodes = strategy.getNodeBlacklistForStage(executorIdToFailureStatus, stageId, clock) - updateBlacklistNodeCache(nodes) + updateBlacklistNodeForStageCache(stageId, nodes) +// updateBlacklistNodeCache(nodes) nodes } } @@ -200,8 +222,7 @@ private[spark] class BlacklistTracker( /** * Hide cache details in this trait to make code clean and avoid operation mistake */ -private[scheduler] trait BlacklistCache { - +private[scheduler] trait BlacklistCache extends Logging { // local cache to minimize the the work when query blacklisted executor and node private val blacklistExecutorCache = mutable.HashMap.empty[StageAndPartition, Set[String]] private val blacklistNodeCache = mutable.Set.empty[String] @@ -241,6 +262,11 @@ private[scheduler] trait BlacklistCache { protected def updateBlacklistNodeForStageCache( stageId: Int, blacklistNode: Set[String]): Unit = cacheLock.synchronized { + // TODO this needs to actually get called, and add unit test + val wasBlacklisted = blacklistNodeForStageCache.getOrElse(stageId, Set.empty[String]) + if (wasBlacklisted != blacklistNode) { + logInfo(s"Updating node blacklist for Stage ${stageId} to ${blacklistNode}") + } if (!_isBlacklistNodeForStageCacheValid) { blacklistNodeForStageCache.clear() } @@ -249,6 +275,7 @@ private[scheduler] trait BlacklistCache { } protected def invalidateCache(): Unit = cacheLock.synchronized { + logInfo("invalidating blacklist cache") _isBlacklistExecutorCacheValid = false _isBlacklistNodeCacheValid = false _isBlacklistNodeForStageCacheValid = false diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5291b663667ea..0c67becbc1b75 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1411,7 +1411,7 @@ class DAGScheduler( stage.clearFailures() } else { stage.latestInfo.stageFailed(errorMessage.get) - logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime)) + logInfo(s"$stage (${stage.name}) failed in $serviceTime s due to ${errorMessage.get}") } outputCommitCoordinator.stageEnd(stage.id) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 80f2bf41224b5..77fda6fcff959 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -59,7 +59,7 @@ private[spark] class DirectTaskResult[T]( val numUpdates = in.readInt if (numUpdates == 0) { - accumUpdates = null + accumUpdates = Seq() } else { val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]] for (i <- 0 until numUpdates) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 371fb8602f785..3b3dfa206a4e5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -248,10 +248,16 @@ private[spark] class TaskSchedulerImpl( availableCpus: Array[Int], tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = { var launchedTask = false + // TODO unit test, and also add executor-stage filtering as well + // This is an optimization -- the taskSet might contain a very long list of pending tasks. + // Rather than wasting time checking the offer against each task, and then realizing the + // executor is blacklisted, just skip offers on blacklisted nodes immediately. + val nodeBlacklist = taskSet.blacklistTracker.map{_.nodeBlacklistForStage(taskSet.stageId)} + .getOrElse(Set()) for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host - if (availableCpus(i) >= CPUS_PER_TASK) { + if (!nodeBlacklist(host) && availableCpus(i) >= CPUS_PER_TASK) { try { for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index bd74eef10e485..1a716ff901d86 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -462,8 +462,8 @@ private[spark] class TaskSetManager( // a good proxy to task serialization time. // val timeTaken = clock.getTime() - startTime val taskName = s"task ${info.id} in stage ${taskSet.id}" - logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," + - s" $taskLocality, ${serializedTask.limit} bytes)") + logInfo(s"Starting $taskName (TID $taskId, $host, exec ${info.executorId}, " + + s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") sched.dagScheduler.taskStarted(task, info) return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, @@ -603,8 +603,9 @@ private[spark] class TaskSetManager( sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info) if (!successful(index)) { tasksSuccessful += 1 - logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format( - info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks)) + logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s / exec %s (%d/%d)".format( + info.id, taskSet.id, info.taskId, info.duration, info.host, info.executorId, + tasksSuccessful, numTasks)) // Mark successful and stop if all the tasks have succeeded.
successful(index) = true if (tasksSuccessful == numTasks) { @@ -635,8 +636,8 @@ private[spark] class TaskSetManager( val index = info.index copiesRunning(index) -= 1 var accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty - val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}): " + - reason.asInstanceOf[TaskFailedReason].toErrorString + val failureReason = s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid, ${info.host}," + + s" exec ${info.executorId}): ${reason.asInstanceOf[TaskFailedReason].toErrorString}" val failureException: Option[Throwable] = reason match { case fetchFailed: FetchFailed => logWarning(failureReason) diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager index 3c570ffd8f566..757c6d2296aff 100644 --- a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager +++ b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager @@ -1 +1,2 @@ -org.apache.spark.scheduler.DummyExternalClusterManager \ No newline at end of file +org.apache.spark.scheduler.DummyExternalClusterManager +org.apache.spark.scheduler.MockExternalClusterManager \ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala new file mode 100644 index 0000000000000..6c9d4fb6f3bcc --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import org.apache.spark._ + +class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend]{ + + val badHost = "host-0" + + /** + * This backend just always fails if the task is executed on a bad host, but otherwise succeeds + * all tasks. 
+ */ + def badHostBackend(): Unit = { + val task = backend.beginTask() + val host = backend.executorIdToExecutor(task.executorId).host + if (host == badHost) { + backend.taskFailed(task, new RuntimeException("I'm a bad host!")) + } else { + backend.taskSuccess(task, 42) + } + } + + // Test demonstrating the issue -- without a config change, the scheduler keeps scheduling + // according to locality preferences, and so the job fails + testScheduler("If preferred node is bad, without blacklist job will fail") { + val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) + withBackend(badHostBackend _) { + val jobFuture = submit(rdd, (0 until 10).toArray) + val duration = Duration(1, SECONDS) + Await.ready(jobFuture, duration) + } + assert(results.isEmpty) + assertDataStructuresEmpty(noFailure = false) + } + + // even with the blacklist turned on, if maxTaskFailures is not more than the number + // of executors on the bad node, then locality preferences will lead to us cycling through + // the executors on the bad node, and still failing the job + testScheduler( + "With blacklist on, job will still fail if there are too many bad executors on bad host", + extraConfs = Seq( + // just set this to something much longer than the test duration + ("spark.scheduler.executorTaskBlacklistTime", "10000000") + ) + ) { + val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) + withBackend(badHostBackend _) { + val jobFuture = submit(rdd, (0 until 10).toArray) + val duration = Duration(3, SECONDS) + Await.ready(jobFuture, duration) + } + assert(results.isEmpty) + assertDataStructuresEmpty(noFailure = false) + } + + // Here we run with the blacklist on, and maxTaskFailures high enough that we'll eventually + // schedule on a good node and succeed the job + testScheduler( + "Bad node with multiple executors, job will still succeed with the right confs", + extraConfs = Seq( + // just set this to something much longer than the test duration + ("spark.scheduler.executorTaskBlacklistTime", "10000000"), + // this has to be higher than the number of executors on the bad host + ("spark.task.maxFailures", "5"), + // just to avoid this test taking too long + ("spark.locality.wait", "10ms") + ) + ) { + val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) + withBackend(badHostBackend _) { + val jobFuture = submit(rdd, (0 until 10).toArray) + val duration = Duration(1, SECONDS) + Await.ready(jobFuture, duration) + } + assert(results === (0 until 10).map { _ -> 42 }.toMap) + assertDataStructuresEmpty(noFailure = true) + } + +} + +class MultiExecutorMockBackend( + conf: SparkConf, + taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) { + + val nHosts = conf.getInt("spark.testing.nHosts", 5) + val nExecutorsPerHost = conf.getInt("spark.testing.nExecutorsPerHost", 4) + val nCoresPerExecutor = conf.getInt("spark.testing.nCoresPerExecutor", 2) + + override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = { + (0 until nHosts).flatMap { hostIdx => + val hostName = "host-" + hostIdx + (0 until nExecutorsPerHost).map { subIdx => + val executorId = (hostIdx * nExecutorsPerHost + subIdx).toString + executorId -> + ExecutorTaskStatus(host = hostName, executorId = executorId, nCoresPerExecutor) + } + }.toMap + } + + override def defaultParallelism(): Int = nHosts * nExecutorsPerHost * nCoresPerExecutor +} + +class MockRDDWithLocalityPrefs( + sc: SparkContext, + numPartitions: Int, + shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]], + val preferredLoc: String) extends MockRDD(sc, 
numPartitions, shuffleDeps) { + override def getPreferredLocations(split: Partition): Seq[String] = { + Seq(preferredLoc) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 088a476086217..60051ef1f0d08 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -98,6 +98,8 @@ class DAGSchedulerSuiteDummyException extends Exception class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts { + import DAGSchedulerSuite._ + val conf = new SparkConf /** Set of TaskSets the DAGScheduler has requested executed. */ val taskSets = scala.collection.mutable.Buffer[TaskSet]() @@ -2027,12 +2029,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } } - private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus = - MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes)) - - private def makeBlockManagerId(host: String): BlockManagerId = - BlockManagerId("exec-" + host, host, 12345) - private def assertDataStructuresEmpty(): Unit = { assert(scheduler.activeJobs.isEmpty) assert(scheduler.failedStages.isEmpty) @@ -2072,5 +2068,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou } CompletionEvent(task, reason, result, accumUpdates ++ extraAccumUpdates, taskInfo) } +} + +object DAGSchedulerSuite { + def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus = + MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes)) + def makeBlockManagerId(host: String): BlockManagerId = + BlockManagerId("exec-" + host, host, 12345) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala new file mode 100644 index 0000000000000..718963da7dc17 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler + +import java.util.Properties +import java.util.concurrent.atomic.AtomicBoolean + +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.{Duration, SECONDS} +import scala.reflect.ClassTag + +import org.scalactic.TripleEquals +import org.scalatest.Assertions.AssertionsHelper + +import org.apache.spark._ +import org.apache.spark.TaskState._ +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.util.{CallSite, Utils} + +/** + * Tests for the entire scheduler code -- DAGScheduler, TaskSchedulerImpl, TaskSets, + * TaskSetManagers. + * + * Test cases are configured by providing a set of jobs to submit, and then simulating interaction + * with spark's executors via a mocked backend (eg., task completion, task failure, executors + * disconnecting, etc.). + */ +abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends SparkFunSuite + with LocalSparkContext { + + var taskScheduler: TestTaskScheduler = null + var scheduler: DAGScheduler = null + var backend: T = _ + + override def beforeEach(): Unit = { + if (taskScheduler != null) { + taskScheduler.runningTaskSets.clear() + } + results.clear() + failure = null + super.beforeEach() + } + + override def afterEach(): Unit = { + super.afterEach() + taskScheduler.stop() + backend.stop() + scheduler.stop() + } + + def setupScheduler(conf: SparkConf): Unit = { + conf.setAppName(this.getClass().getSimpleName()) + val backendClassName = implicitly[ClassTag[T]].runtimeClass.getName() + conf.setMaster(s"mock[${backendClassName}]") + sc = new SparkContext(conf) + backend = sc.schedulerBackend.asInstanceOf[T] + taskScheduler = sc.taskScheduler.asInstanceOf[TestTaskScheduler] + taskScheduler.initialize(sc.schedulerBackend) + scheduler = new DAGScheduler(sc, taskScheduler) + taskScheduler.setDAGScheduler(scheduler) + } + + def testScheduler(name: String)(testBody: => Unit): Unit = { + testScheduler(name, Seq())(testBody) + } + + def testScheduler(name: String, extraConfs: Seq[(String, String)])(testBody: => Unit): Unit = { + test(name) { + val conf = new SparkConf() + extraConfs.foreach{ case (k, v) => conf.set(k, v)} + setupScheduler(conf) + testBody + } + } + + val results = new HashMap[Int, Any]() + var failure: Throwable = _ + + /** + * When we submit dummy Jobs, this is the compute function we supply. + */ + private val jobComputeFunc: (TaskContext, scala.Iterator[_]) => Any = { + (context: TaskContext, it: Iterator[(_)]) => + throw new RuntimeException("jobComputeFunc shouldn't get called in this mock") + } + + /** Submits a job to the scheduler, and returns a future which does a bit of error handling. */ + protected def submit( + rdd: RDD[_], + partitions: Array[Int], + func: (TaskContext, Iterator[_]) => _ = jobComputeFunc): Future[Any] = { + val waiter: JobWaiter[Any] = scheduler.submitJob(rdd, func, partitions.toSeq, CallSite("", ""), + (index, res) => results(index) = res, new Properties()) + import scala.concurrent.ExecutionContext.Implicits.global + waiter.completionFuture.recover { case ex => + failure = ex + } + } + + protected def assertDataStructuresEmpty(noFailure: Boolean = true): Unit = { + if (noFailure) { + if (failure != null) { + // if there is a job failure, it can be a bit hard to tease the job failure msg apart + // from the test failure msg, so we do a little extra formatting + val msg = + raw""" + | There was a failed job. 
+ | ----- Begin Job Failure Msg ----- + | ${Utils.exceptionString(failure)} + | ----- End Job Failure Msg ---- + """. + stripMargin + fail(msg) + } + // When a job fails, we terminate before waiting for all the task end events to come in, + // so there might still be a running task set. So we only check these conditions + // when the job succeeds + if (taskScheduler.runningTaskSets.nonEmpty) { + fail(s"taskScheduler still has running taskSets: ${taskScheduler.runningTaskSets}") + } + if (backend.hasTasks) { + fail(s"backend still has tasks. Waiting to run: ${backend.assignedTasksWaitingToRun}; " + + s"running : ${backend.runningTasks}") + } + } + assert(scheduler.activeJobs.isEmpty) + } + + /** + * Looks at all shuffleMapOutputs that are dependencies of the given RDD, and makes sure + * they are all registered + */ + def assertMapOutputAvailable(targetRdd: MockRDD): Unit = { + val shuffleIds = targetRdd.shuffleDeps.map{_.shuffleId} + val nParts = targetRdd.numPartitions + for { + shuffleId <- shuffleIds + reduceIdx <- (0 until nParts) + } { + val statuses = taskScheduler.mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceIdx) + // really we should have already thrown an exception rather than fail either of these + // asserts, but just to be extra defensive let's double check the statuses are OK + assert(statuses != null) + assert(statuses.nonEmpty) + } + } + + /** models a stage boundary with a single dependency, like a shuffle */ + def shuffle(nParts: Int, input: MockRDD): MockRDD = { + val partitioner = new HashPartitioner(nParts) + val shuffleDep = new ShuffleDependency[Int, Int, Nothing](input, partitioner) + new MockRDD(sc, nParts, List(shuffleDep)) + } + + /** models a stage boundary with multiple dependencies, like a join */ + def join(nParts: Int, inputs: MockRDD*): MockRDD = { + val partitioner = new HashPartitioner(nParts) + val shuffleDeps = inputs.map { inputRDD => + new ShuffleDependency[Int, Int, Nothing](inputRDD, partitioner) + } + new MockRDD(sc, nParts, shuffleDeps) + } + + /** + * Helper which makes it a little easier to setup a test, which starts a mock backend in another + * thread, responding to tasks with your custom function. You also supply the "body" of your + * test, where you submit jobs to your backend, wait for them to complete, then check + * whatever conditions you want. Note that this is *not* safe to all bad backends -- + * in particular, your `backendFunc` has to return quickly, it can't throw errors, (instead + * it should send back the right TaskEndReason) + */ + def withBackend[T](backendFunc: () => Unit)(testBody: => T): T = { + val backendContinue = new AtomicBoolean(true) + val backendThread = new Thread("mock backend thread") { + override def run(): Unit = { + while (backendContinue.get()) { + if (backend.hasTasksWaitingToRun) { + backendFunc() + } else { + Thread.sleep(10) + } + } + } + } + try { + backendThread.start() + testBody + } finally { + backendContinue.set(false) + backendThread.join() + } + } + +} + +/** + * Helper for running a backend in integration tests, does a bunch of the book-keeping + * so individual tests can focus on just responding to tasks. Individual tests will use + * [[beginTask]], [[taskSuccess]], and [[taskFailed]]. + */ +private[spark] abstract class MockBackend( + conf: SparkConf, + val taskScheduler: TaskSchedulerImpl) extends SchedulerBackend with Logging { + + /** + * Test backends should call this to get a task that has been assigned to them by the scheduler. 
+ * Each task should be responded to with either [[taskSuccess]] or [[taskFailed]]. + */ + def beginTask(): TaskDescription = { + synchronized { + val toRun = assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1) + runningTasks += toRun + toRun + } + } + + /** + * Tell the scheduler the task completed successfully, with the given result. Also + * updates some internal state for this mock. + */ + def taskSuccess(task: TaskDescription, result: Any): Unit = { + val ser = env.serializer.newInstance() + val resultBytes = ser.serialize(result) + val directResult = new DirectTaskResult(resultBytes, Seq()) // no accumulator updates + taskUpdate(task, TaskState.FINISHED, directResult) + } + + /** + * Tell the scheduler the task failed, with the given state and result (probably ExceptionFailure + * or FetchFailed). Also updates some internal state for this mock. + */ + def taskFailed(task: TaskDescription, exc: Exception): Unit = { + taskUpdate(task, TaskState.FAILED, new ExceptionFailure(exc, Seq())) + } + + def taskFailed(task: TaskDescription, reason: TaskFailedReason): Unit = { + taskUpdate(task, TaskState.FAILED, reason) + } + + def taskUpdate(task: TaskDescription, state: TaskState, result: Any): Unit = { + val ser = env.serializer.newInstance() + val resultBytes = ser.serialize(result) + // statusUpdate is safe to call from multiple threads, it's protected inside taskScheduler + taskScheduler.statusUpdate(task.taskId, state, resultBytes) + if (TaskState.isFinished(state)) { + synchronized { + runningTasks -= task + executorIdToExecutor(task.executorId).freeCores += taskScheduler.CPUS_PER_TASK + freeCores += taskScheduler.CPUS_PER_TASK + } + reviveOffers() + } + } + + // protected by this + val assignedTasksWaitingToRun = new ArrayBuffer[TaskDescription](10000) + // protected by this + val runningTasks = ArrayBuffer[TaskDescription]() + + def hasTasks: Boolean = synchronized { + assignedTasksWaitingToRun.nonEmpty || runningTasks.nonEmpty + } + + def hasTasksWaitingToRun: Boolean = { + assignedTasksWaitingToRun.nonEmpty + } + + override def start(): Unit = {} + + override def stop(): Unit = {} + + val env = SparkEnv.get + + /** Accessed by both scheduling and backend thread, so should be protected by this. */ + var freeCores: Int = _ + + /** + * Accessed by both scheduling and backend thread, so should be protected by this. + * Most likely the only things that need to be protected are the individual ExecutorTaskStatus, + * but for simplicity in this mock just lock the whole backend. + */ + def executorIdToExecutor: Map[String, ExecutorTaskStatus] + + private def generateOffers(): Seq[WorkerOffer] = { + executorIdToExecutor.values.filter { exec => + exec.freeCores > 0 + }.map { exec => + WorkerOffer(executorId = exec.executorId, host = exec.host, + cores = exec.freeCores) + }.toSeq + } + + /** + * This is called by the scheduler whenever it has tasks it would like to schedule. It gets + * called in the scheduling thread, not the backend thread. + */ + override def reviveOffers(): Unit = { + val offers: Seq[WorkerOffer] = generateOffers() + val newTasks = taskScheduler.resourceOffers(offers).flatten + synchronized { + newTasks.foreach { task => + executorIdToExecutor(task.executorId).freeCores -= taskScheduler.CPUS_PER_TASK + } + freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK + assignedTasksWaitingToRun ++= newTasks + } + } + + override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = { + // We have to implement this b/c of SPARK-15385.
+ // It's OK for this to be a no-op, because even if a backend does implement killTask, + // it really can only be "best-effort" in any case, and the scheduler should be robust to that. + // In fact, this reasonably simulates a case where a real backend finishes tasks in between + // the time when the scheduler sends the msg to kill tasks, and the backend receives the msg. + } +} + +/** + * A very simple mock backend that can just run one task at a time. + */ +private[spark] class SingleCoreMockBackend( + conf: SparkConf, + taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) { + + val cores = 1 + + override def defaultParallelism(): Int = conf.getInt("spark.default.parallelism", cores) + + freeCores = cores + val localExecutorId = SparkContext.DRIVER_IDENTIFIER + val localExecutorHostname = "localhost" + + override val executorIdToExecutor: Map[String, ExecutorTaskStatus] = Map( + localExecutorId -> new ExecutorTaskStatus(localExecutorHostname, localExecutorId, freeCores) + ) +} + +case class ExecutorTaskStatus(host: String, executorId: String, var freeCores: Int) + +class MockRDD( + sc: SparkContext, + val numPartitions: Int, + val shuffleDeps: Seq[ShuffleDependency[Int, Int, Nothing]] +) extends RDD[(Int, Int)](sc, shuffleDeps) with Serializable { + + MockRDD.validate(numPartitions, shuffleDeps) + + override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = + throw new RuntimeException("should not be reached") + override def getPartitions: Array[Partition] = { + (0 until numPartitions).map(i => new Partition { + override def index: Int = i + }).toArray + } + override def getPreferredLocations(split: Partition): Seq[String] = Nil + override def toString: String = "MockRDD " + id +} + +object MockRDD extends AssertionsHelper with TripleEquals { + /** + * make sure all the shuffle dependencies have a consistent number of output partitions + * (mostly to make sure the test setup makes sense, not that Spark itself would get this wrong) + */ + def validate(numPartitions: Int, dependencies: Seq[ShuffleDependency[_, _, _]]): Unit = { + dependencies.foreach { dependency => + val partitioner = dependency.partitioner + assert(partitioner != null) + assert(partitioner.numPartitions === numPartitions) + } + } +} + +/** Simple cluster manager that wires up our mock backend. */ +private class MockExternalClusterManager extends ExternalClusterManager { + + val MOCK_REGEX = """mock\[(.*)\]""".r + def canCreate(masterURL: String): Boolean = MOCK_REGEX.findFirstIn(masterURL).isDefined + + def createTaskScheduler( + sc: SparkContext, + masterURL: String): TaskScheduler = { + new TestTaskScheduler(sc) + } + + def createSchedulerBackend( + sc: SparkContext, + masterURL: String, + scheduler: TaskScheduler): SchedulerBackend = { + masterURL match { + case MOCK_REGEX(backendClassName) => + val backendClass = Utils.classForName(backendClassName) + val ctor = backendClass.getConstructor(classOf[SparkConf], classOf[TaskSchedulerImpl]) + ctor.newInstance(sc.getConf, scheduler).asInstanceOf[SchedulerBackend] + } + } + + def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { + scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) + } +} + +/** TaskSchedulerImpl that just tracks a tiny bit more state to enable checks in tests. */ +class TestTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) { + /** Set of TaskSets the DAGScheduler has requested executed.
*/ + // protected by this + val runningTaskSets = HashSet[TaskSet]() + + override def submitTasks(taskSet: TaskSet): Unit = { + synchronized { + runningTaskSets += taskSet + } + super.submitTasks(taskSet) + } + + override def taskSetFinished(manager: TaskSetManager): Unit = { + synchronized { + runningTaskSets -= manager.taskSet + } + super.taskSetFinished(manager) + } +} + +/** + * Some very basic tests just to demonstrate the use of the test framework (and verify that it + * works). + */ +class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCoreMockBackend] { + + /** + * Very simple one stage job. Backend successfully completes each task, one by one + */ + testScheduler("super simple job") { + def runBackend(): Unit = { + val task = backend.beginTask() + backend.taskSuccess(task, 42) + } + withBackend(runBackend _) { + val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray) + val duration = Duration(1, SECONDS) + Await.ready(jobFuture, duration) + } + assert(results === (0 until 10).map { _ -> 42 }.toMap) + assertDataStructuresEmpty() + } + + /** + * 5 stage job, diamond dependencies. + * + * a ----> b ----> d --> result + * \--> c --/ + * + * Backend successfully completes each task + */ + testScheduler("multi-stage job") { + + def stageToOutputParts(stageId: Int): Int = { + stageId match { + case 0 => 10 + case 2 => 20 + case _ => 30 + } + } + + val a = new MockRDD(sc, 2, Nil) + val b = shuffle(10, a) + val c = shuffle(20, a) + val d = join(30, b, c) + + def runBackend(): Unit = { + val taskDescription = backend.beginTask() + val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet + val task = taskSet.tasks(taskDescription.index) + + // make sure the required map output is available + task.stageId match { + case 1 => assertMapOutputAvailable(b) + case 3 => assertMapOutputAvailable(c) + case 4 => assertMapOutputAvailable(d) + case _ => // no shuffle map input, nothing to check + } + + (task.stageId, task.stageAttemptId, task.partitionId) match { + case (stage, 0, _) if stage < 4 => + backend.taskSuccess(taskDescription, + DAGSchedulerSuite.makeMapStatus("hostA", stageToOutputParts(stage))) + case (4, 0, partition) => + backend.taskSuccess(taskDescription, 4321 + partition) + } + } + withBackend(runBackend _) { + val jobFuture = submit(d, (0 until 30).toArray) + val duration = Duration(1, SECONDS) + Await.ready(jobFuture, duration) + } + assert(results === (0 until 30).map { idx => idx -> (4321 + idx) }.toMap) + assertDataStructuresEmpty() + } + + /** + * 2 stage job, with a fetch failure. 
Make sure that: + * (a) map output is available whenever we run stage 1 + * (b) we get a second attempt for stage 0 & stage 1 + */ + testScheduler("job with fetch failure") { + val input = new MockRDD(sc, 2, Nil) + val shuffledRdd = shuffle(10, input) + val shuffleId = shuffledRdd.shuffleDeps.head.shuffleId + + val stageToAttempts = new HashMap[Int, HashSet[Int]]() + + def runBackend(): Unit = { + val taskDescription = backend.beginTask() + val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet + val task = taskSet.tasks(taskDescription.index) + stageToAttempts.getOrElseUpdate(task.stageId, new HashSet()) += task.stageAttemptId + + // make sure the required map output is available + task.stageId match { + case 1 => assertMapOutputAvailable(shuffledRdd) + case _ => // no shuffle map input, nothing to check + } + + (task.stageId, task.stageAttemptId, task.partitionId) match { + case (0, _, _) => + backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 10)) + case (1, 0, 0) => + val fetchFailed = FetchFailed( + DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored") + backend.taskFailed(taskDescription, fetchFailed) + case (1, _, partition) => + backend.taskSuccess(taskDescription, 42 + partition) + } + } + withBackend(runBackend _) { + val jobFuture = submit(shuffledRdd, (0 until 10).toArray) + val duration = Duration(1, SECONDS) + Await.ready(jobFuture, duration) + } + assert(results === (0 until 10).map { idx => idx -> (42 + idx) }.toMap) + assert(stageToAttempts === Map(0 -> Set(0, 1), 1 -> Set(0, 1))) + assertDataStructuresEmpty() + } + + testScheduler("job failure after 4 attempts") { + def runBackend(): Unit = { + val task = backend.beginTask() + backend.taskFailed(task, new RuntimeException("test task failure")) + } + withBackend(runBackend _) { + val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray) + val duration = Duration(1, SECONDS) + Await.ready(jobFuture, duration) + assert(failure.getMessage.contains("test task failure")) + } + assert(results.isEmpty) + assertDataStructuresEmpty(noFailure = false) + } +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerPerformanceSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerPerformanceSuite.scala new file mode 100644 index 0000000000000..515ce0a4d6e69 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerPerformanceSuite.scala @@ -0,0 +1,429 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.spark.scheduler + +import java.util.concurrent.atomic.AtomicBoolean + +import scala.concurrent.duration.Duration + +import org.apache.spark.util.Utils + +class SchedulerPerformanceSuite extends SchedulerIntegrationSuite[MultiExecutorMockBackend] { + + def simpleWorkload(N: Int): MockRDD = { + // relatively simple job with 5 stages, so scheduling includes some aspects of submitting stages + // in addition to tasks + val a = new MockRDD(sc, N, Nil) + val b = shuffle(N, a) + val c = shuffle(N, a) + join(N, b, c) + } + + def runJobWithCustomBackend(N: Int, backendWrapper: WrappedBackend): Unit = { + // Try to run as many jobs as we can in 10 seconds, get the time per job. The idea here is to + // balance: + // 1) have a big enough job that we're not affected by delays just from waiting for job + // completion to propagate to the user thread (probably minor) + // 2) run enough iterations to get some reliable data + // 3) not wait too long + var itrs = 0 + val totalMs = backendWrapper.withBackend { + val start = System.currentTimeMillis() + while (System.currentTimeMillis() - start < 10000 ) { +// while (System.currentTimeMillis() - start < 10000 && itrs == 0) { + withClue(s"failure in iteration = $itrs") { + val itrStart = System.currentTimeMillis() + val jobFuture = submit(simpleWorkload(N), (0 until N).toArray) + // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`, + // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that + // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's + // safe to pass in null here. For more detail, see SPARK-13747. + val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait] + jobFuture.ready(Duration.Inf)(awaitPermission) + // scalastyle:off println + println(s"Iteration $itrs finished in" + + s" ${Utils.msDurationToString(System.currentTimeMillis() - itrStart)}") + // scalastyle:on println + assertDataStructuresEmpty(noFailure = true) + itrs += 1 + } + } + (System.currentTimeMillis() - start) + } + + val msPerItr = Utils.msDurationToString((totalMs.toDouble / itrs).toLong) + // scalastyle:off println + println(s"ran $itrs iterations in ${Utils.msDurationToString(totalMs)} ($msPerItr per itr)") + // scalastyle:on println + } + + def runSuccessfulJob(N: Int): Unit = { + runJobWithCustomBackend(N, new QueuingWrappedBackend(backend) { + override def handleTask(taskDesc: TaskDescription, task: Task[_], host: String): Unit = { + // every 5th stage is a ResultStage -- the rest are ShuffleMapStages + (task.stageId, task.partitionId) match { + case (stage, _) if stage % 5 != 4 => + queueSuccess(taskDesc, DAGSchedulerSuite.makeMapStatus(host, N)) + case (_, _) => + queueSuccess(taskDesc, 42) + } + } + }) + } + + testScheduler("Scheduling speed -- small job on a small cluster") { + runSuccessfulJob(40) + } + + testScheduler("COMPARE C Scheduling speed -- large job on a small cluster") { + runSuccessfulJob(3000) + } + + testScheduler( + "COMPARE C Scheduling speed -- large job on a small cluster with advanced blacklist", + extraConfs = Seq( + "spark.scheduler.executorTaskBlacklistTime" -> "10000000", + "spark.scheduler.blacklist.advancedStrategy" -> "true" + ) + ) { + runSuccessfulJob(3000) + } + + testScheduler( + "COMPARE A Scheduling speed -- large job on a super node", + extraConfs = Seq( + "spark.testing.nHosts" -> "1", + "spark.testing.nExecutorsPerHost" -> "1", + "spark.testing.nCoresPerExecutor" -> "20000" + ) + ) { +
runSuccessfulJob(3000) + } + + testScheduler( + // 4 execs per node, 2 cores per exec, so 400 cores + "COMPARE A Scheduling speed -- large job on 50 node cluster", + extraConfs = Seq( + "spark.testing.nHosts" -> "50" + ) + ) { + runSuccessfulJob(3000) + } + + testScheduler( + // 4 execs per node, 2 cores per exec, so 800 cores + "COMPARE A Scheduling speed -- large job on 100 node cluster", + extraConfs = Seq( + "spark.testing.nHosts" -> "100" + ) + ) { + runSuccessfulJob(3000) + } + + Seq(200, 300, 400, 450, 500, 550).foreach { nodes => + /* + ran 1 iterations in 12.9 s (12.9 s per itr) + [info] - COMPARE A: Scheduling speed -- large job on 200 node cluster (13 seconds, 861 + milliseconds) + ran 1 iterations in 25.0 s (25.0 s per itr) + [info] - COMPARE A: Scheduling speed -- large job on 300 node cluster (25 seconds, 50 + milliseconds) + ran 1 iterations in 34.6 s (34.6 s per itr) + [info] - COMPARE A: Scheduling speed -- large job on 400 node cluster (34 seconds, + 668 milliseconds) + ran 1 iterations in 54.0 s (54.0 s per itr) + [info] - COMPARE A: Scheduling speed -- large job on 450 node cluster (53 seconds, + 991 milliseconds) + ran 1 iterations in 1.8 m (1.8 m per itr) + [info] - COMPARE A: Scheduling speed -- large job on 500 node cluster (1 minute, 48 seconds) + ran 1 iterations in 2.3 m (2.3 m per itr) + [info] - COMPARE A: Scheduling speed -- large job on 550 node cluster (2 minutes, 19 seconds) + */ + testScheduler( + s"COMPARE A: Scheduling speed -- large job on ${nodes} node cluster", + extraConfs = Seq( + "spark.testing.nHosts" -> s"$nodes" + ) + ) { + runSuccessfulJob(3000) + } + } + + /* + nHosts = 400; nExecutorsPerHost = 1; nCores = 800 + ran 2 iterations in 11.7 s (5.9 s per itr) + [info] - COMPARE B: Lots of nodes (12 seconds, 679 milliseconds) + nHosts = 1; nExecutorsPerHost = 400; nCores = 800 + ran 3 iterations in 14.2 s (4.7 s per itr) + [info] - COMPARE B: Lots of executors, one node (14 seconds, 290 milliseconds) + nHosts = 1; nExecutorsPerHost = 1; nCores = 800 + ran 3 iterations in 11.0 s (3.7 s per itr) + [info] - COMPARE B: Super executor (11 seconds, 6 milliseconds) + */ + testScheduler( + s"COMPARE B: Lots of nodes", + extraConfs = Seq( + "spark.testing.nHosts" -> "400", + "spark.testing.nExecutorsPerHost" -> "1" + ) + ) { + runSuccessfulJob(3000) + } + + testScheduler( + s"COMPARE B: Lots of executors, one node", + extraConfs = Seq( + "spark.testing.nHosts" -> "1", + "spark.testing.nExecutorsPerHost" -> "400" + ) + ) { + runSuccessfulJob(3000) + } + + testScheduler( + s"COMPARE B: Super executor", + extraConfs = Seq( + "spark.testing.nHosts" -> "1", + "spark.testing.nExecutorsPerHost" -> "1", + "spark.testing.nCoresPerExecutor" -> "800" + ) + ) { + runSuccessfulJob(3000) + } + + def runBadExecJob(N: Int, badExecs: Set[String], badHosts: Set[String]): Unit = { + val backendWrapper = new QueuingWrappedBackend(backend) { + override def handleTask(taskDesc: TaskDescription, task: Task[_], host: String): Unit = { + if (badExecs(taskDesc.executorId)) { + val exc = new RuntimeException(s"bad exec ${taskDesc.executorId}") + queueFailure(taskDesc, exc) + } else if (badHosts(host)) { + val exc = new RuntimeException(s"bad host ${host}") + queueFailure(taskDesc, exc) + } else { + // every 5th stage is a ResultStage -- the rest are ShuffleMapStages + (task.stageId, task.partitionId) match { + case (stage, _) if stage % 5 != 4 => + queueSuccess(taskDesc, DAGSchedulerSuite.makeMapStatus(host, N)) + case (_, _) => + queueSuccess(taskDesc, 42) + } + } + } + } + 
runJobWithCustomBackend(N, backendWrapper) + } + + val oneBadExec = Set("0") + // intentionally on different nodes, so they don't trigger node blacklist + val twoBadExecs = Set("0", "15") + + + // Note this is *very* unlikely to succeed without blacklisting, even though it's only + // one bad executor out of 20. When a task fails, it gets requeued immediately -- and guess + // which is the only executor which has a free slot? Bingo, the one it just failed on + Seq( + ("bad exec with simple blacklist", "false", oneBadExec, Set[String]()), + ("two bad execs with simple blacklist", "false", twoBadExecs, Set[String]()), + ("bad exec with advanced blacklist", "true", oneBadExec, Set[String]()), + ("bad host with advanced blacklist", "true", Set[String](), Set[String]("host-0")), + ("bad exec and host with advanced blacklist", "true", oneBadExec, Set[String]("host-3")) + ).foreach { case (name, strategy, badExecs, badHosts) => + testScheduler( + s"COMPARE D $name", + extraConfs = Seq( + "spark.scheduler.executorTaskBlacklistTime" -> "10000000", + "spark.scheduler.blacklist.advancedStrategy" -> strategy + ) + ) { + // scalastyle:off println + println(s"Bad execs = ${badExecs}") + // scalastyle:on println + + // Because offers get shuffled, it's a crapshoot whether or not the "bad" executor will finish + // tasks first. (A more complicated mock backend could make sure it fails the first executor + // it gets assigned) + runBadExecJob(3000, badExecs, badHosts) + } + } + + + // scalastyle:off line.size.limit + + /* + Here's how you can get into really slow scheduling, even with the simple blacklist. Say there + is just one bad executor. You've got a bunch of tasks to run, and you schedule all available + slots. Then one task fails on your bad executor. You don't re-schedule that task on the bad + executor, but you do think you've got one open slot, so you try to find the next task you can + schedule. Since you've got a massive backlog of tasks, you just take the next task and schedule + it on your bad executor. The task fails again. + + This repeats for a while, and now you've gone through and failed a bunch of tasks on this one bad + executor. But each time, you clear the cache of blacklisted executors, so you do a bunch of work + to recompute the set of OK executors. This is *really* expensive, and doesn't help you at all + anyway.
+ + +16/05/23 20:53:57.871 dag-scheduler-event-loop INFO BlacklistTracker: Blacklisting executors Set() for task StageAndPartition(8,38) +16/05/23 20:53:57.871 dag-scheduler-event-loop INFO TaskSetManager: Starting task 38.0 in stage 8.0 (TID 21056, host-2, partition 38, PROCESS_LOCAL, 5112 bytes) +16/05/23 20:53:57.871 dag-scheduler-event-loop INFO BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.871 dag-scheduler-event-loop INFO BlacklistTracker: Blacklisting executors Set() for task StageAndPartition(8,39) +16/05/23 20:53:57.871 dag-scheduler-event-loop INFO TaskSetManager: Starting task 39.0 in stage 8.0 (TID 21057, host-0, partition 39, PROCESS_LOCAL, 5112 bytes) +16/05/23 20:53:57.871 mock backend thread INFO BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.871 mock backend thread INFO BlacklistTracker: Blacklisting executors Set() for task StageAndPartition(8,40) +16/05/23 20:53:57.871 dag-scheduler-event-loop INFO DAGScheduler: ShuffleMapStage 5 (RDD at SchedulerIntegrationSuite.scala:360) finished in 1.731 s +16/05/23 20:53:57.871 dag-scheduler-event-loop INFO DAGScheduler: looking for newly runnable stages +16/05/23 20:53:57.871 dag-scheduler-event-loop INFO DAGScheduler: running: Set(ShuffleMapStage 8) +16/05/23 20:53:57.871 dag-scheduler-event-loop INFO DAGScheduler: waiting: Set(ResultStage 9, ShuffleMapStage 6) +16/05/23 20:53:57.871 dag-scheduler-event-loop INFO DAGScheduler: failed: Set() +16/05/23 20:53:57.872 mock backend thread INFO TaskSetManager: Starting task 40.0 in stage 8.0 (TID 21058, host-0, partition 40, PROCESS_LOCAL, 5112 bytes) +16/05/23 20:53:57.872 task-result-getter-2 WARN TaskSetManager: Lost task 39.0 in stage 8.0 (TID 21057, host-0): java.lang.RuntimeException: bad exec 1 + at org.apache.spark.scheduler.SchedulerPerformanceSuite.backendWithBadExecs(SchedulerPerformanceSuite.scala:218) + at org.apache.spark.scheduler.SchedulerPerformanceSuite$$anonfun$runBadExecJob$1.apply$mcV$sp(SchedulerPerformanceSuite.scala:236) + at org.apache.spark.scheduler.SchedulerIntegrationSuite$$anon$2.run(SchedulerIntegrationSuite.scala:194) + +16/05/23 20:53:57.872 task-result-getter-2 INFO BlacklistTracker: invalidating blacklist cache +16/05/23 20:53:57.872 dag-scheduler-event-loop INFO DAGScheduler: Submitting ShuffleMapStage 6 (MockRDD 5), which has no missing parents +16/05/23 20:53:57.872 mock backend thread INFO BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.872 mock backend thread INFO BlacklistTracker: Blacklisting executors Set(1) for task StageAndPartition(8,39) +16/05/23 20:53:57.872 mock backend thread INFO BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.872 mock backend thread INFO BlacklistTracker: Blacklisting executors Set() for task StageAndPartition(8,41) +16/05/23 20:53:57.872 mock backend thread INFO TaskSetManager: Starting task 41.0 in stage 8.0 (TID 21059, host-0, partition 41, PROCESS_LOCAL, 5112 bytes) +16/05/23 20:53:57.872 task-result-getter-3 INFO TaskSetManager: Lost task 40.0 in stage 8.0 (TID 21058) on executor host-0: java.lang.RuntimeException (bad exec 1) [duplicate 1] +16/05/23 20:53:57.872 task-result-getter-3 INFO BlacklistTracker: invalidating blacklist cache +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting executors Set(1) for task StageAndPartition(8,40) +16/05/23 20:53:57.873 mock backend thread INFO 
BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting executors Set(1) for task StageAndPartition(8,39) +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting executors Set() for task StageAndPartition(8,42) +16/05/23 20:53:57.873 mock backend thread INFO TaskSetManager: Starting task 42.0 in stage 8.0 (TID 21060, host-0, partition 42, PROCESS_LOCAL, 5112 bytes) +16/05/23 20:53:57.873 task-result-getter-1 INFO TaskSetManager: Lost task 41.0 in stage 8.0 (TID 21059) on executor host-0: java.lang.RuntimeException (bad exec 1) [duplicate 2] +16/05/23 20:53:57.873 task-result-getter-1 INFO BlacklistTracker: invalidating blacklist cache +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting executors Set(1) for task StageAndPartition(8,41) +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting executors Set(1) for task StageAndPartition(8,40) +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting executors Set(1) for task StageAndPartition(8,39) +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting nodes Set() for stage 8 +16/05/23 20:53:57.873 mock backend thread INFO BlacklistTracker: Blacklisting executors Set() for task StageAndPartition(8,43) + + */ + + // scalastyle:on line.size.limit + + + /* + RESULTS + + On a happy cluster, speed is about the same in all modes, ~5s per iteration + + On a bad cluster, slow in all versions, about 2m per iteration (original code, and new code with + various strategies). the reason is that we waste soooooo long looping all tasks through + the bad nodes, and that has one n^2 penalty. + + */ + + abstract class WrappedBackend(backend: MockBackend) { + val backendContinue = new AtomicBoolean(true) + def runBackend(continue: AtomicBoolean): Unit + val backendThread = new Thread("mock backend thread") { + override def run(): Unit = { + runBackend(backendContinue) + } + } + + def withBackend[T](testBody: => T): T = { + try { + backendThread.start() + testBody + } finally { + backendContinue.set(false) + backendThread.join() + } + } + + } + + abstract class QueuingWrappedBackend(backend: MockBackend) extends WrappedBackend(backend) { + var tasksToFail = List[(TaskDescription, Exception)]() + var tasksToSucceed = List[(TaskDescription, Any)]() + val FAILURES_TILL_SUCCESS = 100 + // that is, we get a task failure 100 times as fast as success + val waitForSuccess = 100 + var failuresSinceLastSuccess = 0 + + def handleTask(taskDesc: TaskDescription, task: Task[_], host: String): Unit + + def queueSuccess(taskDesc: TaskDescription, result: Any): Unit = { + tasksToSucceed :+= taskDesc -> result + } + + def queueFailure(taskDesc: TaskDescription, exc: Exception): Unit = { + tasksToFail :+= taskDesc -> exc + } + + override def runBackend(continue: AtomicBoolean): Unit = { + while (continue.get()) { + // don't *just* keep failing tasks on the same executor. While there are tasks to fail, + // we fail them more often, but we fail across all executors. 
Furthermore, after X failures + // we do have a task success + + // first, queue up all the tasks needing to run + while (backend.hasTasksWaitingToRun) { + val taskDescription = backend.beginTask() + val host = backend.executorIdToExecutor(taskDescription.executorId).host + val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet + val task = taskSet.tasks(taskDescription.index) + handleTask(taskDescription, task, host) + } + + // send a task result. Prioritize failures, if we haven't had too many failures in a row + def failTask(): Unit = { + failuresSinceLastSuccess += 1 + val (toFail, exc) = tasksToFail.head + tasksToFail = tasksToFail.tail + backend.taskFailed(toFail, exc) + } + + if (tasksToFail.nonEmpty && failuresSinceLastSuccess < FAILURES_TILL_SUCCESS) { + failTask() + } else if (tasksToSucceed.nonEmpty) { + // we might get here just by some chance of thread-scheduling in this mock. Tasks fail, + // but the scheduler thread hasn't processed those before this thread tries to find + // another task to respond to. + // if (tasksToFail.nonEmpty && failuresSinceLastSuccess < FAILURES_TILL_SUCCESS) { + // failTask() + // } else { + logInfo(s"tasksToFail.size = ${tasksToFail.size}; " + + s"tasksToSucceed.size = ${tasksToSucceed.size}; " + + s"failuresSinceLastSuccess = ${failuresSinceLastSuccess}") + failuresSinceLastSuccess = 0 + val (taskDescription, result) = tasksToSucceed.head + tasksToSucceed = tasksToSucceed.tail + val host = backend.executorIdToExecutor(taskDescription.executorId).host + val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet + val task = taskSet.tasks(taskDescription.index) + backend.taskSuccess(taskDescription, result) + } else { + Thread.sleep(10) // wait till we've got work to do + } + } + } + } +}
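Reviewer note (not part of the patch): the TODO in nodeBlacklistForStage asks for an in-place update of the blacklist cache instead of clearing it on every failure, with a log line only when the blacklist actually changes. Below is a minimal sketch of that idea in Scala; all names here (SimpleNodeBlacklist, recordFailure, maxFailuresPerHost) are hypothetical and only illustrate the incremental-update approach, not the final design of BlacklistTracker.

import scala.collection.mutable

class SimpleNodeBlacklist(maxFailuresPerHost: Int) {
  // Running failure counts per host, updated incrementally instead of recomputed from scratch.
  private val failuresByHost = mutable.HashMap.empty[String, Int].withDefaultValue(0)
  private val blacklistedHosts = mutable.HashSet.empty[String]

  /** Record one task failure and update the blacklist in place. */
  def recordFailure(host: String): Unit = synchronized {
    failuresByHost(host) += 1
    if (failuresByHost(host) >= maxFailuresPerHost && blacklistedHosts.add(host)) {
      // Only emit a message when the blacklist actually changes, per the TODO above.
      println(s"Blacklisting node $host after ${failuresByHost(host)} task failures")
    }
  }

  def isBlacklisted(host: String): Boolean = synchronized { blacklistedHosts.contains(host) }
}

Because the counts are maintained incrementally, a burst of failures on one bad node costs O(1) work per failure instead of re-scanning all failure data for every task, which is the slow path described in the comments above.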