diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 2951bdc18bc57..c6aecae7fac8d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -223,4 +223,9 @@ package object config {
       " bigger files.")
     .longConf
     .createWithDefault(4 * 1024 * 1024)
+
+  private[spark] val SPARK_SCHEDULER_TASK_ASSIGNER = ConfigBuilder("spark.scheduler.taskAssigner")
+    .doc("The task assigner (roundrobin, packed, balanced) to schedule tasks on workers.")
+    .stringConf
+    .createWithDefault("roundrobin")
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
new file mode 100644
index 0000000000000..61d4d31b6b7ca
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.PriorityQueue
+import scala.util.Random
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.util.Utils
+
+/** Tracks the current state of the workers with available cores and assigned task list. */
+private[scheduler] class OfferState(val workOffer: WorkerOffer) {
+  /** The current remaining cores that can be allocated to tasks. */
+  var coresAvailable: Int = workOffer.cores
+  /** The list of tasks that are assigned to this WorkerOffer. */
+  val tasks = new ArrayBuffer[TaskDescription](coresAvailable)
+
+  def assignTask(task: TaskDescription, cpu: Int): Unit = {
+    if (coresAvailable < cpu) {
+      throw new SparkException(s"Available cores are less than cpu per task" +
+        s" ($coresAvailable < $cpu)")
+    }
+    tasks += task
+    coresAvailable -= cpu
+  }
+}
+
+/**
+ * TaskAssigner is the base class for all task assigner implementations, and can be
+ * extended to implement different task scheduling algorithms.
+ * Together with [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]], TaskAssigner
+ * is used to assign tasks to workers with available cores. Internally, when TaskScheduler
+ * performs task assignment given available workers, it first sorts the candidate tasksets,
+ * and then, for each taskset, it takes multiple rounds to request task assignment from
+ * TaskAssigner with different locality restrictions, until there are either no qualified
+ * workers or no valid tasks left to be assigned.
+ *
+ * TaskAssigner is responsible for maintaining the worker availability state and the task
+ * assignment information. The contract between
+ * [[org.apache.spark.scheduler.TaskScheduler TaskScheduler]] and TaskAssigner is as follows.
+ *
+ * First, TaskScheduler invokes construct() of TaskAssigner to initialize its internal
+ * worker states at the beginning of resource offering.
+ *
+ * Second, before each round of task assignment for a taskset, TaskScheduler invokes the init()
+ * of TaskAssigner to initialize the data structure for the round.
+ *
+ * Third, when performing real task assignment, hasNext/next() is used by TaskScheduler
+ * to check the worker availability and retrieve the current offering from TaskAssigner.
+ *
+ * Fourth, TaskScheduler calls offerAccepted() to notify the TaskAssigner so that
+ * TaskAssigner can decide whether the current offer is still valid for the next request.
+ *
+ * Fifth, after task assignment is done, TaskScheduler invokes the function tasks to
+ * retrieve all the task assignment information.
+ */
+
+private[scheduler] sealed abstract class TaskAssigner {
+  protected var offer: Seq[OfferState] = _
+  protected var cpuPerTask = 1
+
+  protected def withCpuPerTask(cpuPerTask: Int): TaskAssigner = {
+    this.cpuPerTask = cpuPerTask
+    this
+  }
+
+  /** The list of tasks currently assigned to each offer. */
+  final def tasks: Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
+
+  /**
+   * Invoked at the beginning of resource offering to construct the offer state from the
+   * WorkerOffers. By default, offers are randomly shuffled to avoid always placing tasks on the
+   * same set of workers.
+   */
+  def construct(workOffer: Seq[WorkerOffer]): Unit = {
+    offer = Random.shuffle(workOffer.map(o => new OfferState(o)))
+  }
+
+  /** Invoked at each round of taskset assignment to initialize the internal structure. */
+  def init(): Unit
+
+  /**
+   * Tests whether there is an offer available to be used inside one round of taskset assignment.
+   * @return `true` if a subsequent call to `next` will yield an element,
+   *         `false` otherwise.
+   */
+  def hasNext: Boolean
+
+  /**
+   * Produces the next worker offer based on the task assignment strategy.
+   * @return the next available offer, if `hasNext` is `true`,
+   *         undefined behavior otherwise.
+   */
+  def next(): OfferState
+
+  /**
+   * Invoked by the TaskScheduler to indicate whether the current offer is accepted or not, so
+   * that the assigner can decide whether the current worker is valid for the next offering.
+   *
+   * @param isAccepted whether TaskScheduler assigns a task to the current offer.
+   */
+  def offerAccepted(isAccepted: Boolean): Unit
+}
+
+object TaskAssigner extends Logging {
+  private val roundrobin = classOf[RoundRobinAssigner].getCanonicalName
+  private val packed = classOf[PackedAssigner].getCanonicalName
+  private val balanced = classOf[BalancedAssigner].getCanonicalName
+  private val assignerMap: Map[String, String] =
+    Map("roundrobin" -> roundrobin,
+      "packed" -> packed,
+      "balanced" -> balanced)
+
+  def init(conf: SparkConf): TaskAssigner = {
+    val assignerName = conf.get(config.SPARK_SCHEDULER_TASK_ASSIGNER.key, "roundrobin")
+    val className = {
+      val name = assignerMap.get(assignerName.toLowerCase())
+      name.getOrElse {
+        throw new SparkException(s"Task Assigner $assignerName is invalid. Available assigners " +
+          "are roundrobin, packed, and balanced. roundrobin is the default.")
+      }
+    }
+    // The className is valid. No need to catch exceptions.
+ logInfo(s"Constructing TaskAssigner as $className") + Utils.classForName(className).getConstructor().newInstance().asInstanceOf[TaskAssigner] + .withCpuPerTask(cpuPerTask = conf.getInt("spark.task.cpus", 1)) + } +} + +/** + * Assigns the task to workers with available cores in a roundrobin manner. + */ +class RoundRobinAssigner extends TaskAssigner { + private var currentOfferIndex = 0 + + override def init(): Unit = { + currentOfferIndex = 0 + } + + override def hasNext: Boolean = currentOfferIndex < offer.size + + override def next(): OfferState = { + offer(currentOfferIndex) + } + + override def offerAccepted(isAccepted: Boolean): Unit = { + currentOfferIndex += 1 + } +} + +/** + * Assigns the task to workers with the most available cores. In other words, BalancedAssigner tries + * to distribute the task across workers in a balanced way. Potentially, it may alleviate the + * workers' memory pressure as less tasks running on the same workers, which also indicates that + * the task itself can make use of more computation resources, e.g., hyper-thread, across clusters. + */ +class BalancedAssigner extends TaskAssigner { + implicit val ord: Ordering[OfferState] = new Ordering[OfferState] { + def compare(x: OfferState, y: OfferState): Int = { + return Ordering[Int].compare(x.coresAvailable, y.coresAvailable) + } + } + private val maxHeap: PriorityQueue[OfferState] = new PriorityQueue[OfferState]() + private var currentOffer: OfferState = _ + + override def init(): Unit = { + maxHeap.clear() + offer.filter(_.coresAvailable >= cpuPerTask).foreach(maxHeap.enqueue(_)) + } + + override def hasNext: Boolean = maxHeap.nonEmpty + + override def next(): OfferState = { + currentOffer = maxHeap.dequeue() + currentOffer + } + + override def offerAccepted(isAccepted: Boolean): Unit = { + if (currentOffer.coresAvailable >= cpuPerTask && isAccepted) { + maxHeap.enqueue(currentOffer) + } + } +} + +/** + * Assigns the task to workers with the least available cores. In other words, PackedAssigner tries + * to schedule tasks to fewer workers. As a result, there will be idle workers without any tasks + * assigned if more than required workers are reserved. If the dynamic allocator is enabled, + * these idle workers will be released by driver. The released resources can then be allocated to + * other jobs by underling resource manager. This assigner can potentially reduce the resource + * reservation for jobs, which over allocate resources than they need. 
+ */
+class PackedAssigner extends TaskAssigner {
+  private var sortedOffer: Seq[OfferState] = _
+  private var currentOfferIndex = 0
+  private var currentOffer: OfferState = _
+
+  override def init(): Unit = {
+    currentOfferIndex = 0
+    sortedOffer = offer.filter(_.coresAvailable >= cpuPerTask).sortBy(_.coresAvailable)
+  }
+
+  override def hasNext: Boolean = currentOfferIndex < sortedOffer.size
+
+  override def next(): OfferState = {
+    currentOffer = sortedOffer(currentOfferIndex)
+    currentOffer
+  }
+
+  override def offerAccepted(isAccepted: Boolean): Unit = {
+    if (currentOffer.coresAvailable < cpuPerTask || !isAccepted) {
+      currentOfferIndex += 1
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 3e3f1ad031e66..44b981a3e7de9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -22,9 +22,7 @@ import java.util.{Timer, TimerTask}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicLong
 
-import scala.collection.Set
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.Random
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -60,7 +58,7 @@ private[spark] class TaskSchedulerImpl(
   def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
 
   val conf = sc.conf
-
+  private val taskAssigner: TaskAssigner = TaskAssigner.init(conf)
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
 
@@ -96,6 +94,9 @@ private[spark] class TaskSchedulerImpl(
   // Number of tasks running on each executor
   private val executorIdToTaskCount = new HashMap[String, Int]
 
+  // For testing to verify the right TaskAssigner is picked up.
+  def getTaskAssigner(): TaskAssigner = taskAssigner
+
   def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
@@ -250,24 +251,24 @@ private[spark] class TaskSchedulerImpl(
   private def resourceOfferSingleTaskSet(
       taskSet: TaskSetManager,
       maxLocality: TaskLocality,
-      shuffledOffers: Seq[WorkerOffer],
-      availableCpus: Array[Int],
-      tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
+      taskAssigner: TaskAssigner) : Boolean = {
     var launchedTask = false
-    for (i <- 0 until shuffledOffers.size) {
-      val execId = shuffledOffers(i).executorId
-      val host = shuffledOffers(i).host
-      if (availableCpus(i) >= CPUS_PER_TASK) {
+    taskAssigner.init()
+    while (taskAssigner.hasNext) {
+      var isAccepted = false
+      val currentOffer = taskAssigner.next()
+      val execId = currentOffer.workOffer.executorId
+      val host = currentOffer.workOffer.host
+      if (currentOffer.coresAvailable >= CPUS_PER_TASK) {
         try {
           for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-            tasks(i) += task
+            currentOffer.assignTask(task, CPUS_PER_TASK)
            val tid = task.taskId
             taskIdToTaskSetManager(tid) = taskSet
             taskIdToExecutorId(tid) = execId
             executorIdToTaskCount(execId) += 1
-            availableCpus(i) -= CPUS_PER_TASK
-            assert(availableCpus(i) >= 0)
             launchedTask = true
+            isAccepted = true
           }
         } catch {
           case e: TaskNotSerializableException =>
@@ -277,14 +278,15 @@ private[spark] class TaskSchedulerImpl(
             return launchedTask
         }
       }
+      taskAssigner.offerAccepted(isAccepted)
     }
     return launchedTask
   }
 
   /**
    * Called by cluster manager to offer resources on slaves. We respond by asking our active task
-   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
-   * that tasks are balanced across the cluster.
+   * sets for tasks in order of priority. We fill each node with tasks in a roundrobin, packed or
+   * balanced way based on the configured TaskAssigner.
    */
   def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
     // Mark each slave as alive and remember its hostname
@@ -305,12 +307,8 @@ private[spark] class TaskSchedulerImpl(
         hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
       }
     }
+    taskAssigner.construct(offers)
 
-    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
-    val shuffledOffers = Random.shuffle(offers)
-    // Build a list of tasks to assign to each worker.
-    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
-    val availableCpus = shuffledOffers.map(o => o.cores).toArray
     val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -329,7 +327,7 @@ private[spark] class TaskSchedulerImpl(
       for (currentMaxLocality <- taskSet.myLocalityLevels) {
         do {
           launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
-            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
+            taskSet, currentMaxLocality, taskAssigner)
           launchedAnyTask |= launchedTaskAtCurrentMaxLocality
         } while (launchedTaskAtCurrentMaxLocality)
       }
@@ -337,10 +335,11 @@ private[spark] class TaskSchedulerImpl(
         taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
       }
     }
-
+    val tasks = taskAssigner.tasks
     if (tasks.size > 0) {
      hasLaunchedTask = true
     }
+
     return tasks
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index f5f1947661d9a..7ea07630c92e8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -85,8 +85,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     taskScheduler
   }
 
-  test("Scheduler does not always schedule tasks on the same workers") {
-    val taskScheduler = setupScheduler()
+  private def roundrobin(taskScheduler: TaskSchedulerImpl): Unit = {
+    assert(taskScheduler.getTaskAssigner().isInstanceOf[RoundRobinAssigner])
     val numFreeCores = 1
     val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores),
       new WorkerOffer("executor1", "host1", numFreeCores))
@@ -109,6 +109,88 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  test("Scheduler does not always schedule tasks on the same workers") {
+    val taskScheduler = setupScheduler()
+    roundrobin(taskScheduler)
+  }
+
+  test("Roundrobin - User can specify the roundrobin task assigner") {
+    val taskScheduler = setupScheduler(("spark.scheduler.taskAssigner", "RoUndrObin"))
+    roundrobin(taskScheduler)
+  }
+
+  test("Invalid - Throws SparkException") {
+    intercept[SparkException] { setupScheduler("spark.scheduler.taskAssigner" -> "invalid") }
+  }
+
+  test("Balanced - Assigner balances the tasks to the worker with more free cores") {
+    val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "BaLanceD")
+    assert(taskScheduler.getTaskAssigner().isInstanceOf[BalancedAssigner])
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
+      new WorkerOffer("executor1", "host1", 4))
+    val selectedExecutorIds = {
+      val taskSet = FakeTask.createTaskSet(2)
+      taskScheduler.submitTasks(taskSet)
+      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+      assert(2 === taskDescriptions.length)
+      taskDescriptions.map(_.executorId)
+    }
+    val count = selectedExecutorIds.count(_ == workerOffers(1).executorId)
+    assert(count == 2)
+    assert(!failedTaskSet)
+  }
+
+  test("Balanced - Assigner balances the tasks across workers with same free cores") {
+    val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "balanced")
+    assert(taskScheduler.getTaskAssigner().isInstanceOf[BalancedAssigner])
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
+      new WorkerOffer("executor1", "host1", 2))
+    val selectedExecutorIds = {
+      val taskSet = FakeTask.createTaskSet(2)
+      taskScheduler.submitTasks(taskSet)
+      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+      assert(2 === taskDescriptions.length)
+      taskDescriptions.map(_.executorId)
+    }
+    val count = selectedExecutorIds.count(_ == workerOffers(1).executorId)
+    assert(count == 1)
+    assert(!failedTaskSet)
+  }
+
+  test("Packed - Assigner packs the tasks to workers with less free cores") {
+    val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "paCkeD")
+    assert(taskScheduler.getTaskAssigner().isInstanceOf[PackedAssigner])
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
+      new WorkerOffer("executor1", "host1", 4))
+    val selectedExecutorIds = {
+      val taskSet = FakeTask.createTaskSet(2)
+      taskScheduler.submitTasks(taskSet)
+      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+      assert(2 === taskDescriptions.length)
+      taskDescriptions.map(_.executorId)
+    }
+    val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
+    assert(count == 2)
+    assert(!failedTaskSet)
+  }
+
+  test("Packed - Assigner keeps packing the assignment to the same worker") {
+    val taskScheduler = setupScheduler("spark.scheduler.taskAssigner" -> "packed")
+    assert(taskScheduler.getTaskAssigner().isInstanceOf[PackedAssigner])
+    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 4),
+      new WorkerOffer("executor1", "host1", 4))
+    val selectedExecutorIds = {
+      val taskSet = FakeTask.createTaskSet(4)
+      taskScheduler.submitTasks(taskSet)
+      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+      assert(4 === taskDescriptions.length)
+      taskDescriptions.map(_.executorId)
+    }
+    val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
+    assert(count == 4 || count == 0)
+    assert(!failedTaskSet)
+  }
+
   test("Scheduler correctly accounts for multiple CPUs per task") {
     val taskCpus = 2
     val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
diff --git a/docs/configuration.md b/docs/configuration.md
index a3b4ff01e6d92..04f0ae007bc4c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1374,6 +1374,21 @@ Apart from these, the following properties are also available, and may be useful
     Should be greater than or equal to 1. Number of allowed retries = this value - 1.
   </td>
 </tr>
+<tr>
+  <td><code>spark.scheduler.taskAssigner</code></td>
+  <td>roundrobin</td>
+  <td>
+    The strategy used to allocate tasks among workers with free cores. Three task
+    assigners (roundrobin, packed, and balanced) are currently supported. By default, the
+    "roundrobin" task assigner allocates tasks to workers with available cores in a
+    roundrobin manner with randomness. The "packed" task assigner allocates tasks to the
+    workers with the least free cores, resulting in tasks assigned to fewer workers, which may
+    help the driver release reserved idle workers when dynamic allocation
+    (<code>spark.dynamicAllocation.enabled</code>) is enabled. The "balanced" task assigner
+    assigns tasks across workers in a balanced way, allocating tasks to the workers with the
+    most free cores.
+  </td>
+</tr>
 </table>
 
 #### Dynamic Allocation
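
Illustration (not part of the patch): the scaladoc in TaskAssigner.scala above describes a construct()/init()/hasNext/next()/offerAccepted() contract between TaskScheduler and TaskAssigner. The standalone Scala sketch below mimics that contract with a packed-style strategy, using a plain Offer case class and a task counter in place of Spark's WorkerOffer/OfferState and real tasksets; the names PackedLikeAssigner, Offer, and pendingTasks are illustrative only.

object TaskAssignerContractSketch {
  // Plain stand-in for a worker offer; names here are illustrative, not Spark classes.
  final case class Offer(executorId: String, var coresAvailable: Int)

  // A packed-style assigner following the construct/init/hasNext/next/offerAccepted contract.
  final class PackedLikeAssigner(cpuPerTask: Int) {
    private var sorted: Seq[Offer] = Seq.empty
    private var idx = 0

    def construct(offers: Seq[Offer]): Unit = { sorted = offers }

    def init(): Unit = {
      idx = 0
      // Least free cores first, mirroring the patch's PackedAssigner.
      sorted = sorted.filter(_.coresAvailable >= cpuPerTask).sortBy(_.coresAvailable)
    }

    def hasNext: Boolean = idx < sorted.size

    def next(): Offer = sorted(idx)

    def offerAccepted(accepted: Boolean): Unit = {
      // Stay on the current worker while it keeps accepting tasks and has cores left.
      if (!accepted || sorted(idx).coresAvailable < cpuPerTask) idx += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("executor0", 2), Offer("executor1", 4))
    val assigner = new PackedLikeAssigner(cpuPerTask = 1)
    assigner.construct(offers)

    var pendingTasks = 3
    assigner.init()
    // Scheduler-side loop, mirroring the shape of resourceOfferSingleTaskSet in the patch.
    while (assigner.hasNext && pendingTasks > 0) {
      val current = assigner.next()
      val accepted = current.coresAvailable >= 1
      if (accepted) {
        current.coresAvailable -= 1
        pendingTasks -= 1
      }
      assigner.offerAccepted(accepted)
    }
    // Packed behavior: executor0 (fewest free cores) is filled before executor1 is touched.
    offers.foreach(o => println(s"${o.executorId}: ${o.coresAvailable} cores left"))
  }
}

Running the sketch leaves executor0 with 0 cores and executor1 with 3, i.e. the smaller offer is exhausted before the larger one is used, which is the packing behavior the new "Packed" tests assert.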
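Usage sketch (assumed, not taken from the patch): once this change is applied, a job would opt into an assigner through the new spark.scheduler.taskAssigner key, as documented in the configuration.md hunk above. The application name and RDD work below are placeholders, and other dynamic-allocation prerequisites (for example the external shuffle service) are omitted.

import org.apache.spark.{SparkConf, SparkContext}

object PackedAssignerJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("packed-assigner-example")
      // New key introduced by this patch; valid values: roundrobin (default), packed, balanced.
      .set("spark.scheduler.taskAssigner", "packed")
      // Optional: with dynamic allocation, executors left idle by the packed assigner
      // can be released back to the cluster manager.
      .set("spark.dynamicAllocation.enabled", "true")
    val sc = new SparkContext(conf)
    // Arbitrary placeholder work to exercise the scheduler.
    val count = sc.parallelize(1 to 1000, 10).map(_ * 2).count()
    println(s"count = $count")
    sc.stop()
  }
}

Misspelled assigner names fail fast: TaskAssigner.init throws a SparkException listing the three supported assigners, as exercised by the "Invalid - Throws SparkException" test in the patch.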