diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f70ee2e5c71c7..848a728bdb4d7 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -543,6 +543,16 @@ package object config { .version("1.2.0") .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT) + private[spark] val LEGACY_LOCALITY_WAIT_RESET = + ConfigBuilder("spark.locality.wait.legacyResetOnTaskLaunch") + .doc("Whether to use the legacy behavior of locality wait, which resets the delay timer " + + "anytime a task is scheduled. See Delay Scheduling section of TaskSchedulerImpl's class " + + "documentation for more details.") + .internal() + .version("3.0.0") + .booleanConf + .createWithDefault(false) + private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait") .version("0.5.0") .timeConf(TimeUnit.MILLISECONDS) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index f0f84fe63d1cf..92382e03f6d11 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -36,7 +36,7 @@ import org.apache.spark.rpc.RpcEndpoint import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils} +import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, ThreadUtils, Utils} /** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. @@ -57,11 +57,24 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils} * * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay * scheduling * * task-result-getter threads + * + * Delay Scheduling: + * Delay scheduling is an optimization that sacrifices job fairness for data locality in order to + * improve cluster and workload throughput. One useful definition of "delay" is how much time + * has passed since the TaskSet was using its fair share of resources. Since it is impractical to + * calculate this delay without a full simulation, the heuristic used is the time since the + * TaskSetManager last launched a task and has not rejected any resources due to delay scheduling + * since it was last offered its "fair share". A "fair share" offer is when [[resourceOffers]]'s + * parameter "isAllFreeResources" is set to true. A "delay scheduling reject" is when a resource + * is not utilized despite there being pending tasks (implemented inside [[TaskSetManager]]). + * The legacy heuristic only measured the time since the [[TaskSetManager]] last launched a task, + * and can be re-enabled by setting spark.locality.wait.legacyResetOnTaskLaunch to true. */ private[spark] class TaskSchedulerImpl( val sc: SparkContext, val maxTaskFailures: Int, - isLocal: Boolean = false) + isLocal: Boolean = false, + clock: Clock = new SystemClock) extends TaskScheduler with Logging { import TaskSchedulerImpl._ @@ -97,6 +110,11 @@ private[spark] class TaskSchedulerImpl( // on this class. 
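As an illustration (a sketch, not part of the patch): the "Delay Scheduling" note above boils down to one decision, reset the locality-wait timer only when a task was launched and no resources were rejected due to delay scheduling since the task set was last offered its "fair share" (an offer with isAllFreeResources = true); setting spark.locality.wait.legacyResetOnTaskLaunch to true restores the old reset-on-every-launch behavior. The self-contained sketch below models only that decision. DelayResetModel, TaskSetKey and shouldResetTimer are illustrative names, not Spark classes; resetOnPreviousOffer, isAllFreeResources, noDelaySchedulingRejects and launchedAnyTask mirror the fields and locals introduced in this patch.

```scala
// Simplified, self-contained model of the non-legacy reset heuristic added in this patch.
// The real code keys resetOnPreviousOffer by TaskSet and runs this only when
// spark.locality.wait.legacyResetOnTaskLaunch is false.
object DelayResetModel {
  final case class TaskSetKey(stageId: Int, stageAttemptId: Int)

  // true if the task set's locality-wait timer was reset on the last resource offer
  private val resetOnPreviousOffer =
    scala.collection.mutable.HashMap.empty[TaskSetKey, Boolean]

  /** Returns true if the locality-wait timer should be reset for this task set. */
  def shouldResetTimer(
      taskSet: TaskSetKey,
      isAllFreeResources: Boolean,
      noDelaySchedulingRejects: Boolean,
      launchedAnyTask: Boolean): Boolean = {
    if (noDelaySchedulingRejects && launchedAnyTask) {
      if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet, true)) {
        resetOnPreviousOffer.update(taskSet, true)
        true  // the caller would invoke resetDelayScheduleTimer(globalMinLocality)
      } else {
        false // accepted, but an earlier offer since the last full offer was rejected
      }
    } else {
      resetOnPreviousOffer.update(taskSet, false)
      false   // a delay-scheduling reject (or nothing launched): no reset
    }
  }
}
```

In this model, a partial offer (isAllFreeResources = false) resets the timer only if every offer since the last full offer was accepted without delay-scheduling rejects, which is exactly the behavior the new tests below exercise.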
Protected by `this` private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]] + // keyed by taskset + // value is true if the task set's locality wait timer was reset on the last resource offer + private val resetOnPreviousOffer = new mutable.HashMap[TaskSet, Boolean]() + private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET) + // Protected by `this` private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager] // Protected by `this` @@ -125,7 +143,6 @@ private[spark] class TaskSchedulerImpl( protected val executorIdToHost = new HashMap[String, String] private val abortTimer = new Timer(true) - private val clock = new SystemClock // Exposed for testing val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long] @@ -319,11 +336,27 @@ private[spark] class TaskSchedulerImpl( taskSetsByStageIdAndAttempt -= manager.taskSet.stageId } } + resetOnPreviousOffer -= manager.taskSet manager.parent.removeSchedulable(manager) logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" + s" ${manager.parent.name}") } + /** + * Offers resources to a single [[TaskSetManager]] at a given max allowed [[TaskLocality]]. + * + * @param taskSet task set manager to offer resources to + * @param maxLocality max locality to allow when scheduling + * @param shuffledOffers shuffled resource offers to use for scheduling, + * remaining resources are tracked by below fields as tasks are scheduled + * @param availableCpus remaining cpus per offer, + * value at index 'i' corresponds to shuffledOffers[i] + * @param availableResources remaining resources per offer, + * value at index 'i' corresponds to shuffledOffers[i] + * @param tasks tasks scheduled per offer, value at index 'i' corresponds to shuffledOffers[i] + * @param addressesWithDescs tasks scheduled per host:port, used for barrier tasks + * @return tuple of (no delay schedule rejects?, option of min locality of launched task) + */ private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, @@ -331,8 +364,10 @@ private[spark] class TaskSchedulerImpl( availableCpus: Array[Int], availableResources: Array[Map[String, Buffer[String]]], tasks: IndexedSeq[ArrayBuffer[TaskDescription]], - addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = { - var launchedTask = false + addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) + : (Boolean, Option[TaskLocality]) = { + var noDelayScheduleRejects = true + var minLaunchedLocality: Option[TaskLocality] = None // nodes and executors that are blacklisted for the entire application have already been // filtered out by this point for (i <- 0 until shuffledOffers.size) { @@ -348,11 +383,14 @@ try { val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID) val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf) - val taskDescOption = taskSet.resourceOffer(execId, host, maxLocality, - taskResAssignments) + val (taskDescOption, didReject) = + taskSet.resourceOffer(execId, host, maxLocality, taskResAssignments) + noDelayScheduleRejects &= !didReject for (task <- taskDescOption) { tasks(i) += task val tid = task.taskId + val locality = taskSet.taskInfos(task.taskId).taskLocality + minLaunchedLocality = minTaskLocality(minLaunchedLocality, Some(locality)) taskIdToTaskSetManager.put(tid, taskSet) taskIdToExecutorId(tid) = execId executorIdToRunningTaskIds(execId).add(tid) @@ -372,19
+410,18 @@ private[spark] class TaskSchedulerImpl( // The executor address is expected to be non empty. addressesWithDescs += (shuffledOffers(i).address.get -> task) } - launchedTask = true } } catch { case e: TaskNotSerializableException => logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") // Do not offer resources for this task, but don't throw an error to allow other // task sets to be submitted. - return launchedTask + return (noDelayScheduleRejects, minLaunchedLocality) } } } } - launchedTask + (noDelayScheduleRejects, minLaunchedLocality) } /** @@ -466,12 +503,28 @@ private[spark] class TaskSchedulerImpl( }.sum } + private def minTaskLocality( + l1: Option[TaskLocality], + l2: Option[TaskLocality]) : Option[TaskLocality] = { + if (l1.isEmpty) { + l2 + } else if (l2.isEmpty) { + l1 + } else if (l1.get < l2.get) { + l1 + } else { + l2 + } + } + /** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. */ - def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { + def resourceOffers( + offers: IndexedSeq[WorkerOffer], + isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized { // Mark each slave as alive and remember its hostname // Also track if new executor is added var newExecAvail = false @@ -544,18 +597,34 @@ private[spark] class TaskSchedulerImpl( s"number of available slots is $numBarrierSlotsAvailable.") } else { var launchedAnyTask = false + var noDelaySchedulingRejects = true + var globalMinLocality: Option[TaskLocality] = None // Record all the executor IDs assigned barrier tasks on. 
val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]() for (currentMaxLocality <- taskSet.myLocalityLevels) { var launchedTaskAtCurrentMaxLocality = false do { - launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, - currentMaxLocality, shuffledOffers, availableCpus, + val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet( + taskSet, currentMaxLocality, shuffledOffers, availableCpus, availableResources, tasks, addressesWithDescs) + launchedTaskAtCurrentMaxLocality = minLocality.isDefined launchedAnyTask |= launchedTaskAtCurrentMaxLocality + noDelaySchedulingRejects &= noDelayScheduleReject + globalMinLocality = minTaskLocality(globalMinLocality, minLocality) } while (launchedTaskAtCurrentMaxLocality) } + if (!legacyLocalityWaitReset) { + if (noDelaySchedulingRejects && launchedAnyTask) { + if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) { + taskSet.resetDelayScheduleTimer(globalMinLocality) + resetOnPreviousOffer.update(taskSet.taskSet, true) + } + } else { + resetOnPreviousOffer.update(taskSet.taskSet, false) + } + } + if (!launchedAnyTask) { taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex => // If the taskSet is unschedulable we try to find an existing idle blacklisted diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 2c792338b5295..a0e84b94735ec 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -221,10 +221,11 @@ private[spark] class TaskSetManager( private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait) // Delay scheduling variables: we keep track of our current locality level and the time we - // last launched a task at that level, and move up a level when localityWaits[curLevel] expires. - // We then move down if we manage to launch a "more local" task. + // last reset the locality wait timer, and move up a level when localityWaits[curLevel] expires. + // We then move down if we manage to launch a "more local" task when resetting the timer + private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET) private var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels - private var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level + private var lastLocalityWaitResetTime = clock.getTimeMillis() // Time we last reset locality wait override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null @@ -386,6 +387,14 @@ private[spark] class TaskSetManager( None } + private[scheduler] def resetDelayScheduleTimer( + minLocality: Option[TaskLocality.TaskLocality]): Unit = { + lastLocalityWaitResetTime = clock.getTimeMillis() + for (locality <- minLocality) { + currentLocalityIndex = getLocalityIndex(locality) + } + } + /** * Respond to an offer of a single executor from the scheduler by finding a task * @@ -396,6 +405,9 @@ private[spark] class TaskSetManager( * @param execId the executor Id of the offered resource * @param host the host Id of the offered resource * @param maxLocality the maximum locality we want to schedule the tasks at + * + * @return Tuple containing: + * (TaskDescription of launched task if any, rejected resource due to delay scheduling?) 
*/ @throws[TaskNotSerializableException] def resourceOffer( @@ -403,7 +415,7 @@ private[spark] class TaskSetManager( host: String, maxLocality: TaskLocality.TaskLocality, taskResourceAssignments: Map[String, ResourceInformation] = Map.empty) - : Option[TaskDescription] = + : (Option[TaskDescription], Boolean) = { val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist => blacklist.isNodeBlacklistedForTaskSet(host) || @@ -422,7 +434,9 @@ private[spark] class TaskSetManager( } } - dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) => + val taskDescription = + dequeueTask(execId, host, allowedLocality) + .map { case (index, taskLocality, speculative) => // Found a task; do some bookkeeping and return a task description val task = tasks(index) val taskId = sched.newTaskId() @@ -433,11 +447,8 @@ private[spark] class TaskSetManager( execId, host, taskLocality, speculative) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) - // Update our locality level for delay scheduling - // NO_PREF will not affect the variables related to delay scheduling - if (maxLocality != TaskLocality.NO_PREF) { - currentLocalityIndex = getLocalityIndex(taskLocality) - lastLaunchTime = curTime + if (legacyLocalityWaitReset && maxLocality != TaskLocality.NO_PREF) { + resetDelayScheduleTimer(Some(taskLocality)) } // Serialize and return the task val serializedTask: ByteBuffer = try { @@ -482,8 +493,14 @@ private[spark] class TaskSetManager( taskResourceAssignments, serializedTask) } + val hasPendingTasks = pendingTasks.all.nonEmpty || pendingSpeculatableTasks.all.nonEmpty + val hasScheduleDelayReject = + taskDescription.isEmpty && + maxLocality == TaskLocality.ANY && + hasPendingTasks + (taskDescription, hasScheduleDelayReject) } else { - None + (None, false) } } @@ -547,14 +564,14 @@ private[spark] class TaskSetManager( // This is a performance optimization: if there are no more tasks that can // be scheduled at a particular locality level, there is no point in waiting // for the locality wait timeout (SPARK-4939). 
- lastLaunchTime = curTime + lastLocalityWaitResetTime = curTime logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " + s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}") currentLocalityIndex += 1 - } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) { - // Jump to the next locality level, and reset lastLaunchTime so that the next locality - // wait timer doesn't immediately expire - lastLaunchTime += localityWaits(currentLocalityIndex) + } else if (curTime - lastLocalityWaitResetTime >= localityWaits(currentLocalityIndex)) { + // Jump to the next locality level, and reset lastLocalityWaitResetTime so that the next + // locality wait timer doesn't immediately expire + lastLocalityWaitResetTime += localityWaits(currentLocalityIndex) logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " + s"${localityWaits(currentLocalityIndex)}ms") currentLocalityIndex += 1 diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index cca8e86b48691..ad6f806a540bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -303,7 +303,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp (rName, rInfo.availableAddrs.toBuffer) }, executorData.resourceProfileId) }.toIndexedSeq - scheduler.resourceOffers(workOffers) + scheduler.resourceOffers(workOffers, true) } if (taskDescs.nonEmpty) { launchTasks(taskDescs) @@ -331,7 +331,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorData.resourcesInfo.map { case (rName, rInfo) => (rName, rInfo.availableAddrs.toBuffer) }, executorData.resourceProfileId)) - scheduler.resourceOffers(workOffers) + scheduler.resourceOffers(workOffers, false) } else { Seq.empty } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index e2b1198060f76..0ffe032296b0d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -88,7 +88,7 @@ private[spark] class LocalEndpoint( // local mode doesn't support extra resources like GPUs right now val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores, Some(rpcEnv.address.hostPort))) - for (task <- scheduler.resourceOffers(offers).flatten) { + for (task <- scheduler.resourceOffers(offers, true).flatten) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, task) } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 0b2a58d8e135d..57cbda3c0620d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -510,7 +510,7 @@ class StandaloneDynamicAllocationSuite val taskScheduler = mock(classOf[TaskSchedulerImpl]) when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host")) - 
when(taskScheduler.resourceOffers(any())).thenReturn(Nil) + when(taskScheduler.resourceOffers(any(), any[Boolean])).thenReturn(Nil) when(taskScheduler.sc).thenReturn(sc) val rpcEnv = RpcEnv.create("test-rpcenv", "localhost", 0, conf, securityManager) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index f4745dbb5a747..058f4013005e5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -248,7 +248,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo "t1", 0, 1, mutable.Map.empty[String, Long], mutable.Map.empty[String, Long], new Properties(), taskResources, bytebuffer))) val ts = backend.getTaskSchedulerImpl() - when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]])).thenReturn(taskDescs) + when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs) backend.driverEndpoint.send(ReviveOffers) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index b9a11e7f66c64..056c34278c1ea 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -196,6 +196,240 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B assert(!failedTaskSet) } + private def setupTaskSchedulerForLocalityTests(clock: ManualClock): TaskSchedulerImpl = { + val conf = new SparkConf() + sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) + val taskScheduler = new TaskSchedulerImpl(sc, + sc.conf.get(config.TASK_MAX_FAILURES), + clock = clock) { + override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = { + new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock) + } + override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = { + // Don't shuffle the offers around for this test. Instead, we'll just pass in all + // the permutations we care about directly. + offers + } + } + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {} + + override def executorAdded(execId: String, host: String): Unit = {} + } + taskScheduler.initialize(new FakeSchedulerBackend) + val taskSet = FakeTask.createTaskSet(8, 1, 1, + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")) + ) + + // Offer resources first so that when the taskset is submitted it can initialize + // with proper locality level. Otherwise, ANY would be the only locality level. 
+ // See TaskSetManager.computeValidLocalityLevels() + // This begins the task set as PROCESS_LOCAL locality level + taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1))) + taskScheduler.submitTasks(taskSet) + taskScheduler + } + + test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer before " + + "any resources have been rejected") { + val clock = new ManualClock() + // All tasks created here are local to exec1, host1. + // Locality level starts at PROCESS_LOCAL. + val taskScheduler = setupTaskSchedulerForLocalityTests(clock) + // Locality levels increase at 3000 ms. + val advanceAmount = 3000 + + // Advancing clock increases locality level to NODE_LOCAL. + clock.advance(advanceAmount) + + // If there hasn't yet been any full resource offers, + // partial resource (isAllFreeResources = false) offers reset delay scheduling + // if this and previous offers were accepted. + // This line resets the timer and locality level is reset to PROCESS_LOCAL. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + + // This NODE_LOCAL task should not be accepted. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) + .flatten.isEmpty) + } + + test("SPARK-18886 - delay scheduling timer is reset when it accepts all resources offered when " + + "isAllFreeResources = true") { + val clock = new ManualClock() + // All tasks created here are local to exec1, host1. + // Locality level starts at PROCESS_LOCAL. + val taskScheduler = setupTaskSchedulerForLocalityTests(clock) + // Locality levels increase at 3000 ms. + val advanceAmount = 3000 + + // Advancing clock increases locality level to NODE_LOCAL. + clock.advance(advanceAmount) + + // If there are no rejects on an all resource offer, delay scheduling is reset. + // This line resets the timer and locality level is reset to PROCESS_LOCAL. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = true) + .flatten.length === 1) + + // This NODE_LOCAL task should not be accepted. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) + .flatten.isEmpty) + } + + test("SPARK-18886 - partial resource offers (isAllFreeResources = false) reset " + + "time if last full resource offer (isAllResources = true) was accepted as well as any " + + "following partial resource offers") { + val clock = new ManualClock() + // All tasks created here are local to exec1, host1. + // Locality level starts at PROCESS_LOCAL. + val taskScheduler = setupTaskSchedulerForLocalityTests(clock) + // Locality levels increase at 3000 ms. + val advanceAmount = 3000 + + // PROCESS_LOCAL full resource offer is accepted. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = true) + .flatten.length === 1) + + // Advancing clock increases locality level to NODE_LOCAL. + clock.advance(advanceAmount) + + // PROCESS_LOCAL partial resource is accepted. + // Since all offers have been accepted since the last full resource offer + // (this one and the previous one), delay scheduling is reset. + // This line resets the timer and locality level is reset to PROCESS_LOCAL. 
+ assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + + // Advancing clock increases locality level to NODE_LOCAL + clock.advance(advanceAmount) + + // PROCESS_LOCAL partial resource is accepted + // Since all offers have been accepted since the last full resource offer + // (one previous full offer, one previous partial offer, and this partial offer), + // delay scheduling is reset. + // This line resets the timer and locality level is reset to PROCESS_LOCAL. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + + // This NODE_LOCAL task should not be accepted. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) + .flatten.isEmpty) + } + + // This tests two cases + // 1. partial resource offer doesn't reset timer after full resource offer had rejected resources + // 2. partial resource offer doesn't reset timer after partial resource offer + // had rejected resources + test("SPARK-18886 - partial resource offers (isAllFreeResources = false) do not reset " + + "time if any offer was rejected since last full offer was fully accepted") { + val clock = new ManualClock() + // All tasks created here are local to exec1, host1. + // Locality level starts at PROCESS_LOCAL. + val taskScheduler = setupTaskSchedulerForLocalityTests(clock) + // Locality levels increase at 3000 ms. + val advanceAmount = 3000 + + // case 1 from test description above. + // NODE_LOCAL full resource offer is rejected, so delay scheduling is not reset. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = true) + .flatten.isEmpty) + + // Advancing clock increases locality level to NODE_LOCAL + clock.advance(advanceAmount) + + // PROCESS_LOCAL partial resource is accepted, + // but because preceding full resource offer was rejected, delay scheduling is not reset. + // Locality level remains at NODE_LOCAL. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + + // Even though we launched a local task above, we still utilize non-local exec2. + // This is the behavior change to fix SPARK-18886. + // Locality level remains NODE_LOCAL after this clock advance. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + + + // case 2 from test description above. + // PROCESS_LOCAL full resource offer is accepted, resetting delay scheduling. + // This line resets the timer and locality level is reset to PROCESS_LOCAL. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = true) + .flatten.length === 1) + + // Partial resource offer: NODE_LOCAL exec 2 is rejected, PROCESS_LOCAL exec1 is accepted. + // Since there were rejects, delay scheduling is not reset, and follow up partial offers + // will not reset delay scheduling, even if they are accepted. 
+ assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1), WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.size === 1) + + // Advancing clock increases locality level to NODE_LOCAL + clock.advance(advanceAmount) + + // PROCESS_LOCAL partial resource is accepted, but does not reset delay scheduling + // as described above. + // Locality level remains at NODE_LOCAL. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec1", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + + // NODE_LOCAL partial resource offer is accepted, + // verifying locality level was not reset to PROCESS_LOCAL by above offer. + assert(taskScheduler + .resourceOffers( + IndexedSeq(WorkerOffer("exec2", "host1", 1)), + isAllFreeResources = false) + .flatten.length === 1) + } + test("Scheduler does not crash when tasks are not serializable") { val taskCpus = 2 val taskScheduler = setupSchedulerWithMaster( @@ -901,18 +1135,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B } // Here is the main check of this test -- we have the same offers again, and we schedule it - // successfully. Because the scheduler first tries to schedule with locality in mind, at first - // it won't schedule anything on executor1. But despite that, we don't abort the job. Then the - // scheduler tries for ANY locality, and successfully schedules tasks on executor1. + // successfully. Because the scheduler tries to schedule with locality in mind, at first + // it won't schedule anything on executor1. But despite that, we don't abort the job. val secondTaskAttempts = taskScheduler.resourceOffers(offers).flatten - assert(secondTaskAttempts.size == 2) - secondTaskAttempts.foreach { taskAttempt => assert("executor1" === taskAttempt.executorId) } + assert(secondTaskAttempts.isEmpty) assert(!failedTaskSet) } test("SPARK-16106 locality levels updated if executor added to existing host") { val taskScheduler = setupScheduler() + taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1))) taskScheduler.submitTasks(FakeTask.createTaskSet(2, stageId = 0, stageAttemptId = 0, (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _* )) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 4566e3ca7b3ee..4978be3e04c1e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -225,7 +225,7 @@ class TaskSetManagerSuite // Offer a host with NO_PREF as the constraint, // we should get a nopref task immediately since that's what we only have - val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption.isDefined) clock.advance(1) @@ -246,7 +246,7 @@ class TaskSetManagerSuite // First three offers should all find tasks for (i <- 0 until 3) { - val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === "exec1") @@ -254,7 +254,7 @@ class TaskSetManagerSuite assert(sched.startedTasks.toSet === Set(0, 1, 2)) // Re-offer the host -- now we should get no more tasks - assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None) + 
assert(manager.resourceOffer("exec1", "host1", NO_PREF)._1 === None) // Finish the first two tasks manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdatesByTask(0))) @@ -277,12 +277,12 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // An executor that is not NODE_LOCAL should be rejected. - assert(manager.resourceOffer("execC", "host2", ANY) === None) + assert(manager.resourceOffer("execC", "host2", ANY)._1 === None) // Because there are no alive PROCESS_LOCAL executors, the base locality level should be // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before // any of the locality wait timers expire. - assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("execA", "host1", ANY)._1.get.index === 0) } test("basic delay scheduling") { @@ -297,22 +297,22 @@ class TaskSetManagerSuite val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // First offer host1, exec1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) - assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1 === None) clock.advance(LOCALITY_WAIT_MS) // Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 3) should // get chosen before the noPref task - assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2) + assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL)._1.get.index == 2) // Offer host2, exec2, at NODE_LOCAL level: we should choose task 2 - assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index == 1) + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL)._1.get.index == 1) // Offer host2, exec2 again, at NODE_LOCAL level: we should get noPref task // after failing to find a node_Local task - assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None) + assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL)._1 === None) clock.advance(LOCALITY_WAIT_MS) - assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3) + assert(manager.resourceOffer("exec2", "host2", NO_PREF)._1.get.index == 3) } test("we do not need to delay scheduling when we only have noPref tasks in the queue") { @@ -326,10 +326,10 @@ class TaskSetManagerSuite val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // First offer host1, exec1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0) - assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1) - assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None) - assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index === 2) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1.get.index === 0) + assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL)._1.get.index === 1) + assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL)._1 === None) + assert(manager.resourceOffer("exec3", "host2", NO_PREF)._1.get.index === 2) } test("delay scheduling with fallback") { @@ -343,33 +343,55 @@ class TaskSetManagerSuite Seq(TaskLocation("host3")), Seq(TaskLocation("host2")) ) + sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true) val clock = new ManualClock val 
manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // First offer host1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) // Offer host1 again: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None) clock.advance(LOCALITY_WAIT_MS) // Offer host1 again: second task (on host2) should get chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 1) // Offer host1 again: third task (on host2) should get chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 2) // Offer host2: fifth task (also on host2) should get chosen - assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 4) + assert(manager.resourceOffer("exec2", "host2", ANY)._1.get.index === 4) // Now that we've launched a local task, we should no longer launch the task for host3 - assert(manager.resourceOffer("exec2", "host2", ANY) === None) + assert(manager.resourceOffer("exec2", "host2", ANY)._1 === None) clock.advance(LOCALITY_WAIT_MS) + // offers not accepted due to task set zombies are not delay schedule rejects + manager.isZombie = true + val (taskDesciption, delayReject) = manager.resourceOffer("exec2", "host2", ANY) + assert(taskDesciption.isEmpty) + assert(delayReject === false) + manager.isZombie = false + + // offers not accepted due to blacklisting are not delay schedule rejects + val tsmSpy = spy(manager) + val blacklist = mock(classOf[TaskSetBlacklist]) + when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(blacklist)) + when(blacklist.isNodeBlacklistedForTaskSet(any())).thenReturn(true) + val (blacklistTask, blackListReject) = tsmSpy.resourceOffer("exec2", "host2", ANY) + assert(blacklistTask.isEmpty) + assert(blackListReject === false) + // After another delay, we can go ahead and launch that task non-locally - assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3) + assert(manager.resourceOffer("exec2", "host2", ANY)._1.get.index === 3) + + // offers not accepted due to no pending tasks are not delay schedule rejects + val (noPendingTask, noPendingReject) = manager.resourceOffer("exec2", "host2", ANY) + assert(noPendingTask.isEmpty) + assert(noPendingReject === false) } test("delay scheduling with failed hosts") { @@ -385,28 +407,28 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // First offer host1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) // After this, nothing should get chosen, because we have separated tasks with unavailable // preference from the noPrefPendingTasks - assert(manager.resourceOffer("exec1", "host1", ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None) // Now mark host2 as dead sched.removeExecutor("exec2") manager.executorLost("exec2", "host2", SlaveLost()) // nothing should be chosen - assert(manager.resourceOffer("exec1", "host1", ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None) clock.advance(LOCALITY_WAIT_MS * 2) // task 1 and 2 would be scheduled as nonLocal task - 
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 1) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 2) // all finished - assert(manager.resourceOffer("exec1", "host1", ANY) === None) - assert(manager.resourceOffer("exec2", "host2", ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None) + assert(manager.resourceOffer("exec2", "host2", ANY)._1 === None) } test("task result lost") { @@ -417,14 +439,14 @@ class TaskSetManagerSuite clock.advance(1) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) // Tell it the task has finished but the result was lost. manager.handleFailedTask(0, TaskState.FINISHED, TaskResultLost) assert(sched.endedTasks(0) === TaskResultLost) // Re-offer the host -- now we should get task 0 again. - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) } test("repeated failures lead to task set abortion") { @@ -438,7 +460,7 @@ class TaskSetManagerSuite // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted // after the last failure. (1 to manager.maxTaskFailures).foreach { index => - val offerResult = manager.resourceOffer("exec1", "host1", ANY) + val offerResult = manager.resourceOffer("exec1", "host1", ANY)._1 assert(offerResult.isDefined, "Expect resource offer on iteration %s to return a task".format(index)) assert(offerResult.get.index === 0) @@ -474,7 +496,7 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock) { - val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) + val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1 assert(offerResult.isDefined, "Expect resource offer to return a task") assert(offerResult.get.index === 0) @@ -485,15 +507,15 @@ class TaskSetManagerSuite assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec1 fails after failure 1 due to blacklist - assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1.isEmpty) + assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL)._1.isEmpty) + assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL)._1.isEmpty) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty) } // Run the task on exec1.1 - should work, and then fail it on exec1.1 { - val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL) + val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL)._1 assert(offerResult.isDefined, "Expect resource offer to return a task for exec1.1, offerResult = " + offerResult) @@ -505,12 +527,12 @@ class TaskSetManagerSuite assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist - assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1.1", "host1", 
NODE_LOCAL)._1.isEmpty) } // Run the task on exec2 - should work, and then fail it on exec2 { - val offerResult = manager.resourceOffer("exec2", "host2", ANY) + val offerResult = manager.resourceOffer("exec2", "host2", ANY)._1 assert(offerResult.isDefined, "Expect resource offer to return a task") assert(offerResult.get.index === 0) @@ -521,7 +543,7 @@ class TaskSetManagerSuite assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec2 fails after failure 3 due to blacklist - assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) + assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty) } // Despite advancing beyond the time for expiring executors from within the blacklist, @@ -529,17 +551,17 @@ class TaskSetManagerSuite clock.advance(rescheduleDelay) { - val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) + val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1 assert(offerResult.isEmpty) } { - val offerResult = manager.resourceOffer("exec3", "host3", ANY) + val offerResult = manager.resourceOffer("exec3", "host3", ANY)._1 assert(offerResult.isDefined) assert(offerResult.get.index === 0) assert(offerResult.get.executorId === "exec3") - assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) + assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty) // Cause exec3 to fail : failure 4 manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) @@ -598,14 +620,14 @@ class TaskSetManagerSuite manager.executorAdded() sched.addExecutor("execC", "host2") manager.executorAdded() - assert(manager.resourceOffer("exec1", "host1", ANY).isDefined) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.isDefined) sched.removeExecutor("execA") manager.executorLost( "execA", "host1", ExecutorExited(143, false, "Terminated for reason unrelated to running tasks")) assert(!sched.taskSetsFailed.contains(taskSet.id)) - assert(manager.resourceOffer("execC", "host2", ANY).isDefined) + assert(manager.resourceOffer("execC", "host2", ANY)._1.isDefined) sched.removeExecutor("execC") manager.executorLost( "execC", "host2", ExecutorExited(1, true, "Terminated due to issue with running tasks")) @@ -633,12 +655,12 @@ class TaskSetManagerSuite clock.advance(LOCALITY_WAIT_MS * 3) // Offer host3 // No task is scheduled if we restrict locality to RACK_LOCAL - assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None) + assert(manager.resourceOffer("execC", "host3", RACK_LOCAL)._1 === None) // Task 0 can be scheduled with ANY - assert(manager.resourceOffer("execC", "host3", ANY).get.index === 0) + assert(manager.resourceOffer("execC", "host3", ANY)._1.get.index === 0) // Offer host2 // Task 1 can be scheduled with RACK_LOCAL - assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1) + assert(manager.resourceOffer("execB", "host2", RACK_LOCAL)._1.get.index === 1) } test("do not emit warning when serialized task is small") { @@ -649,7 +671,7 @@ class TaskSetManagerSuite assert(!manager.emittedTaskSizeWarning) - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) assert(!manager.emittedTaskSizeWarning) } @@ -664,7 +686,7 @@ class TaskSetManagerSuite assert(!manager.emittedTaskSizeWarning) - assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0) assert(manager.emittedTaskSizeWarning) } @@ -752,13 +774,13 @@ 
class TaskSetManagerSuite // Offer host1, which should be accepted as a PROCESS_LOCAL location // by the one task in the task set - val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get + val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL)._1.get // Mark the task as available for speculation, and then offer another resource, // which should be used to launch a speculative copy of the task. manager.speculatableTasks += singleTask.partitionId manager.addPendingTask(singleTask.partitionId, speculatable = true) - val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get + val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY)._1.get assert(manager.runningTasks === 2) assert(manager.isZombie === false) @@ -844,7 +866,7 @@ class TaskSetManagerSuite "exec1" -> "host1", "exec3" -> "host3", "exec2" -> "host2")) { - val taskOption = manager.resourceOffer(exec, host, NO_PREF) + val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1 assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === exec) @@ -870,7 +892,7 @@ class TaskSetManagerSuite assert(sched.speculativeTasks.toSet === Set(2, 3)) // Offer resource to start the speculative attempt for the running task 2.0 - val taskOption = manager.resourceOffer("exec2", "host2", ANY) + val taskOption = manager.resourceOffer("exec2", "host2", ANY)._1 assert(taskOption.isDefined) val task4 = taskOption.get assert(task4.index === 2) @@ -899,20 +921,20 @@ class TaskSetManagerSuite val clock = new ManualClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) - assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0) - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) - assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1) + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL)._1.get.index === 0) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1 === None) + assert(manager.resourceOffer("execA", "host1", NO_PREF)._1.get.index == 1) manager.speculatableTasks += 1 manager.addPendingTask(1, speculatable = true) clock.advance(LOCALITY_WAIT_MS) // schedule the nonPref task - assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2) + assert(manager.resourceOffer("execA", "host1", NO_PREF)._1.get.index === 2) // schedule the speculative task - assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1) + assert(manager.resourceOffer("execB", "host2", NO_PREF)._1.get.index === 1) clock.advance(LOCALITY_WAIT_MS * 3) // schedule non-local tasks - assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3) + assert(manager.resourceOffer("execB", "host2", ANY)._1.get.index === 3) } test("node-local tasks should be scheduled right away " + @@ -929,13 +951,13 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // node-local tasks are scheduled without delay - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0) - assert(manager.resourceOffer("execA", "host2", NODE_LOCAL).get.index === 1) - assert(manager.resourceOffer("execA", "host3", NODE_LOCAL).get.index === 3) - assert(manager.resourceOffer("execA", "host3", NODE_LOCAL) === None) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1.get.index === 0) + assert(manager.resourceOffer("execA", "host2", NODE_LOCAL)._1.get.index === 1) + 
assert(manager.resourceOffer("execA", "host3", NODE_LOCAL)._1.get.index === 3) + assert(manager.resourceOffer("execA", "host3", NODE_LOCAL)._1 === None) // schedule no-preference after node local ones - assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2) + assert(manager.resourceOffer("execA", "host3", NO_PREF)._1.get.index === 2) } test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") @@ -951,13 +973,13 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // process-local tasks are scheduled first - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2) - assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 3) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1.get.index === 2) + assert(manager.resourceOffer("execB", "host2", NODE_LOCAL)._1.get.index === 3) // node-local tasks are scheduled without delay - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0) - assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1) - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) - assert(manager.resourceOffer("execB", "host2", NODE_LOCAL) == None) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1.get.index === 0) + assert(manager.resourceOffer("execB", "host2", NODE_LOCAL)._1.get.index === 1) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1 === None) + assert(manager.resourceOffer("execB", "host2", NODE_LOCAL)._1 === None) } test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") { @@ -971,13 +993,13 @@ class TaskSetManagerSuite val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) // process-local tasks are scheduled first - assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1) - assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 2) + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL)._1.get.index === 1) + assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL)._1.get.index === 2) // no-pref tasks are scheduled without delay - assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) == None) - assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) - assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 0) - assert(manager.resourceOffer("execA", "host1", ANY) == None) + assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL)._1 === None) + assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1 === None) + assert(manager.resourceOffer("execA", "host1", NO_PREF)._1.get.index === 0) + assert(manager.resourceOffer("execA", "host1", ANY)._1 === None) } test("Ensure TaskSetManager is usable after addition of levels") { @@ -1061,7 +1083,7 @@ class TaskSetManagerSuite "exec1" -> "host1", "exec2" -> "host2", "exec2" -> "host2")) { - val taskOption = manager.resourceOffer(k, v, NO_PREF) + val taskOption = manager.resourceOffer(k, v, NO_PREF)._1 assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === k) @@ -1082,7 +1104,7 @@ class TaskSetManagerSuite assert(sched.speculativeTasks.toSet === Set(3)) // Offer resource to start the speculative attempt for the running task - val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) + val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)._1 assert(taskOption5.isDefined) val task5 = 
taskOption5.get
     assert(task5.index === 3)
@@ -1121,7 +1143,7 @@ class TaskSetManagerSuite
         "exec1" -> "host1",
         "exec2" -> "host2",
         "exec2" -> "host2")) {
-      val taskOption = manager.resourceOffer(k, v, NO_PREF)
+      val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
       assert(taskOption.isDefined)
       val task = taskOption.get
       assert(task.executorId === k)
@@ -1154,7 +1176,7 @@ class TaskSetManagerSuite
       manager.handleFailedTask(task.taskId, TaskState.FAILED, endReason)
       sched.endedTasks(task.taskId) = endReason
       assert(!manager.isZombie)
-      val nextTask = manager.resourceOffer(s"exec2", s"host2", NO_PREF)
+      val nextTask = manager.resourceOffer(s"exec2", s"host2", NO_PREF)._1
       assert(nextTask.isDefined, s"no offer for attempt $attempt of $index")
       tasks += nextTask.get
     }
@@ -1170,7 +1192,7 @@ class TaskSetManagerSuite
     assert(manager.checkSpeculatableTasks(0))
     assert(sched.speculativeTasks.toSet === Set(3, 4))
     // Offer resource to start the speculative attempt for the running task
-    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)._1
     assert(taskOption5.isDefined)
     val speculativeTask = taskOption5.get
     assert(speculativeTask.index === 3 || speculativeTask.index === 4)
@@ -1195,7 +1217,7 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
 
     // now run another speculative task
-    val taskOpt6 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOpt6 = manager.resourceOffer("exec1", "host1", NO_PREF)._1
     assert(taskOpt6.isDefined)
     val speculativeTask2 = taskOpt6.get
     assert(speculativeTask2.index === 3 || speculativeTask2.index === 4)
@@ -1226,7 +1248,7 @@ class TaskSetManagerSuite
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1))
     when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any(), any())).thenAnswer(
       (invocationOnMock: InvocationOnMock) => assert(manager.isZombie))
-    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
     assert(taskOption.isDefined)
     // this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon
     manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -1271,7 +1293,7 @@ class TaskSetManagerSuite
       "exec2" -> "host1"
     ).flatMap { case (exec, host) =>
       // offer each executor twice (simulating 2 cores per executor)
-      (0 until 2).flatMap{ _ => tsmSpy.resourceOffer(exec, host, TaskLocality.ANY)}
+      (0 until 2).flatMap{ _ => tsmSpy.resourceOffer(exec, host, TaskLocality.ANY)._1}
     }
     assert(taskDescs.size === 4)
 
@@ -1308,7 +1330,7 @@ class TaskSetManagerSuite
       "exec2" -> "host2"
     ).flatMap { case (exec, host) =>
       // offer each executor twice (simulating 2 cores per executor)
-      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)}
+      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)._1}
     }
     assert(taskDescs.size === 4)
 
@@ -1344,7 +1366,7 @@ class TaskSetManagerSuite
     val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(blacklistTracker))
     val taskSetManagerSpy = spy(taskSetManager)
 
-    val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)
+    val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)._1
 
     // Assert the task has been black listed on the executor it was last executed on.
     when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean(), anyBoolean())).thenAnswer(
@@ -1372,9 +1394,9 @@ class TaskSetManagerSuite
     val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES, clock = new ManualClock)
 
     // all tasks from the first taskset have the same jars
-    val taskOption1 = manager1.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption1 = manager1.resourceOffer("exec1", "host1", NO_PREF)._1
     assert(taskOption1.get.addedJars === addedJarsPreTaskSet)
-    val taskOption2 = manager1.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption2 = manager1.resourceOffer("exec1", "host1", NO_PREF)._1
     assert(taskOption2.get.addedJars === addedJarsPreTaskSet)
 
     // even with a jar added mid-TaskSet
@@ -1382,7 +1404,7 @@ class TaskSetManagerSuite
     sc.addJar(jarPath.toString)
     val addedJarsMidTaskSet = Map[String, Long](sc.addedJars.toSeq: _*)
     assert(addedJarsPreTaskSet !== addedJarsMidTaskSet)
-    val taskOption3 = manager1.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption3 = manager1.resourceOffer("exec1", "host1", NO_PREF)._1
     // which should have the old version of the jars list
     assert(taskOption3.get.addedJars === addedJarsPreTaskSet)
 
@@ -1390,7 +1412,7 @@ class TaskSetManagerSuite
     val taskSet2 = FakeTask.createTaskSet(1)
     val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = new ManualClock)
 
-    val taskOption4 = manager2.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption4 = manager2.resourceOffer("exec1", "host1", NO_PREF)._1
     assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
@@ -1488,7 +1510,7 @@ class TaskSetManagerSuite
         "exec1" -> "host1",
         "exec3" -> "host3",
         "exec2" -> "host2")) {
-      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1
       assert(taskOption.isDefined)
       val task = taskOption.get
       assert(task.executorId === exec)
@@ -1514,7 +1536,7 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.toSet === Set(2, 3))
 
     // Offer resource to start the speculative attempt for the running task 2.0
-    val taskOption = manager.resourceOffer("exec2", "host2", ANY)
+    val taskOption = manager.resourceOffer("exec2", "host2", ANY)._1
     assert(taskOption.isDefined)
     val task4 = taskOption.get
     assert(task4.index === 2)
@@ -1560,7 +1582,7 @@ class TaskSetManagerSuite
         "exec1" -> "host1",
         "exec2" -> "host2",
         "exec2" -> "host2")) {
-      val taskOption = manager.resourceOffer(k, v, NO_PREF)
+      val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
       assert(taskOption.isDefined)
       val task = taskOption.get
       assert(task.executorId === k)
@@ -1580,7 +1602,7 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.toSet === Set(3))
 
     // Offer resource to start the speculative attempt for the running task
-    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)._1
     assert(taskOption5.isDefined)
     val task5 = taskOption5.get
     assert(task5.index === 3)
@@ -1640,7 +1662,7 @@ class TaskSetManagerSuite
     assert(FakeRackUtil.numBatchInvocation === 1)
     assert(FakeRackUtil.numSingleHostInvocation === 0)
     // with rack locality, reject an offer on a host with an unknown rack
-    assert(manager.resourceOffer("otherExec", "otherHost", TaskLocality.RACK_LOCAL).isEmpty)
+    assert(manager.resourceOffer("otherExec", "otherHost", TaskLocality.RACK_LOCAL)._1.isEmpty)
     (0 until 20).foreach { rackIdx =>
       (0 until 5).foreach { offerIdx =>
         // if we offer hosts which are not in preferred locations,
@@ -1648,9 +1670,9 @@ class TaskSetManagerSuite
         // but accept them at RACK_LOCAL level if they're on OK racks
         val hostIdx = 100 + rackIdx
         assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, TaskLocality.NODE_LOCAL)
-          .isEmpty)
+          ._1.isEmpty)
         assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, TaskLocality.RACK_LOCAL)
-          .isDefined)
+          ._1.isDefined)
       }
     }
     // check no more expensive calls to the rack resolution. manager.resourceOffer() will call
@@ -1670,7 +1692,7 @@ class TaskSetManagerSuite
     val taskResourceAssignments = Map(GPU -> new ResourceInformation(GPU, Array("0", "1")))
     val taskOption =
-      manager.resourceOffer("exec1", "host1", NO_PREF, taskResourceAssignments)
+      manager.resourceOffer("exec1", "host1", NO_PREF, taskResourceAssignments)._1
     assert(taskOption.isDefined)
     val allocatedResources = taskOption.get.resources
     assert(allocatedResources.size == 1)
@@ -1693,7 +1715,7 @@ class TaskSetManagerSuite
     // Offer resources for 4 tasks to start, 2 on each exec
     Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) =>
       (0 until 2).foreach { _ =>
-        val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+        val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1
         assert(taskOption.isDefined)
         assert(taskOption.get.executorId === exec)
       }
@@ -1717,8 +1739,8 @@ class TaskSetManagerSuite
     // Offer resource to start the speculative attempt for the running task. We offer more
     // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra
     // copy per speculatable task
-    val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF)
-    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)
+    val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
    assert(taskOption2.isDefined)
     val task2 = taskOption2.get
     // Ensure that task index 3 is launched on host1 and task index 4 on host2
@@ -1738,9 +1760,9 @@ class TaskSetManagerSuite
     assert(manager.copiesRunning(1) === 2)
     assert(manager.copiesRunning(3) === 2)
     // Offering additional resources should not lead to any speculative tasks being respawned
-    assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
-    assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
-    assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+    assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
+    assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)
   }
 
   test("SPARK-26755 Ensure that a speculative task obeys original locality preferences") {
@@ -1763,7 +1785,7 @@ class TaskSetManagerSuite
     }
     // Offer resources for 3 tasks to start
     Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) =>
-      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1
       assert(taskOption.isDefined)
       assert(taskOption.get.executorId === exec)
     }
@@ -1776,17 +1798,17 @@ class TaskSetManagerSuite
     assert(manager.checkSpeculatableTasks(0))
     assert(sched.speculativeTasks.toSet === Set(0, 1))
     // Ensure that the speculatable tasks obey the original locality preferences
-    assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty)
+    assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL)._1.isEmpty)
     // task 1 does have a node-local preference for host2 -- but we've already got a regular
     // task running there, so we should not schedule a speculative there as well.
-    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty)
-    assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined)
-    assert(manager.resourceOffer("exec4", "host4", ANY).isDefined)
+    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL)._1.isEmpty)
+    assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL)._1.isDefined)
+    assert(manager.resourceOffer("exec4", "host4", ANY)._1.isDefined)
     // Since, all speculatable tasks have been launched, making another offer
     // should not schedule any more tasks
-    assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
     assert(!manager.checkSpeculatableTasks(0))
-    assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
   }
 
   private def testSpeculationDurationSetup(
@@ -1931,7 +1953,7 @@ class TaskSetManagerSuite
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     assert(sched.taskSetsFailed.isEmpty)
-    val offerResult = manager.resourceOffer("exec1", "host1", ANY)
+    val offerResult = manager.resourceOffer("exec1", "host1", ANY)._1
     assert(offerResult.isDefined, "Expect resource offer on iteration 0 to return a task")
     assert(offerResult.get.index === 0)
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index a5bd34888a0a6..36ed84858dbfb 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -303,7 +303,8 @@ class MesosFineGrainedSchedulerBackendSuite
     mesosOffers2.add(createOffer(1, minMem, minCpu))
     reset(taskScheduler)
     reset(driver)
-    when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]))).thenReturn(Seq(Seq()))
+    when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]), any[Boolean]))
+      .thenReturn(Seq(Seq()))
     when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
     when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
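Note on the test churn above: TaskSetManager.resourceOffer now returns a pair instead of a bare Option, so existing assertions keep their old meaning by taking `._1`, with the second element reporting whether the offer was rejected by delay scheduling. The sketch below is illustrative only and is not part of the patch; `OfferResultSketch` and `TaskDesc` are placeholder names standing in for Spark's internal TaskDescription, which is not accessible outside the spark package.

// A minimal, self-contained sketch of consuming the widened offer result.
object OfferResultSketch {
  // Placeholder for org.apache.spark.scheduler.TaskDescription (private[spark]).
  type TaskDesc = String

  def example(offerResult: (Option[TaskDesc], Boolean)): Unit = {
    // Equivalent of the pervasive `.resourceOffer(...)._1` pattern in the suite:
    // tests that only care about the launched task ignore the new flag.
    val launchedTask: Option[TaskDesc] = offerResult._1

    // Destructuring exposes the delay-scheduling reject flag for tests that need it.
    val (taskDesc, hadDelayScheduleReject) = offerResult
    println(s"launched=${launchedTask.isDefined}, delayScheduleReject=$hadDelayScheduleReject")
  }

  def main(args: Array[String]): Unit = {
    example((Some("task 0.0"), false)) // a task was launched, no delay-scheduling reject
    example((None, true))              // nothing launched; the offer was rejected for locality
  }
}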