diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 4d475fa8a6e8..bdc9e9c6106c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.master
 import scala.collection.mutable
 
 import org.apache.spark.resource.{ResourceAllocator, ResourceInformation, ResourceRequirement}
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
 
 private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String])
@@ -29,12 +29,20 @@ private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String
 
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
-  override protected def slotsPerAddress: Int = 1
 
+  /**
+   * Acquire the resources.
+   * @param amount the number of addresses to acquire
+   * @return a ResourceInformation containing the acquired addresses
+   */
   def acquire(amount: Int): ResourceInformation = {
-    val allocated = availableAddrs.take(amount)
-    acquire(allocated)
-    new ResourceInformation(resourceName, allocated.toArray)
+    // Every address in availableAddrs must be a whole resource, because the worker
+    // only assigns entire resources to executors.
+    val addresses = availableAddrs.take(amount)
+    assert(addresses.length == amount)
+
+    acquire(addresses.map(addr => addr -> ONE_ENTIRE_RESOURCE).toMap)
+    new ResourceInformation(resourceName, addresses.toArray)
   }
 }
 
@@ -163,7 +171,7 @@ private[spark] class WorkerInfo(
    */
   def recoverResources(expected: Map[String, ResourceInformation]): Unit = {
     expected.foreach { case (rName, rInfo) =>
-      resources(rName).acquire(rInfo.addresses.toImmutableArraySeq)
+      resources(rName).acquire(rInfo.addresses.map(addr => addr -> ONE_ENTIRE_RESOURCE).toMap)
     }
   }
 
@@ -173,7 +181,7 @@ private[spark] class WorkerInfo(
    */
   private def releaseResources(allocated: Map[String, ResourceInformation]): Unit = {
     allocated.foreach { case (rName, rInfo) =>
-      resources(rName).release(rInfo.addresses.toImmutableArraySeq)
+      resources(rName).release(rInfo.addresses.map(addr => addr -> ONE_ENTIRE_RESOURCE).toMap)
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 366b481bf6a4..b507d27f14c4 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -20,7 +20,6 @@ package org.apache.spark.executor
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.util.{Failure, Success}
@@ -65,16 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private var _resources = Map.empty[String, ResourceInformation]
 
-  /**
-   * Map each taskId to the information about the resource allocated to it, Please refer to
-   * [[ResourceInformation]] for specifics.
-   * CHM is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227)
-   * Exposed for testing only.
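
To make the whole-resource acquisition above concrete, a minimal sketch of the master-side flow, assuming a freshly registered WorkerResourceInfo (the expected values follow from availableAddrs being sorted):

    // The master only hands out entire addresses: acquiring 2 of 4 GPUs takes the
    // two smallest available addresses and marks each one as fully assigned.
    val info = WorkerResourceInfo("gpu", Seq("0", "1", "2", "3"))
    val assigned = info.acquire(2)
    assert(assigned.addresses.toSeq == Seq("0", "1"))
    assert(info.availableAddrs == Seq("2", "3"))
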
- */ - private[executor] val taskResources = new ConcurrentHashMap[ - Long, Map[String, ResourceInformation] - ] - private var decommissioned = false // Track the last time in ns that at least one task is running. If no task is running and all @@ -192,7 +181,6 @@ private[spark] class CoarseGrainedExecutorBackend( } else { val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) - taskResources.put(taskDesc.taskId, taskDesc.resources) executor.launchTask(this, taskDesc) } @@ -272,11 +260,10 @@ private[spark] class CoarseGrainedExecutorBackend( } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = { - val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation]) + val resources = executor.runningTasks.get(taskId).taskDescription.resources val cpus = executor.runningTasks.get(taskId).taskDescription.cpus val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources) if (TaskState.isFinished(state)) { - taskResources.remove(taskId) lastTaskFinishTime.set(System.nanoTime()) } driver match { diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index f2a65aab1ba4..05ce4ae79b46 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -618,13 +618,17 @@ private[spark] class Executor( threadMXBean.getCurrentThreadCpuTime } else 0L var threwException = true + // Convert resources amounts info to ResourceInformation + val resources = taskDescription.resources.map { case (rName, addressesAmounts) => + rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray) + } val value = Utils.tryWithSafeFinally { val res = task.run( taskAttemptId = taskId, attemptNumber = taskDescription.attemptNumber, metricsSystem = env.metricsSystem, cpus = taskDescription.cpus, - resources = taskDescription.resources, + resources = resources, plugins = plugins) threwException = false res diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala index 7b97d9704282..e9bb11721725 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala @@ -20,6 +20,49 @@ package org.apache.spark.resource import scala.collection.mutable import org.apache.spark.SparkException +import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE + +private[spark] object ResourceAmountUtils { + /** + * Using "double" to do the resource calculation may encounter a problem of precision loss. 
E.g.,
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   *      |   if (total >= taskAmount) {
+   *      |           total -= taskAmount
+   *      |           println(s"assign $taskAmount for task $i, total left: $total")
+   *      |   } else {
+   *      |           println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
+   *      |   }
+   *      | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
+   *
+   * So we scale the amount by ONE_ENTIRE_RESOURCE and store it as a Long to avoid this
+   * limitation. A Double carries roughly 16 significant decimal digits, so the factor is
+   * set to 10,000,000,000,000,000L (1e16).
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L
+
+  def isOneEntireResource(amount: Long): Boolean = amount == ONE_ENTIRE_RESOURCE
+
+  def toInternalResource(amount: Double): Long = (amount * ONE_ENTIRE_RESOURCE).toLong
+
+  def toFractionalResource(amount: Long): Double = amount.toDouble / ONE_ENTIRE_RESOURCE
+
+}
 
 /**
  * Trait used to help executor/worker allocate resources.
@@ -29,59 +72,53 @@ private[spark] trait ResourceAllocator {
 
   protected def resourceName: String
   protected def resourceAddresses: Seq[String]
-  protected def slotsPerAddress: Int
 
   /**
-   * Map from an address to its availability, a value > 0 means the address is available,
-   * while value of 0 means the address is fully assigned.
-   *
-   * For task resources ([[org.apache.spark.scheduler.ExecutorResourceInfo]]), this value
-   * can be a multiple, such that each address can be allocated up to [[slotsPerAddress]]
-   * times.
+   * Map from an address to its availability, which defaults to ONE_ENTIRE_RESOURCE (1.0
+   * scaled to a Long to avoid precision errors). A value > 0 means the address is at least
+   * partially available, while a value of 0 means the address is fully assigned.
    */
   private lazy val addressAvailabilityMap = {
-    mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*)
+    mutable.HashMap(resourceAddresses.map(address => address -> ONE_ENTIRE_RESOURCE): _*)
   }
 
   /**
-   * Sequence of currently available resource addresses.
-   *
-   * With [[slotsPerAddress]] greater than 1, [[availableAddrs]] can contain duplicate addresses
-   * e.g. with [[slotsPerAddress]] == 2, availableAddrs for addresses 0 and 1 can look like
-   * Seq("0", "0", "1"), where address 0 has two assignments available, and 1 has one.
+   * Get the internal resource amounts per address (each scaled by ONE_ENTIRE_RESOURCE).
+   * @return the resource amounts
+   */
+  def resourcesAmounts: Map[String, Long] = addressAvailabilityMap.toMap
+
+  /**
+   * Sequence of currently available resource addresses, i.e. those not fully assigned.
    */
   def availableAddrs: Seq[String] = addressAvailabilityMap
-    .flatMap { case (addr, available) =>
-      (0 until available).map(_ => addr)
-    }.toSeq.sorted
+    .filter(addresses => addresses._2 > 0).keys.toSeq.sorted
 
   /**
    * Sequence of currently assigned resource addresses.
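
For contrast with the failing Double session quoted in the comment above, the same nine-way split succeeds once the amounts are scaled to Long. A minimal sketch using the helpers introduced here:

    import org.apache.spark.resource.ResourceAmountUtils
    import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE

    // Each task gets 1111111111111111 internal units; after nine assignments only
    // a single unit (1e-16 of the resource) is left over, and no task is rejected.
    val taskAmount: Long = ResourceAmountUtils.toInternalResource(1.0 / 9)
    var total: Long = ONE_ENTIRE_RESOURCE
    for (i <- 1 to 9) {
      assert(total >= taskAmount, s"can't assign task $i")
      total -= taskAmount
    }
    println(ResourceAmountUtils.toFractionalResource(total)) // 1.0E-16
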
- * - * With [[slotsPerAddress]] greater than 1, [[assignedAddrs]] can contain duplicate addresses - * e.g. with [[slotsPerAddress]] == 2, assignedAddrs for addresses 0 and 1 can look like - * Seq("0", "1", "1"), where address 0 was assigned once, and 1 was assigned twice. */ private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap - .flatMap { case (addr, available) => - (0 until slotsPerAddress - available).map(_ => addr) - }.toSeq.sorted + .filter(addresses => addresses._2 < ONE_ENTIRE_RESOURCE).keys.toSeq.sorted /** * Acquire a sequence of resource addresses (to a launched task), these addresses must be * available. When the task finishes, it will return the acquired resource addresses. * Throw an Exception if an address is not available or doesn't exist. */ - def acquire(addrs: Seq[String]): Unit = { - addrs.foreach { address => - val isAvailable = addressAvailabilityMap.getOrElse(address, + def acquire(addressesAmounts: Map[String, Long]): Unit = { + addressesAmounts.foreach { case (address, amount) => + val prevAmount = addressAvailabilityMap.getOrElse(address, throw new SparkException(s"Try to acquire an address that doesn't exist. $resourceName " + - s"address $address doesn't exist.")) - if (isAvailable > 0) { - addressAvailabilityMap(address) -= 1 + s"address $address doesn't exist.")) + + val left = prevAmount - amount + + if (left < 0) { + throw new SparkException(s"Try to acquire $resourceName address $address " + + s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}, but only " + + s"${ResourceAmountUtils.toFractionalResource(prevAmount)} left.") } else { - throw new SparkException("Try to acquire an address that is not available. " + - s"$resourceName address $address is not available.") + addressAvailabilityMap(address) = left } } } @@ -91,16 +128,21 @@ private[spark] trait ResourceAllocator { * addresses are released when a task has finished. * Throw an Exception if an address is not assigned or doesn't exist. */ - def release(addrs: Seq[String]): Unit = { - addrs.foreach { address => - val isAvailable = addressAvailabilityMap.getOrElse(address, + def release(addressesAmounts: Map[String, Long]): Unit = { + addressesAmounts.foreach { case (address, amount) => + val prevAmount = addressAvailabilityMap.getOrElse(address, throw new SparkException(s"Try to release an address that doesn't exist. $resourceName " + s"address $address doesn't exist.")) - if (isAvailable < slotsPerAddress) { - addressAvailabilityMap(address) += 1 + + val total = prevAmount + amount + + if (total > ONE_ENTIRE_RESOURCE) { + throw new SparkException(s"Try to release $resourceName address $address " + + s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}. But the total amount: " + + s"${ResourceAmountUtils.toFractionalResource(total)} " + + s"after release should be <= 1") } else { - throw new SparkException(s"Try to release an address that is not assigned. 
$resourceName " + - s"address $address is not assigned.") + addressAvailabilityMap(address) = total } } } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index 69c0672562c2..4a55b2f619e6 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -49,6 +49,8 @@ class ResourceProfile( val executorResources: Map[String, ExecutorResourceRequest], val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging { + validate() + // _id is only a var for testing purposes private var _id = ResourceProfile.getNextProfileId // This is used for any resources that use fractional amounts, the key is the resource name @@ -59,6 +61,19 @@ class ResourceProfile( private var _maxTasksPerExecutor: Option[Int] = None private var _coresLimitKnown: Boolean = false + /** + * Validate the ResourceProfile + */ + protected def validate(): Unit = { + // The task.amount in ResourceProfile falls within the range of 0 to 0.5, + // or it's a whole number + for ((_, taskReq) <- taskResources) { + val taskAmount = taskReq.amount + assert(taskAmount <= 0.5 || taskAmount % 1 == 0, + s"The task resource amount ${taskAmount} must be either <= 0.5, or a whole number.") + } + } + /** * A unique id of this ResourceProfile */ @@ -105,12 +120,8 @@ class ResourceProfile( /* * This function takes into account fractional amounts for the task resource requirement. - * Spark only supports fractional amounts < 1 to basically allow for multiple tasks - * to use the same resource address. - * The way the scheduler handles this is it adds the same address the number of slots per - * address times and then the amount becomes 1. This way it re-uses the same address - * the correct number of times. ie task requirement amount=0.25 -> addrs["0", "0", "0", "0"] - * and scheduler task amount=1. See ResourceAllocator.slotsPerAddress. + * Spark only supports fractional amounts < 1 to basically allow for multiple tasks + * to use the same resource address or a whole number to use the multiple whole addresses. */ private[spark] def getSchedulerTaskResourceAmount(resource: String): Int = { val taskAmount = taskResources.getOrElse(resource, @@ -280,6 +291,11 @@ private[spark] class TaskResourceProfile( override val taskResources: Map[String, TaskResourceRequest]) extends ResourceProfile(Map.empty, taskResources) { + // The task.amount in TaskResourceProfile falls within the range of 0 to 1.0, + // or it's a whole number, and it has been checked in the TaskResourceRequest. + // Therefore, we can safely skip this check. + override protected def validate(): Unit = {} + override protected[spark] def getCustomExecutorResources() : Map[String, ExecutorResourceRequest] = { if (SparkEnv.get == null) { diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index a6f2ac35af7a..8718ce8ea083 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -169,18 +169,17 @@ private[spark] object ResourceUtils extends Logging { // Used to take a fraction amount from a task resource requirement and split into a real // integer amount and the number of slots per address. For instance, if the amount is 0.5, - // the we get (1, 2) back out. 
This indicates that for each 1 address, it has 2 slots per
-  // address, which allows you to put 2 tasks on that address. Note if amount is greater
-  // than 1, then the number of slots per address has to be 1. This would indicate that a
-  // would have multiple addresses assigned per task. This can be used for calculating
-  // the number of tasks per executor -> (executorAmount * numParts) / (integer amount).
+  // then we get (1, 2) back out. This indicates that for each 1 address, it allows you to
+  // put 2 tasks on that address. Note that if the amount is greater than 1, the number of
+  // running tasks per address has to be 1. This can be used for calculating
+  // the number of tasks per executor = (executorAmount * numParts) / (integer amount).
   // Returns tuple of (integer amount, numParts)
   def calculateAmountAndPartsForFraction(doubleAmount: Double): (Int, Int) = {
-    val parts = if (doubleAmount <= 0.5) {
+    val parts = if (doubleAmount <= 1.0) {
       Math.floor(1.0 / doubleAmount).toInt
     } else if (doubleAmount % 1 != 0) {
       throw new SparkException(
-        s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole number.")
+        s"The resource amount ${doubleAmount} must be either <= 1.0, or a whole number.")
     } else {
       1
     }
diff --git a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
index cbd57808213a..9fc0d93373b5 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{Evolving, Since}
 *
 * @param resourceName Resource name
 * @param amount Amount requesting as a Double to support fractional resource requests.
- *               Valid values are less than or equal to 0.5 or whole numbers. This essentially
+ *               Valid values are less than or equal to 1.0 or whole numbers. This essentially
 *               lets you configure X number of tasks to run on a single resource,
 *               ie amount equals 0.5 translates into 2 tasks per resource address.
 */
@@ -37,8 +37,8 @@ import org.apache.spark.annotation.{Evolving, Since}
 class TaskResourceRequest(val resourceName: String, val amount: Double)
   extends Serializable {
 
-  assert(amount <= 0.5 || amount % 1 == 0,
-    s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
+  assert(amount <= 1.0 || amount % 1 == 0,
+    s"The resource amount ${amount} must be either <= 1.0, or a whole number.")
 
  override def equals(obj: Any): Boolean = {
    obj match {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index 508c6cebd9fe..d9fbd23f3aa4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -25,16 +25,27 @@ import org.apache.spark.resource.{ResourceAllocator, ResourceInformation}
 * information.
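
Hedged expectations for calculateAmountAndPartsForFraction above, assuming (as in the surrounding code) that it returns the tuple (ceil(amount), parts):

    calculateAmountAndPartsForFraction(0.5) // (1, 2): two tasks can share one address
    calculateAmountAndPartsForFraction(0.7) // (1, 1): floor(1 / 0.7) = 1 task per address
    calculateAmountAndPartsForFraction(1.0) // (1, 1): one task per whole address
    calculateAmountAndPartsForFraction(3.0) // (3, 1): three whole addresses per task
    calculateAmountAndPartsForFraction(1.5) // throws SparkException: neither <= 1.0 nor whole
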
 * @param name Resource name
 * @param addresses Resource addresses provided by the executor
- * @param numParts Number of ways each resource is subdivided when scheduling tasks
 */
private[spark] class ExecutorResourceInfo(
    name: String,
-    addresses: Seq[String],
-    numParts: Int)
+    addresses: Seq[String])
  extends ResourceInformation(name, addresses.toArray) with ResourceAllocator {

  override protected def resourceName = this.name
  override protected def resourceAddresses = this.addresses
-  override protected def slotsPerAddress: Int = numParts
-  def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
+
+  /**
+   * Calculate how many slots ("parts") of this resource the executor can offer for a
+   * given per-task amount, e.g. with 4 addresses, a task amount of 0.25 yields 16 parts
+   * and a task amount of 2.0 yields 2 parts.
+   * @param taskAmount the amount of this resource required by a single task
+   * @return the total number of parts
+   */
+  def totalParts(taskAmount: Double): Int = {
+    assert(taskAmount > 0.0)
+    if (taskAmount >= 1.0) {
+      addresses.length / taskAmount.ceil.toInt
+    } else {
+      addresses.length * Math.floor(1.0 / taskAmount).toInt
+    }
+  }
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourcesAmounts.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourcesAmounts.scala
new file mode 100644
index 000000000000..a93f2863ac2f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourcesAmounts.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkException
+import org.apache.spark.resource.{ResourceAmountUtils, ResourceProfile}
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+/**
+ * Class to hold information about a series of resources belonging to an executor.
+ * A resource could be a GPU or an FPGA. It is used as a temporary class to calculate
+ * the resource amounts when offering resources to the tasks in the [[TaskSchedulerImpl]].
+ *
+ * One example is GPUs, where the addresses would be the indices of the GPUs.
+ *
+ * @param resources The executor's available resources and their amounts.
eg, + * Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.2), + * "1" -> ResourceAmountUtils.toInternalResource(1.0)), + * "fpga" -> Map("a" -> ResourceAmountUtils.toInternalResource(0.3), + * "b" -> ResourceAmountUtils.toInternalResource(0.9)) + * ) + */ +private[spark] class ExecutorResourcesAmounts( + private val resources: Map[String, Map[String, Long]]) extends Serializable { + + /** + * convert the resources to be mutable HashMap + */ + private val internalResources: Map[String, HashMap[String, Long]] = { + resources.map { case (rName, addressAmounts) => + rName -> HashMap(addressAmounts.toSeq: _*) + } + } + + /** + * The total address count of each resource. Eg, + * Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.5), + * "1" -> ResourceAmountUtils.toInternalResource(0.5), + * "2" -> ResourceAmountUtils.toInternalResource(0.5)), + * "fpga" -> Map("a" -> ResourceAmountUtils.toInternalResource(0.5), + * "b" -> ResourceAmountUtils.toInternalResource(0.5))) + * the resourceAmount will be Map("gpu" -> 3, "fpga" -> 2) + */ + lazy val resourceAddressAmount: Map[String, Int] = internalResources.map { + case (rName, addressMap) => rName -> addressMap.size + } + + /** + * For testing purpose. convert internal resources back to the "fraction" resources. + */ + private[spark] def availableResources: Map[String, Map[String, Double]] = { + internalResources.map { case (rName, addressMap) => + rName -> addressMap.map { case (address, amount) => + address -> ResourceAmountUtils.toFractionalResource(amount) + }.toMap + } + } + + /** + * Acquire the resource. + * @param assignedResource the assigned resource information + */ + def acquire(assignedResource: Map[String, Map[String, Long]]): Unit = { + assignedResource.foreach { case (rName, taskResAmounts) => + val availableResourceAmounts = internalResources.getOrElse(rName, + throw new SparkException(s"Try to acquire an address from $rName that doesn't exist")) + taskResAmounts.foreach { case (address, amount) => + val prevInternalTotalAmount = availableResourceAmounts.getOrElse(address, + throw new SparkException(s"Try to acquire an address that doesn't exist. $rName " + + s"address $address doesn't exist.")) + + val left = prevInternalTotalAmount - amount + if (left < 0) { + throw new SparkException(s"The total amount " + + s"${ResourceAmountUtils.toFractionalResource(left)} " + + s"after acquiring $rName address $address should be >= 0") + } + internalResources(rName)(address) = left + } + } + } + + /** + * Release the assigned resources to the resource pool + * @param assignedResource resource to be released + */ + def release(assignedResource: Map[String, Map[String, Long]]): Unit = { + assignedResource.foreach { case (rName, taskResAmounts) => + val availableResourceAmounts = internalResources.getOrElse(rName, + throw new SparkException(s"Try to release an address from $rName that doesn't exist")) + taskResAmounts.foreach { case (address, amount) => + val prevInternalTotalAmount = availableResourceAmounts.getOrElse(address, + throw new SparkException(s"Try to release an address that is not assigned. 
$rName " + + s"address $address is not assigned.")) + val total = prevInternalTotalAmount + amount + if (total > ONE_ENTIRE_RESOURCE) { + throw new SparkException(s"The total amount " + + s"${ResourceAmountUtils.toFractionalResource(total)} " + + s"after releasing $rName address $address should be <= 1.0") + } + internalResources(rName)(address) = total + } + } + } + + /** + * Try to assign the addresses according to the task requirement. This function always goes + * through the available resources starting from the "small" address. If the resources amount + * of the address is matching the task requirement, we will assign this address to this task. + * Eg, assuming the available resources are {"gpu" -> {"0"-> 0.7, "1" -> 1.0}) and the + * task requirement is 0.5, this function will return Some(Map("gpu" -> {"0" -> 0.5})). + * + * TODO: as we consistently allocate addresses beginning from the "small" address, it can + * potentially result in an undesired consequence where a portion of the resource is being wasted. + * Eg, assuming the available resources are {"gpu" -> {"0"-> 1.0, "1" -> 0.5}) and the + * task amount requirement is 0.5, this function will return + * Some(Map("gpu" -> {"0" -> 0.5})), and the left available resource will be + * {"gpu" -> {"0"-> 0.5, "1" -> 0.5}) which can't assign to the task that + * requires > 0.5 any more. + * + * @param taskSetProf assign resources based on which resource profile + * @return the optional assigned resources amounts. returns None if any + * of the task requests for resources aren't met. + */ + def assignAddressesCustomResources(taskSetProf: ResourceProfile): + Option[Map[String, Map[String, Long]]] = { + // only look at the resource other than cpus + val tsResources = taskSetProf.getCustomTaskResources() + if (tsResources.isEmpty) { + return Some(Map.empty) + } + + val allocatedAddresses = HashMap[String, Map[String, Long]]() + + // Go through all resources here so that we can make sure they match and also get what the + // assignments are for the next task + for ((rName, taskReqs) <- tsResources) { + // TaskResourceRequest checks the task amount should be in (0, 1] or a whole number + var taskAmount = taskReqs.amount + + internalResources.get(rName) match { + case Some(addressesAmountMap) => + val allocatedAddressesMap = HashMap[String, Long]() + + // Always sort the addresses + val addresses = addressesAmountMap.keys.toSeq.sorted + + // task.amount is a whole number + if (taskAmount >= 1.0) { + for (address <- addresses if taskAmount > 0) { + // The address is still a whole resource + if (ResourceAmountUtils.isOneEntireResource(addressesAmountMap(address))) { + taskAmount -= 1.0 + // Assign the full resource of the address + allocatedAddressesMap(address) = ONE_ENTIRE_RESOURCE + } + } + } else if (taskAmount > 0.0) { // 0 < task.amount < 1.0 + val internalTaskAmount = ResourceAmountUtils.toInternalResource(taskAmount) + for (address <- addresses if taskAmount > 0) { + if (addressesAmountMap(address) >= internalTaskAmount) { + // Assign the part of the address. 
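+                // A fractional request is satisfied by exactly one address: as soon
+                // as an address with enough remaining capacity is found, taskAmount
+                // drops to 0 and the loop guard skips all later addresses.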
+ allocatedAddressesMap(address) = internalTaskAmount + taskAmount = 0 + } + } + } + + if (taskAmount == 0 && allocatedAddressesMap.size > 0) { + allocatedAddresses.put(rName, allocatedAddressesMap.toMap) + } else { + return None + } + + case None => return None + } + } + Some(allocatedAddresses.toMap) + } + +} + +private[spark] object ExecutorResourcesAmounts { + + /** + * Create an empty ExecutorResourcesAmounts + */ + def empty: ExecutorResourcesAmounts = new ExecutorResourcesAmounts(Map.empty) + + /** + * Converts executor infos to ExecutorResourcesAmounts + */ + def apply(executorInfos: Map[String, ExecutorResourceInfo]): ExecutorResourcesAmounts = { + new ExecutorResourcesAmounts( + executorInfos.map { case (rName, rInfo) => rName -> rInfo.resourcesAmounts } + ) + } + +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 75032086ead7..df5f32612bea 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -23,11 +23,10 @@ import java.nio.charset.StandardCharsets import java.util.Properties import scala.collection.immutable -import scala.collection.mutable.{ArrayBuffer, HashMap, Map} +import scala.collection.mutable.{HashMap, Map} import scala.jdk.CollectionConverters._ import org.apache.spark.{JobArtifactSet, JobArtifactState} -import org.apache.spark.resource.ResourceInformation import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} /** @@ -57,7 +56,10 @@ private[spark] class TaskDescription( val artifacts: JobArtifactSet, val properties: Properties, val cpus: Int, - val resources: immutable.Map[String, ResourceInformation], + // resources is the total resources assigned to the task + // Eg, Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.7))): + // assign 0.7 of the gpu address "0" to this task + val resources: immutable.Map[String, immutable.Map[String, Long]], val serializedTask: ByteBuffer) { assert(cpus > 0, "CPUs per task should be > 0") @@ -74,14 +76,16 @@ private[spark] object TaskDescription { } } - private def serializeResources(map: immutable.Map[String, ResourceInformation], + private def serializeResources(map: immutable.Map[String, immutable.Map[String, Long]], dataOut: DataOutputStream): Unit = { dataOut.writeInt(map.size) - map.foreach { case (key, value) => - dataOut.writeUTF(key) - dataOut.writeUTF(value.name) - dataOut.writeInt(value.addresses.length) - value.addresses.foreach(dataOut.writeUTF(_)) + map.foreach { case (rName, addressAmountMap) => + dataOut.writeUTF(rName) + dataOut.writeInt(addressAmountMap.size) + addressAmountMap.foreach { case (address, amount) => + dataOut.writeUTF(address) + dataOut.writeLong(amount) + } } } @@ -172,21 +176,22 @@ private[spark] object TaskDescription { } private def deserializeResources(dataIn: DataInputStream): - immutable.Map[String, ResourceInformation] = { - val map = new HashMap[String, ResourceInformation]() + immutable.Map[String, immutable.Map[String, Long]] = { + val map = new HashMap[String, immutable.Map[String, Long]]() val mapSize = dataIn.readInt() var i = 0 while (i < mapSize) { val resType = dataIn.readUTF() - val name = dataIn.readUTF() - val numIdentifier = dataIn.readInt() - val identifiers = new ArrayBuffer[String](numIdentifier) + val addressAmountMap = new HashMap[String, Long]() + val addressAmountSize = dataIn.readInt() var j = 0 - while (j < 
numIdentifier) { - identifiers += dataIn.readUTF() + while (j < addressAmountSize) { + val address = dataIn.readUTF() + val amount = dataIn.readLong() + addressAmountMap(address) = amount j += 1 } - map(resType) = new ResourceInformation(name, identifiers.toArray) + map.put(resType, addressAmountMap.toMap) i += 1 } map.toMap diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 454e7ed3ce61..21f62097a4bf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.util.Random import com.google.common.cache.CacheBuilder @@ -35,7 +35,7 @@ import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ -import org.apache.spark.resource.{ResourceInformation, ResourceProfile} +import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.RpcEndpoint import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality @@ -389,7 +389,7 @@ private[spark] class TaskSchedulerImpl( maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], - availableResources: Array[Map[String, Buffer[String]]], + availableResources: Array[ExecutorResourcesAmounts], tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : (Boolean, Option[TaskLocality]) = { var noDelayScheduleRejects = true @@ -429,13 +429,7 @@ private[spark] class TaskSchedulerImpl( minLaunchedLocality = minTaskLocality(minLaunchedLocality, Some(locality)) availableCpus(i) -= taskCpus assert(availableCpus(i) >= 0) - resources.foreach { case (rName, rInfo) => - // Remove the first n elements from availableResources addresses, these removed - // addresses are the same as that we allocated in taskResourceAssignments since it's - // synchronized. We don't remove the exact addresses allocated because the current - // approach produces the identical result with less time complexity. 
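
Stepping back to the TaskDescription change above: each resource now maps address to internal Long amount, so fractional assignments survive encode/decode exactly via writeLong/readLong. A hedged round-trip sketch (the positional constructor arguments mirror the test suites later in this patch):

    import java.nio.ByteBuffer
    import java.util.Properties
    import org.apache.spark.JobArtifactSet
    import org.apache.spark.resource.ResourceAmountUtils
    import org.apache.spark.scheduler.TaskDescription

    val resources = Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.7)))
    val desc = new TaskDescription(1000000L, 0, "1", "TASK 1000000", 19, 1,
      JobArtifactSet.emptyJobArtifactSet, new Properties, 1, resources,
      ByteBuffer.allocate(0))
    assert(TaskDescription.decode(TaskDescription.encode(desc)).resources == resources)
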
- availableResources(i)(rName).remove(0, rInfo.addresses.length) - } + availableResources(i).acquire(resources) } } catch { case e: TaskNotSerializableException => @@ -468,33 +462,14 @@ private[spark] class TaskSchedulerImpl( private def resourcesMeetTaskRequirements( taskSet: TaskSetManager, availCpus: Int, - availWorkerResources: Map[String, Buffer[String]] - ): Option[Map[String, ResourceInformation]] = { + availWorkerResources: ExecutorResourcesAmounts): Option[Map[String, Map[String, Long]]] = { val rpId = taskSet.taskSet.resourceProfileId val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId) val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, conf) // check if the ResourceProfile has cpus first since that is common case if (availCpus < taskCpus) return None // only look at the resource other than cpus - val tsResources = taskSetProf.getCustomTaskResources() - if (tsResources.isEmpty) return Some(Map.empty) - val localTaskReqAssign = HashMap[String, ResourceInformation]() - // we go through all resources here so that we can make sure they match and also get what the - // assignments are for the next task - for ((rName, taskReqs) <- tsResources) { - val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName) - availWorkerResources.get(rName) match { - case Some(workerRes) => - if (workerRes.size >= taskAmount) { - localTaskReqAssign.put(rName, new ResourceInformation(rName, - workerRes.take(taskAmount).toArray)) - } else { - return None - } - case None => return None - } - } - Some(localTaskReqAssign.toMap) + availWorkerResources.assignAddressesCustomResources(taskSetProf) } private def minTaskLocality( @@ -576,13 +551,8 @@ private[spark] class TaskSchedulerImpl( // value is -1 val numBarrierSlotsAvailable = if (taskSet.isBarrier) { val rpId = taskSet.taskSet.resourceProfileId - val availableResourcesAmount = availableResources.map { resourceMap => - // available addresses already takes into account if there are fractional - // task resource requests - resourceMap.map { case (name, addresses) => (name, addresses.length) } - } - calculateAvailableSlots(this, conf, rpId, resourceProfileIds, availableCpus, - availableResourcesAmount) + val resAmounts = availableResources.map(_.resourceAddressAmount) + calculateAvailableSlots(this, conf, rpId, resourceProfileIds, availableCpus, resAmounts) } else { -1 } @@ -715,9 +685,8 @@ private[spark] class TaskSchedulerImpl( barrierPendingLaunchTasks.foreach { task => // revert all assigned resources availableCpus(task.assignedOfferIndex) += task.assignedCores - task.assignedResources.foreach { case (rName, rInfo) => - availableResources(task.assignedOfferIndex)(rName).appendAll(rInfo.addresses) - } + availableResources(task.assignedOfferIndex).release( + task.assignedResources) // re-add the task to the schedule pending list taskSet.addPendingTask(task.index) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index d17e6735c4ec..e15ba28eeda0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -21,7 +21,6 @@ import java.io.NotSerializableException import java.nio.ByteBuffer import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, TimeUnit} -import scala.collection.immutable.Map import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.math.max import 
scala.util.control.NonFatal @@ -33,7 +32,6 @@ import org.apache.spark.TaskState.TaskState import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config._ -import org.apache.spark.resource.ResourceInformation import org.apache.spark.scheduler.SchedulingMode._ import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils} import org.apache.spark.util.collection.PercentileHeap @@ -444,7 +442,7 @@ private[spark] class TaskSetManager( host: String, maxLocality: TaskLocality.TaskLocality, taskCpus: Int = sched.CPUS_PER_TASK, - taskResourceAssignments: Map[String, ResourceInformation] = Map.empty) + taskResourceAssignments: Map[String, Map[String, Long]] = Map.empty) : (Option[TaskDescription], Boolean, Int) = { val offerExcluded = taskSetExcludelistHelperOpt.exists { excludeList => @@ -512,7 +510,7 @@ private[spark] class TaskSetManager( taskLocality: TaskLocality.Value, speculative: Boolean, taskCpus: Int, - taskResourceAssignments: Map[String, ResourceInformation], + taskResourceAssignments: Map[String, Map[String, Long]], launchTime: Long): TaskDescription = { // Found a task; do some bookkeeping and return a task description val task = tasks(index) @@ -1381,7 +1379,7 @@ private[scheduler] case class BarrierPendingLaunchTask( host: String, index: Int, taskLocality: TaskLocality.TaskLocality, - assignedResources: Map[String, ResourceInformation]) { + assignedResources: Map[String, Map[String, Long]]) { // Stored the corresponding index of the WorkerOffer which is responsible to launch the task. // Used to revert the assigned resources (e.g., cores, custome resources) when the barrier // task set doesn't launch successfully in a single resourceOffers round. diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala index 92a12f13576c..a7d63a8949e1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala @@ -17,8 +17,6 @@ package org.apache.spark.scheduler -import scala.collection.mutable.Buffer - import org.apache.spark.resource.ResourceProfile /** @@ -32,5 +30,5 @@ case class WorkerOffer( // `address` is an optional hostPort string, it provide more useful information than `host` // when multiple executors are launched on the same host. 
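
resourcesMeetTaskRequirements above now delegates the custom-resource matching to ExecutorResourcesAmounts. A sketch of a fractional offer, mirroring the example in that class's scaladoc (spark-internal constructors assumed accessible):

    import org.apache.spark.resource.{ResourceAmountUtils, ResourceProfileBuilder, TaskResourceRequests}
    import org.apache.spark.scheduler.ExecutorResourcesAmounts

    val available = new ExecutorResourcesAmounts(Map(
      "gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.7),
        "1" -> ResourceAmountUtils.toInternalResource(1.0))))
    val rp = new ResourceProfileBuilder()
      .require(new TaskResourceRequests().cpus(1).resource("gpu", 0.5)).build()
    // Smallest address first: 0.5 fits on "0" (0.7 available), so "1" stays whole.
    assert(available.assignAddressesCustomResources(rp) ==
      Some(Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.5)))))
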
address: Option[String] = None, - resources: Map[String, Buffer[String]] = Map.empty, + resources: ExecutorResourcesAmounts = ExecutorResourcesAmounts.empty, resourceProfileId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index b49f5269169d..1f452ae7d109 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -80,7 +80,7 @@ private[spark] object CoarseGrainedClusterMessages { state: TaskState, data: SerializableBuffer, taskCpus: Int, - resources: Map[String, ResourceInformation] = Map.empty) + resources: Map[String, Map[String, Long]] = Map.empty) extends CoarseGrainedClusterMessage object StatusUpdate { @@ -91,7 +91,7 @@ private[spark] object CoarseGrainedClusterMessages { state: TaskState, data: ByteBuffer, taskCpus: Int, - resources: Map[String, ResourceInformation]): StatusUpdate = { + resources: Map[String, Map[String, Long]]): StatusUpdate = { StatusUpdate(executorId, taskId, state, new SerializableBuffer(data), taskCpus, resources) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 831fbd45edd7..7e124302c726 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -172,9 +172,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap.get(executorId) match { case Some(executorInfo) => executorInfo.freeCores += taskCpus - resources.foreach { case (k, v) => - executorInfo.resourcesInfo.get(k).foreach { r => - r.release(v.addresses.toImmutableArraySeq) + resources.foreach { case (rName, addressAmount) => + executorInfo.resourcesInfo.get(rName).foreach { r => + r.release(addressAmount) } } makeOffers(executorId) @@ -271,12 +271,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val resourcesInfo = resources.map { case (rName, info) => - // tell the executor it can schedule resources up to numSlotsPerAddress times, - // as configured by the user, or set to 1 as that is the default (1 task/resource) - val numParts = scheduler.sc.resourceProfileManager - .resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf) - (info.name, - new ExecutorResourceInfo(info.name, info.addresses.toImmutableArraySeq, numParts)) + (info.name, new ExecutorResourceInfo(info.name, info.addresses.toIndexedSeq)) } // If we've requested the executor figure out when we did. 
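
The driver-side bookkeeping above is symmetric: LaunchTask acquires the task's address amounts and a terminal StatusUpdate releases the same map. A small sketch with the internal Long units used throughout this patch:

    import org.apache.spark.resource.ResourceAmountUtils
    import org.apache.spark.scheduler.ExecutorResourceInfo

    val info = new ExecutorResourceInfo("gpu", Seq("0", "1"))
    val taskGpu = Map("0" -> ResourceAmountUtils.toInternalResource(0.5))
    info.acquire(taskGpu)                        // on LaunchTask: 0.5 of "0" is taken
    assert(info.availableAddrs == Seq("0", "1")) // "0" still has 0.5 left, so it is offered
    info.release(taskGpu)                        // on a finished task: back to a whole resource
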
val reqTs: Option[Long] = CoarseGrainedSchedulerBackend.this.synchronized { @@ -385,9 +380,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } private def buildWorkerOffer(executorId: String, executorData: ExecutorData) = { - val resources = executorData.resourcesInfo.map { case (rName, rInfo) => - (rName, rInfo.availableAddrs.toBuffer) - } + val resources = ExecutorResourcesAmounts(executorData.resourcesInfo) WorkerOffer( executorId, executorData.executorHost, @@ -446,11 +439,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Do resources allocation here. The allocated resources will get released after the task // finishes. executorData.freeCores -= task.cpus - task.resources.foreach { case (rName, rInfo) => - assert(executorData.resourcesInfo.contains(rName)) - executorData.resourcesInfo(rName).acquire(rInfo.addresses.toImmutableArraySeq) + task.resources.foreach { case (rName, addressAmounts) => + executorData.resourcesInfo(rName).acquire(addressAmounts) } - logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + s"${executorData.executorHost}.") @@ -766,7 +757,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp ( executor.resourceProfileId, executor.totalCores, - executor.resourcesInfo.map { case (name, rInfo) => (name, rInfo.totalAddressAmount) } + executor.resourcesInfo.map { case (name, rInfo) => + val taskAmount = rp.taskResources.get(name).get.amount + (name, rInfo.totalParts(taskAmount)) + } ) }.unzip3 } diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 5f7958167507..8eb8d9c6dc85 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -465,7 +465,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst case (ratio, slots) => val conf = new SparkConf() conf.set(TASK_GPU_ID.amountConf, ratio.toString) - if (ratio > 0.5 && ratio % 1 != 0) { + if (ratio > 1.0 && ratio % 1 != 0) { assertThrows[SparkException] { parseResourceRequirements(conf, SPARK_TASK_PREFIX) } diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 45c27aea6022..a8c9550c6b76 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -304,14 +304,15 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1", "host1", "host1", 4, env, None, resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf)) - assert(backend.taskResources.isEmpty) val taskId = 1000000L + val resourcesAmounts = Map(GPU -> Map( + "0" -> ResourceAmountUtils.toInternalResource(0.15), + "1" -> ResourceAmountUtils.toInternalResource(0.76))) // We don't really verify the data, just pass it around. 
val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19, - 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, - Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data) + 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, resourcesAmounts, data) val serializedTaskDescription = TaskDescription.encode(taskDescription) backend.rpcEnv.setupEndpoint("Executor 1", backend) backend.executor = mock[Executor](CALLS_REAL_METHODS) @@ -342,21 +343,22 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite // Launch a new task shall add an entry to `taskResources` map. backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription))) eventually(timeout(10.seconds)) { - assert(backend.taskResources.size == 1) assert(runningTasks.size == 1) - val resources = backend.taskResources.get(taskId) - assert(resources(GPU).addresses sameElements Array("0", "1")) + val resources = backend.executor.runningTasks.get(taskId).taskDescription.resources + assert(resources(GPU).keys.toArray.sorted sameElements Array("0", "1")) + assert(executor.runningTasks.get(taskId).taskDescription.resources + === resourcesAmounts) } // Update the status of a running task shall not affect `taskResources` map. backend.statusUpdate(taskId, TaskState.RUNNING, data) - assert(backend.taskResources.size == 1) - val resources = backend.taskResources.get(taskId) - assert(resources(GPU).addresses sameElements Array("0", "1")) + val resources = backend.executor.runningTasks.get(taskId).taskDescription.resources + assert(resources(GPU).keys.toArray.sorted sameElements Array("0", "1")) + assert(executor.runningTasks.get(taskId).taskDescription.resources + === resourcesAmounts) // Update the status of a finished task shall remove the entry from `taskResources` map. backend.statusUpdate(taskId, TaskState.FINISHED, data) - assert(backend.taskResources.isEmpty) } finally { if (backend != null) { backend.rpcEnv.shutdown() @@ -424,11 +426,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val tasksKilled = new TrieMap[Long, Boolean]() val tasksExecuted = new TrieMap[Long, Boolean]() + val resourcesAmounts = Map(GPU -> Map( + "0" -> ResourceAmountUtils.toInternalResource(0.15), + "1" -> ResourceAmountUtils.toInternalResource(0.76))) + // Fake tasks with different taskIds. val taskDescriptions = (1 to numTasks).map { taskId => new TaskDescription(taskId, 2, "1", s"TASK $taskId", 19, - 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, - Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data) + 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, resourcesAmounts, data) } assert(taskDescriptions.length == numTasks) @@ -513,11 +518,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite val tasksKilled = new TrieMap[Long, Boolean]() val tasksExecuted = new TrieMap[Long, Boolean]() + val resourcesAmounts = Map(GPU -> Map( + "0" -> ResourceAmountUtils.toInternalResource(0.15), + "1" -> ResourceAmountUtils.toInternalResource(0.76))) + // Fake tasks with different taskIds. 
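+    // Every fake task reuses the same fractional assignments: 0.15 of GPU "0" and
+    // 0.76 of GPU "1" (built above as internal Long amounts).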
val taskDescriptions = (1 to numTasks).map { taskId => new TaskDescription(taskId, 2, "1", s"TASK $taskId", 19, - 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, - Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data) + 1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, resourcesAmounts, data) } assert(taskDescriptions.length == numTasks) diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 49e19dd2a00e..805e7ca46749 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -642,7 +642,7 @@ class ExecutorSuite extends SparkFunSuite JobArtifactSet.emptyJobArtifactSet, properties = new Properties, cpus = 1, - resources = immutable.Map[String, ResourceInformation](), + resources = Map.empty, serializedTask) } diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala index be38315cd75f..8f68fd547fb3 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala @@ -357,12 +357,11 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar { var taskError = intercept[AssertionError] { rprof.require(new TaskResourceRequests().resource("gpu", 1.5)) }.getMessage() - assert(taskError.contains("The resource amount 1.5 must be either <= 0.5, or a whole number.")) + assert(taskError.contains("The resource amount 1.5 must be either <= 1.0, or a whole number.")) - taskError = intercept[AssertionError] { - rprof.require(new TaskResourceRequests().resource("gpu", 0.7)) - }.getMessage() - assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number.")) + rprof.require(new TaskResourceRequests().resource("gpu", 0.7)) + rprof.require(new TaskResourceRequests().resource("gpu", 1.0)) + rprof.require(new TaskResourceRequests().resource("gpu", 2.0)) } test("ResourceProfile has correct custom executor resources") { @@ -393,6 +392,33 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar { "Task resources should have 1 custom resource") } + test("SPARK-45527 fractional TaskResourceRequests in ResourceProfile") { + val ereqs = new ExecutorResourceRequests().cores(6).resource("gpus", 6) + var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1) + new ResourceProfileBuilder().require(ereqs).require(treqs).build() + + treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.5) + new ResourceProfileBuilder().require(ereqs).require(treqs).build() + + treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.7) + + val msg = intercept[AssertionError] { + new ResourceProfileBuilder().require(ereqs).require(treqs).build() + }.getMessage + assert(msg.contains("The task resource amount 0.7 must be either <= 0.5, or a whole number")) + } + + test("SPARK-45527 fractional TaskResourceRequests in TaskResourceProfile") { + var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1) + new ResourceProfileBuilder().require(treqs).build() + + treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.5) + new ResourceProfileBuilder().require(treqs).build() + + treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.7) + new ResourceProfileBuilder().require(treqs).build() + } + private def withMockSparkEnv(conf: SparkConf)(f: 
=> Unit): Unit = { val previousEnv = SparkEnv.get val mockEnv = mock[SparkEnv] diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala index 20d6cc767158..7fd8a11463d8 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala @@ -26,6 +26,7 @@ import org.json4s.{DefaultFormats, Extraction} import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkFunSuite} import org.apache.spark.TestUtils._ import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Tests.RESOURCES_WARNING_TESTING import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.util.Utils @@ -336,4 +337,250 @@ class ResourceUtilsSuite extends SparkFunSuite assert(error.contains("User is expecting to use resource: gpu, but " + "didn't specify a discovery script!")) } + + test("SPARK-45527 warnOnWastedResources for ResourceProfile") { + val conf = new SparkConf() + conf.set("spark.executor.cores", "10") + conf.set("spark.task.cpus", "1") + conf.set(RESOURCES_WARNING_TESTING, true) + + // cpu limiting task number = 10/1, gpu limiting task number = 1/0.1 + var ereqs = new ExecutorResourceRequests().cores(10).resource("gpu", 1) + var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1) + var rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build() + // no exception, + warnOnWastedResources(rp, conf) + + ereqs = new ExecutorResourceRequests().cores(10).resource("gpu", 1) + treqs = new TaskResourceRequests().cpus(2).resource("gpu", 0.2) + rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build() + // no exception, + warnOnWastedResources(rp, conf) + + ereqs = new ExecutorResourceRequests().cores(20).resource("gpu", 2) + treqs = new TaskResourceRequests().cpus(2).resource("gpu", 0.2) + rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build() + // no exception, + warnOnWastedResources(rp, conf) + + var msg: String = "" + + // Test cpu limiting task number + // format: (executor.core, task.cpus, executor.gpu, task.gpu, expected runnable tasks) + Seq( + (10, 2, 1, 0.1, 5), // cpu limiting task number=10/2=5, gpu limiting task number = 1/0.1 = 10 + (10, 3, 1, 0.1, 3), // cpu limiting task number=10/3=3, gpu limiting task number = 1/0.1 = 10 + (10, 4, 1, 0.1, 2), // cpu limiting task number=10/4=2, gpu limiting task number = 1/0.1 = 10 + (10, 5, 1, 0.1, 2), // cpu limiting task number=10/5=2, gpu limiting task number = 1/0.1 = 10 + (10, 6, 1, 0.1, 1), // cpu limiting task number=10/6=1, gpu limiting task number = 1/0.1 = 10 + (10, 10, 1, 0.1, 1), // cpu limiting task number=10/6=1, gpu limiting task number = 1/0.1 = 10 + (20, 7, 1, 0.1, 2), // cpu limiting task number=20/7=3, gpu limiting task number = 1/0.1 = 10 + (30, 7, 1, 0.1, 4), // cpu limiting task number=30/7=4, gpu limiting task number = 1/0.1 = 10 + (50, 14, 1, 0.1, 3) // cpu limiting task number=50/14=3, gpu limiting task number = 1/0.1=10 + ).foreach { case (executorCores, taskCpus: Int, executorGpus: Int, taskGpus: Double, + expectedTaskNumber: Int) => + ereqs = new ExecutorResourceRequests().cores(executorCores).resource("gpu", executorGpus) + treqs = new TaskResourceRequests().cpus(taskCpus).resource("gpu", taskGpus) + rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build() + msg = 
intercept[SparkException] {
+        warnOnWastedResources(rp, conf)
+      }.getMessage
+      assert(msg.contains("The configuration of resource: gpu (exec = 1, task = 0.1/10, runnable " +
+        "tasks = 10) will result in wasted resources due to resource cpus limiting the number of " +
+        s"runnable tasks per executor to: ${expectedTaskNumber}. Please adjust your configuration.")
+      )
+    }
+
+    // Test gpu limiting task number
+    // format: (executor.core, task.cpus, executor.gpu, task.gpu, expected runnable tasks)
+    Seq(
+      (10, 1, 1, 1.0/9, 9), // cpu limiting task number=10, gpu limiting task number = 9
+      (10, 1, 1, 1.0/8, 8), // cpu limiting task number=10, gpu limiting task number = 8
+      (10, 1, 1, 1.0/7, 7), // cpu limiting task number=10, gpu limiting task number = 7
+      (10, 1, 1, 1.0/6, 6), // cpu limiting task number=10, gpu limiting task number = 6
+      (10, 1, 1, 1.0/5, 5), // cpu limiting task number=10, gpu limiting task number = 5
+      (10, 1, 1, 1.0/4, 4), // cpu limiting task number=10, gpu limiting task number = 4
+      (10, 1, 1, 1.0/3, 3), // cpu limiting task number=10, gpu limiting task number = 3
+      (10, 1, 1, 1.0/2, 2), // cpu limiting task number=10, gpu limiting task number = 2
+      (10, 1, 1, 1.0, 1), // cpu limiting task number=10, gpu limiting task number = 1
+      (30, 1, 2, 1.0/9, 2*9), // cpu limiting task number=30, gpu limiting task number = 2*9
+      (30, 1, 2, 1.0/8, 2*8), // cpu limiting task number=30, gpu limiting task number = 2*8
+      (30, 1, 2, 1.0/7, 2*7), // cpu limiting task number=30, gpu limiting task number = 2*7
+      (30, 1, 2, 1.0/6, 2*6), // cpu limiting task number=30, gpu limiting task number = 2*6
+      (30, 1, 2, 1.0/5, 2*5), // cpu limiting task number=30, gpu limiting task number = 2*5
+      (30, 1, 2, 1.0/4, 2*4), // cpu limiting task number=30, gpu limiting task number = 2*4
+      (30, 1, 2, 1.0/3, 2*3), // cpu limiting task number=30, gpu limiting task number = 2*3
+      (30, 1, 2, 1.0/2, 2*2), // cpu limiting task number=30, gpu limiting task number = 2*2
+      (30, 1, 2, 1.0, 2*1), // cpu limiting task number=30, gpu limiting task number = 2*1
+      (30, 1, 2, 2.0, 1), // cpu limiting task number=30, gpu limiting task number = 1
+      (70, 2, 7, 0.5, 7*2), // cpu limiting task number=70/2=35, gpu limiting task number = 7/0.5=14
+      (80, 3, 9, 2.0, 9/2) // cpu limiting task number=80/3=26, gpu limiting task number = 9/2=4
+    ).foreach { case (executorCores, taskCpus: Int, executorGpus: Int, taskGpus: Double,
+      expectedTaskNumber: Int) =>
+      ereqs = new ExecutorResourceRequests().cores(executorCores).resource("gpu", executorGpus)
+      treqs = new TaskResourceRequests().cpus(taskCpus).resource("gpu", taskGpus)
+      rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build()
+      msg = intercept[SparkException] {
+        warnOnWastedResources(rp, conf)
+      }.getMessage
+      assert(msg.contains(s"The configuration of cores (exec = ${executorCores} task = " +
+        s"${taskCpus}, runnable tasks = ${executorCores/taskCpus}) " +
+        "will result in wasted resources due to resource gpu limiting the number of runnable " +
+        s"tasks per executor to: ${expectedTaskNumber}. 
Please adjust your configuration")) + } + } + + private class FakedTaskResourceProfile( + val defaultRp: ResourceProfile, + override val taskResources: Map[String, TaskResourceRequest]) + extends TaskResourceProfile(taskResources) { + override protected[spark] def getCustomExecutorResources() + : Map[String, ExecutorResourceRequest] = defaultRp.getCustomExecutorResources() + } + + test("SPARK-45527 warnOnWastedResources for TaskResourceProfile when executor number = 1") { + val conf = new SparkConf() + conf.set("spark.executor.cores", "10") + conf.set("spark.task.cpus", "1") + conf.set(TASK_GPU_ID.amountConf, "0.1") + conf.set(EXECUTOR_GPU_ID.amountConf, "1") + conf.set(RESOURCES_WARNING_TESTING, true) + + val defaultDp = ResourceProfile.getOrCreateDefaultProfile(conf) + + // cpu limiting task number = 10/1, gpu limiting task number = 1/0.1 + var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1) + var rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + // no exception, + warnOnWastedResources(rp, conf) + + var msg: String = "" + + // Test cpu limiting task number + // format: (task cpu cores, task gpu amount, expected runnable tasks) + // spark.executor.cores=60, spark.task.cpus=1, EXECUTOR_GPU_ID=6, TASK_GPU_ID=0.1 + Seq( + (2, 0.1, 5), // cpu limiting task number = 10/2=5, gpu limiting task number = 1/0.1 = 10 + (3, 0.1, 3), // cpu limiting task number = 10/3 = 3, gpu limiting task number = 1/0.1 = 10 + (4, 0.1, 2), // cpu limiting task number = 10/4 = 2, gpu limiting task number = 1/0.1 = 10 + (5, 0.1, 2), // cpu limiting task number = 10/5 = 2, gpu limiting task number = 1/0.1 = 10 + (6, 0.1, 1), // cpu limiting task number = 10/6 = 1, gpu limiting task number = 1/0.1 = 10 + (7, 0.1, 1), // cpu limiting task number = 10/7 = 1, gpu limiting task number = 1/0.1 = 10 + (10, 0.1, 1) // cpu limiting task number = 10/10 = 1, gpu limiting task number = 1/0.1 = 10 + ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) => + treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus) + rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + msg = intercept[SparkException] { + warnOnWastedResources(rp, conf) + }.getMessage + assert(msg.contains("The configuration of resource: gpu (exec = 1, task = 0.1/10, runnable " + + "tasks = 10) will result in wasted resources due to resource cpus limiting the number of " + + s"runnable tasks per executor to: $expectedTaskNumber. 
Please adjust your configuration.") + ) + } + + // Test gpu limiting task number + // format: (task cpu cores, task gpu amount, expected runnable tasks) + // spark.executor.cores=60, spark.task.cpus=1, EXECUTOR_GPU_ID=6, TASK_GPU_ID=0.1 + Seq( + (1, 0.111, 9), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.111=9 + (1, 0.125, 8), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.125=8 + (1, 0.142, 7), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.142=7 + (1, 0.166, 6), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.166=6 + (1, 0.2, 5), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.2=5 + (1, 0.25, 4), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.25=4 + (1, 0.333, 3), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.333=3 + (1, 0.5, 2), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.166=2 + (1, 0.6, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.6=1 + (1, 0.7, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.7=1 + (1, 0.8, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.8=1 + (1, 0.9, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.9=1 + (1, 1.0, 1) // cpu limiting task number = 10/1, gpu limiting task number = 1/1.0=1 + ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) => + treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus) + rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + msg = intercept[SparkException] { + warnOnWastedResources(rp, conf) + }.getMessage + assert(msg.contains("The configuration of cores (exec = 10 task = 1, runnable tasks = 10) " + + "will result in wasted resources due to resource gpu limiting the number of runnable " + + s"tasks per executor to: $expectedTaskNumber. 
Please adjust your configuration")) + } + } + + test("SPARK-45527 warnOnWastedResources for TaskResourceProfile when executor number > 1") { + val conf = new SparkConf() + conf.set("spark.executor.cores", "60") + conf.set("spark.task.cpus", "1") + conf.set(TASK_GPU_ID.amountConf, "0.1") + conf.set(EXECUTOR_GPU_ID.amountConf, "6") + conf.set(RESOURCES_WARNING_TESTING, true) + + val defaultDp = ResourceProfile.getOrCreateDefaultProfile(conf) + + // cpu limiting task number = 60/1, gpu limiting task number = 6/0.1 + var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1) + var rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + // no exception, + warnOnWastedResources(rp, conf) + + // cpu limiting task number = 60/2 = 30, gpu limiting task number = 6/0.2 = 30 + treqs = new TaskResourceRequests().cpus(2).resource("gpu", 0.2) + rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + // no exception + warnOnWastedResources(rp, conf) + + var msg: String = "" + + // Test cpu limiting task number + // format: (task cpu cores, task gpu amount, expected runnable tasks) + // spark.executor.cores=60, spark.task.cpus=1, EXECUTOR_GPU_ID=6, TASK_GPU_ID=0.1 + Seq( + (4, 0.2, 15), // cpu limiting task number = 60/4=15, gpu limiting task number = 6/0.2 = 30 + (7, 0.2, 8), // cpu limiting task number = 60/7 = 8, gpu limiting task number = 6/0.2 = 30 + (30, 0.2, 2), // cpu limiting task number = 60/30 = 2, gpu limiting task number = 6/0.2 = 30 + (31, 0.2, 1), // cpu limiting task number = 60/31 = 1, gpu limiting task number = 6/0.2 = 30 + (55, 0.2, 1), // cpu limiting task number = 60/55 = 1, gpu limiting task number = 6/0.2 = 30 + (60, 0.2, 1) // cpu limiting task number = 60/60 = 1, gpu limiting task number = 6/0.2 = 30 + ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) => + treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus) + rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + msg = intercept[SparkException] { + warnOnWastedResources(rp, conf) + }.getMessage + assert(msg.contains("The configuration of resource: gpu (exec = 6, task = 0.2/5, runnable " + + "tasks = 30) will result in wasted resources due to resource cpus limiting the number of " + + s"runnable tasks per executor to: $expectedTaskNumber. 
Please adjust your configuration.") + ) + } + + // Test gpu limiting task number + // format: (task cpu cores, task gpu amount, expected runnable tasks) + // spark.executor.cores=60, spark.task.cpus=1, EXECUTOR_GPU_ID=6, TASK_GPU_ID=0.1 + Seq( + (1, 0.111, 54), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.111=54 + (1, 0.125, 48), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.125=48 + (1, 0.142, 42), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.142=42 + (1, 0.166, 36), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.166=36 + (1, 0.2, 30), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.2=30 + (1, 0.25, 24), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.25=24 + (1, 0.33, 18), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.33=18 + (1, 0.5, 12), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.5=12 + (1, 0.7, 6), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6*1/0.7 = 6 + (1, 1.0, 6), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/1 = 6 + (1, 2.0, 3), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/2 = 3 + (1, 3.0, 2), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/3 = 2 + (1, 4.0, 1), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/4 = 1 + (1, 6.0, 1) // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/6 = 1 + ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) => + treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus) + rp = new FakedTaskResourceProfile(defaultDp, treqs.requests) + msg = intercept[SparkException] { + warnOnWastedResources(rp, conf) + }.getMessage + assert(msg.contains("The configuration of cores (exec = 60 task = 1, runnable tasks = 60) " + + "will result in wasted resources due to resource gpu limiting the number of runnable " + + s"tasks per executor to: $expectedTaskNumber. 
Please adjust your configuration")) + } + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index e9b8ae4bffe6..1b444aa60474 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -37,6 +37,7 @@ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE import org.apache.spark.rdd.RDD import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, TaskResourceRequests} +import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout} @@ -254,7 +255,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3") assert(exec3ResourceProfileId === rp.id) - val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0"))) + val taskResources = Map(GPU -> Map("0" -> ONE_ENTIRE_RESOURCE)) val taskCpus = 1 val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1", "t1", 0, 1, JobArtifactSet.emptyJobArtifactSet, new Properties(), @@ -361,7 +362,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3") assert(exec3ResourceProfileId === rp.id) - val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0"))) + val taskResources = Map(GPU -> Map("0" -> ONE_ENTIRE_RESOURCE)) val taskCpus = 1 val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1", "t1", 0, 1, JobArtifactSet.emptyJobArtifactSet, new Properties(), diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala index 203e30cac1a7..ac1a6b7c3ec0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala @@ -18,74 +18,87 @@ package org.apache.spark.scheduler import scala.collection.mutable.ArrayBuffer +import scala.language.implicitConversions import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.resource.ResourceAmountUtils import org.apache.spark.resource.ResourceUtils.GPU -import org.apache.spark.util.ArrayImplicits._ class ExecutorResourceInfoSuite extends SparkFunSuite { + implicit def convertMapLongToDouble(resources: Map[String, Long]): Map[String, Double] = { + resources.map { case (k, v) => k -> ResourceAmountUtils.toFractionalResource(v) } + } + + implicit def convertMapDoubleToLong(resources: Map[String, Double]): Map[String, Long] = { + resources.map { case (k, v) => k -> ResourceAmountUtils.toInternalResource(v) } + } + test("Track Executor Resource information") { // Init Executor Resource. 
- val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3")) assert(info.availableAddrs.sorted sameElements Seq("0", "1", "2", "3")) assert(info.assignedAddrs.isEmpty) + val reqResource = Seq("0", "1").map(addrs => addrs -> 1.0).toMap // Acquire addresses - info.acquire(Array("0", "1").toImmutableArraySeq) + info.acquire(reqResource) assert(info.availableAddrs.sorted sameElements Seq("2", "3")) assert(info.assignedAddrs.sorted sameElements Seq("0", "1")) // release addresses - info.release(Array("0", "1").toImmutableArraySeq) + info.release(reqResource) assert(info.availableAddrs.sorted sameElements Seq("0", "1", "2", "3")) assert(info.assignedAddrs.isEmpty) } test("Don't allow acquire address that is not available") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3")) // Acquire some addresses. - info.acquire(Seq("0", "1")) + val reqResource = Seq("0", "1").map(addrs => addrs -> 1.0).toMap + info.acquire(reqResource) assert(!info.availableAddrs.contains("1")) // Acquire an address that is not available val e = intercept[SparkException] { - info.acquire(Array("1").toImmutableArraySeq) + info.acquire(convertMapDoubleToLong(Map("1" -> 1.0))) } - assert(e.getMessage.contains("Try to acquire an address that is not available.")) + assert(e.getMessage.contains("Try to acquire gpu address 1 amount: 1.0, but only 0.0 left.")) } test("Don't allow acquire address that doesn't exist") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3")) assert(!info.availableAddrs.contains("4")) // Acquire an address that doesn't exist val e = intercept[SparkException] { - info.acquire(Array("4").toImmutableArraySeq) + info.acquire(convertMapDoubleToLong(Map("4" -> 1.0))) } assert(e.getMessage.contains("Try to acquire an address that doesn't exist.")) } test("Don't allow release address that is not assigned") { // Init Executor Resource. - val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3")) // Acquire addresses - info.acquire(Array("0", "1").toImmutableArraySeq) + val reqResource = Seq("0", "1").map(addrs => addrs -> 1.0).toMap + info.acquire(reqResource) assert(!info.assignedAddrs.contains("2")) // Release an address that is not assigned val e = intercept[SparkException] { - info.release(Array("2").toImmutableArraySeq) + info.release(convertMapDoubleToLong(Map("2" -> 1.0))) } - assert(e.getMessage.contains("Try to release an address that is not assigned.")) + assert(e.getMessage.contains("Try to release gpu address 2 amount: 1.0. " + + "But the total amount: 2.0 after release should be <= 1")) } test("Don't allow release address that doesn't exist") { // Init Executor Resource. 
- val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1) + val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3")) assert(!info.assignedAddrs.contains("4")) // Release an address that doesn't exist val e = intercept[SparkException] { - info.release(Array("4").toImmutableArraySeq) + info.release(convertMapDoubleToLong(Map("4" -> 1.0))) } assert(e.getMessage.contains("Try to release an address that doesn't exist.")) } @@ -94,23 +107,92 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { val slotSeq = Seq(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) val addresses = ArrayBuffer("0", "1", "2", "3") slotSeq.foreach { slots => - val info = new ExecutorResourceInfo(GPU, addresses.toSeq, slots) + val taskAmount = 1.0 / slots + val info = new ExecutorResourceInfo(GPU, addresses.toSeq) for (_ <- 0 until slots) { - addresses.foreach(addr => info.acquire(Seq(addr))) + addresses.foreach(addr => info.acquire(convertMapDoubleToLong(Map(addr -> taskAmount)))) } - // assert that each address was assigned `slots` times - info.assignedAddrs - .groupBy(identity) - .transform((_, v) => v.size) - .foreach(x => assert(x._2 == slots)) + // All addresses were assigned equally, so only one distinct amount remains + assert(info.resourcesAmounts.values.toSeq.toSet.size == 1) + // The remaining amount of each address should be < taskAmount + assert(ResourceAmountUtils.toFractionalResource(info.resourcesAmounts("0")) < taskAmount) addresses.foreach { addr => assertThrows[SparkException] { - info.acquire(Seq(addr)) + info.acquire(convertMapDoubleToLong(Map(addr -> taskAmount))) } - assert(!info.availableAddrs.contains(addr)) } } } + + def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double], + eps: Double = 0.00000001): Boolean = { + lhs.size == rhs.size && + lhs.zip(rhs).forall { case ((lName, lAmount), (rName, rAmount)) => + lName == rName && (lAmount - rAmount).abs < eps + } + } + + test("assign/release resource for different task requirements") { + val execInfo = new ExecutorResourceInfo("gpu", Seq("0", "1", "2", "3")) + + def testAllocation(taskAddressAmount: Map[String, Double], + expectedLeftRes: Map[String, Double] + ): Unit = { + execInfo.acquire(taskAddressAmount) + val leftRes = execInfo.resourcesAmounts + assert(compareMaps(leftRes, expectedLeftRes)) + } + + def testRelease(releasedRes: Map[String, Double], + expectedLeftRes: Map[String, Double] + ): Unit = { + execInfo.release(releasedRes) + val leftRes = execInfo.resourcesAmounts + assert(compareMaps(leftRes, expectedLeftRes)) + } + + testAllocation(taskAddressAmount = Map("0" -> 0.2), + expectedLeftRes = Map("0" -> 0.8, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("0" -> 0.2), + expectedLeftRes = Map("0" -> 0.6, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("1" -> 1.0, "2" -> 1.0), + expectedLeftRes = Map("0" -> 0.6, "1" -> 0.0, "2" -> 0.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 0.1, "2" -> 0.8), + expectedLeftRes = Map("0" -> 0.7, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("0" -> 0.50002), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("3" -> 1.0), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 0.0)) + + testAllocation(taskAddressAmount = Map("2" -> 0.2), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.6, "3" -> 0.0)) + + testRelease(releasedRes = Map("0" -> 0.80002, "1" -> 1.0, "2" -> 0.4, "3" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, 
"2" -> 1.0, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("0" -> 1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("0" -> 1.0, "1" -> 1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 1.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testAllocation(taskAddressAmount = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 0.0, "3" -> 0.0)) + + testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala new file mode 100644 index 000000000000..e512327fefa7 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala @@ -0,0 +1,545 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import scala.collection.mutable.ArrayBuffer +import scala.language.implicitConversions + +import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.resource.{ResourceAmountUtils, ResourceProfileBuilder, TaskResourceRequests} +import org.apache.spark.resource.ResourceUtils.GPU + +class ExecutorResourcesAmountsSuite extends SparkFunSuite { + + implicit def toFractionalResource(resources: Map[String, Long]): Map[String, Double] = + resources.map { case (k, v) => k -> ResourceAmountUtils.toFractionalResource(v) } + + implicit def toInternalResource(resources: Map[String, Double]): Map[String, Long] = + resources.map { case (k, v) => k -> ResourceAmountUtils.toInternalResource(v) } + + implicit def toInternalResourceMap(resources: Map[String, Map[String, Double]]): + Map[String, Map[String, Long]] = + resources.map { case (resName, addressesAmountMap) => + resName -> addressesAmountMap.map { case (k, v) => + k -> ResourceAmountUtils.toInternalResource(v) } + } + + def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double], + eps: Double = 0.00000001): Boolean = { + lhs.size == rhs.size && + lhs.zip(rhs).forall { case ((lName, lAmount), (rName, rAmount)) => + lName == rName && (lAmount - rAmount).abs < eps + } + } + + test("assign to rp without task resource requirements") { + val executorsInfo = Map( + "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), + "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb")) + ) + val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo) + assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val treqs = new TaskResourceRequests().cpus(1) + val rp = new ResourceProfileBuilder().require(treqs).build() + + // nothing should be assigned for an rp without custom task resources + val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(assigned.isDefined) + assigned.foreach { case resource => assert(resource.isEmpty) } + } + + test("Convert ExecutorResourceInfos to ExecutorResourcesAmounts") { + val executorsInfo = Map( + "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), + "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb")) + ) + // default resources amounts of executors info + executorsInfo.foreach { case (rName, rInfo) => + if (rName == "gpu") { + assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo) + assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + // Update the executors info directly; a newly built ExecutorResourcesAmounts + // should reflect the updated amounts. 
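+    // For example (illustrative, using the ResourceAmountUtils helpers): acquiring
+    // 0.4 of gpu address "2" leaves toInternalResource(0.6) internally, i.e.
+    // toFractionalResource of the remaining amount is 0.6.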
+ executorsInfo.foreach { case (rName, rInfo) => + if (rName == "gpu") { + rInfo.acquire(toInternalResource(Map("2" -> 0.4, "6" -> 0.6))) + } else { + rInfo.acquire(toInternalResource(Map("aa" -> 0.2, "bb" -> 0.7))) + } + } + + executorsInfo.foreach { case (rName, rInfo) => + if (rName == "gpu") { + assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 0.6, "4" -> 1.0, "6" -> 0.4))) + } else { + assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 0.8, "bb" -> 0.3))) + } + } + + val availableExecResAmounts1 = ExecutorResourcesAmounts(executorsInfo) + assert(availableExecResAmounts1.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val availableRes1 = availableExecResAmounts1.availableResources + availableRes1.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 0.6, "4" -> 1.0, "6" -> 0.4))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 0.8, "bb" -> 0.3))) + } + } + + } + + test("ExecutorResourcesAmounts shouldn't change ExecutorResourceInfo") { + val executorsInfo = Map( + "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), + "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb")) + ) + + // default resources amounts of executors info + executorsInfo.foreach { case (rName, rInfo) => + if (rName == "gpu") { + assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo) + assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val gpuTaskAmount = 0.1 + val treqs = new TaskResourceRequests().resource("gpu", gpuTaskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + + // taskAmount = 0.1 <= 1.0, so it can be assigned. + val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + // update the value + availableExecResAmounts.acquire(assigned.get) + + val availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, + Map("2" -> (1.0 - gpuTaskAmount), "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + // executors info shouldn't be changed. 
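+    // (ExecutorResourcesAmounts snapshots the per-address amounts when it is built,
+    // so acquiring through it must not write back into the underlying
+    // ExecutorResourceInfo instances, as verified below.)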
+ executorsInfo.foreach { case (rName, rInfo) => + if (rName == "gpu") { + assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + } + + test("executor resources do not match the task requirement") { + val totalRes = Map("gpu" -> Map("2" -> 0.4)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + val gpuTaskAmount = 0.6 + val treqs = new TaskResourceRequests() + .resource("gpu", gpuTaskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + + val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(assigned.isEmpty) + } + + test("part of the executor resources do not match the task requirement") { + val totalRes = Map("gpu" -> Map("2" -> 0.4), "fpga" -> Map("aa" -> 0.8)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + // normal allocation + val gpuTaskAmount = 0.3 + val fpgaTaskAmount = 0.8 + val treqs = new TaskResourceRequests() + .resource("gpu", gpuTaskAmount) + .resource("fpga", fpgaTaskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + + var assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(!assigned.isEmpty) + assigned.foreach { case resource => assert(!resource.isEmpty) } + + val treqs1 = new TaskResourceRequests() + .resource("gpu", gpuTaskAmount) + .resource("fpga", 0.9) // couldn't allocate fpga + val rp1 = new ResourceProfileBuilder().require(treqs1).build() + + assigned = availableExecResAmounts.assignAddressesCustomResources(rp1) + assert(assigned.isEmpty) + } + + test("the total amount after release should be <= 1.0") { + val totalRes = Map("gpu" -> Map("2" -> 0.4)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + val e = intercept[SparkException] { + availableExecResAmounts.release(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.7)))) + } + assert(e.getMessage.contains("after releasing gpu address 2 should be <= 1.0")) + + availableExecResAmounts.release(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.6)))) + assert(compareMaps(availableExecResAmounts.availableResources("gpu"), Map("2" -> 1.0))) + } + + test("the total amount after acquire should be >= 0") { + val totalRes = Map("gpu" -> Map("2" -> 0.4)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + val e = intercept[SparkException] { + availableExecResAmounts.acquire(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.6)))) + } + assert(e.getMessage.contains("after acquiring gpu address 2 should be >= 0")) + + availableExecResAmounts.acquire(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.4)))) + assert(compareMaps(availableExecResAmounts.availableResources("gpu"), Map("2" -> 0.0))) + } + + test("Ensure that we can acquire the same fractions of a resource") { + val slotSeq = Seq(31235, 1024, 512, 256, 128, 64, 32, 16, 12, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + val addresses = ArrayBuffer("0", "1", "2", "3") + val info = new ExecutorResourceInfo(GPU, addresses.toSeq) + + slotSeq.foreach { slots => + val taskAmount = 1.0 / slots + val availableExecResAmounts = ExecutorResourcesAmounts(Map(GPU -> info)) + for (_ <- 0 until slots) { + addresses.foreach(addr => + availableExecResAmounts.acquire( + toInternalResourceMap(Map(GPU -> Map(addr -> taskAmount))))) + } + + assert(availableExecResAmounts.availableResources.size === 1) + // All addresses were assigned equally, so one distinct amount remains + 
assert(availableExecResAmounts.availableResources(GPU).values.toSeq.toSet.size === 1) + // The remaining amount of each address should be < taskAmount + assert(ResourceAmountUtils.toFractionalResource( + availableExecResAmounts.availableResources(GPU)("0")) < taskAmount) + + addresses.foreach { addr => + assertThrows[SparkException] { + availableExecResAmounts.acquire( + toInternalResourceMap(Map(GPU -> Map(addr -> taskAmount)))) + } + } + } + } + + test("assign acquire release on single task resource request") { + val executorsInfo = Map( + "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), + "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb")) + ) + + val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo) + + assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val gpuTaskAmount = 0.1 + val treqs = new TaskResourceRequests().resource("gpu", gpuTaskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + + // taskAmount = 0.1 <= 1.0, so it can be assigned. + val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(!assigned.isEmpty) + assigned.foreach { case resource => + assert(resource.size === 1) + assert(resource.keys.toSeq === Seq("gpu")) + assert(resource("gpu").size === 1) + assert(resource("gpu").keys.toSeq === Seq("2")) + assert(ResourceAmountUtils.toFractionalResource(resource("gpu")("2")) === gpuTaskAmount) + } + + // assign will not update the real value. + var availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + // acquire updates the actual amounts + availableExecResAmounts.acquire(assigned.get) + + // after acquire + availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map( + "2" -> (1.0 - gpuTaskAmount), + "4" -> 1.0, + "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + // release + availableExecResAmounts.release(assigned.get) + + availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + } + + test("assign acquire release on multiple task resources request") { + val executorsInfo = Map( + "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")), + "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb")) + ) + + val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo) + + assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2)) + + val gpuTaskAmount = 0.1 + val fpgaTaskAmount = 0.3 + val treqs = new TaskResourceRequests() + .resource("gpu", gpuTaskAmount) + .resource("fpga", fpgaTaskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + + // taskAmount = 0.1 <= 1.0, so it can be assigned. 
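+    // assignAddressesCustomResources only proposes an assignment: it returns None
+    // when any requested resource cannot be satisfied, and the bookkeeping is not
+    // touched until acquire() is called with the proposed amounts.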
+ val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(!assigned.isEmpty) + assigned.foreach { case resourceAmounts => + assert(resourceAmounts.size === 2) + assert(resourceAmounts.keys.toSeq.sorted === Seq("gpu", "fpga").sorted) + + assert(resourceAmounts("gpu").size === 1) + assert(resourceAmounts("gpu").keys.toSeq === Seq("2")) + assert(ResourceAmountUtils.toFractionalResource(resourceAmounts("gpu")("2")) === + gpuTaskAmount) + + assert(resourceAmounts("fpga").size === 1) + assert(resourceAmounts("fpga").keys.toSeq === Seq("aa")) + assert(ResourceAmountUtils.toFractionalResource(resourceAmounts("fpga")("aa")) === + fpgaTaskAmount) + } + + // assign will not update the real value. + var availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + + // acquire updates the actual amounts + availableExecResAmounts.acquire(assigned.get) + + // after acquire + availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, + Map("2" -> (1.0 - gpuTaskAmount), "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> (1.0 - fpgaTaskAmount), "bb" -> 1.0))) + } + } + + // release + availableExecResAmounts.release(assigned.get) + + availableRes = availableExecResAmounts.availableResources + availableRes.foreach { case (rName, addressesAmount) => + if (rName == "gpu") { + assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0))) + } else { + assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0))) + } + } + } + + test("assign/release resource for different task requirements") { + val totalRes = Map("gpu" -> Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + def testAllocation(taskAmount: Double, + expectedAssignedAddress: Array[String], + expectedAssignedAmount: Array[Double], + expectedLeftRes: Map[String, Double] + ): Unit = { + val treqs = new TaskResourceRequests().resource("gpu", taskAmount) + val rp = new ResourceProfileBuilder().require(treqs).build() + val assigned = availableExecResAmounts.assignAddressesCustomResources(rp) + assert(!assigned.isEmpty) + assigned.foreach { case resources => + // verify the assigned addresses as well as the amounts + assert(resources("gpu").keys.toArray.sorted === expectedAssignedAddress.sorted) + assert( + resources("gpu").values.toArray.sorted.map(ResourceAmountUtils.toFractionalResource(_)) + === expectedAssignedAmount.sorted) + + availableExecResAmounts.acquire(resources) + + val leftRes = availableExecResAmounts.availableResources + assert(leftRes.size == 1) + assert(leftRes.keys.toSeq(0) == "gpu") + assert(compareMaps(leftRes("gpu"), expectedLeftRes)) + } + } + + def testRelease(releasedRes: Map[String, Double], + expectedLeftRes: Map[String, Double] + ): Unit = { + availableExecResAmounts.release(Map("gpu" -> releasedRes)) + + val leftRes = availableExecResAmounts.availableResources + assert(leftRes.size == 1) + assert(leftRes.keys.toSeq(0) == "gpu") + assert(compareMaps(leftRes("gpu"), expectedLeftRes)) + } + + // request 0.2 gpu, ExecutorResourcesAmounts should assign "0", + testAllocation(taskAmount = 0.2, + expectedAssignedAddress = Array("0"), + expectedAssignedAmount = Array(0.2), + expectedLeftRes = Map("0" -> 0.8, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + // 
request 0.2 gpu again, ExecutorResourcesAmounts should assign "0", + testAllocation(taskAmount = 0.2, + expectedAssignedAddress = Array("0"), + expectedAssignedAmount = Array(0.2), + expectedLeftRes = Map("0" -> 0.6, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + // request 2 gpus, ExecutorResourcesAmounts should assign "1" and "2", + testAllocation(taskAmount = 2, + expectedAssignedAddress = Array("1", "2"), + expectedAssignedAmount = Array(1.0, 1.0), + expectedLeftRes = Map("0" -> 0.6, "1" -> 0.0, "2" -> 0.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 0.1, "2" -> 0.8), + expectedLeftRes = Map("0" -> 0.7, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0)) + + // request 0.50002 gpu, ExecutorResourcesAmounts should assign "0", + testAllocation(taskAmount = 0.50002, + expectedAssignedAddress = Array("0"), + expectedAssignedAmount = Array(0.50002), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0)) + + // request 1 gpu, ExecutorResourcesAmounts should assign "3", + testAllocation(taskAmount = 1.0, + expectedAssignedAddress = Array("3"), + expectedAssignedAmount = Array(1.0), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 0.0)) + + // request 0.2 gpu, ExecutorResourcesAmounts should assign "2", + testAllocation(taskAmount = 0.2, + expectedAssignedAddress = Array("2"), + expectedAssignedAmount = Array(0.2), + expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.6, "3" -> 0.0)) + + testRelease(releasedRes = Map("0" -> 0.80002, "1" -> 1.0, "2" -> 0.4, "3" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + // request 1 gpu, ExecutorResourcesAmounts should assign "0" + testAllocation(taskAmount = 1.0, + expectedAssignedAddress = Array("0"), + expectedAssignedAmount = Array(1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + // request 2 gpus, ExecutorResourcesAmounts should assign "0", "1" + testAllocation(taskAmount = 2.0, + expectedAssignedAddress = Array("0", "1"), + expectedAssignedAmount = Array(1.0, 1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 1.0, "3" -> 1.0)) + + testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + + // request 4 gpus, ExecutorResourcesAmounts should assign "0", "1", "2", "3" + testAllocation(taskAmount = 4.0, + expectedAssignedAddress = Array("0", "1", "2", "3"), + expectedAssignedAmount = Array(1.0, 1.0, 1.0, 1.0), + expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 0.0, "3" -> 0.0)) + + testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0), + expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + } + + test("Don't allow acquire resource or address that is not available") { + // Init Executor Resource. 
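+    // (the Double amounts below are converted to internal Long amounts by the
+    // implicit toInternalResourceMap declared at the top of this suite)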
+ val totalRes = Map("gpu" -> Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + // Acquire an address from a resource that doesn't exist + val e = intercept[SparkException] { + availableExecResAmounts.acquire(toInternalResourceMap(Map("fpga" -> Map("1" -> 1.0)))) + } + assert(e.getMessage.contains("Try to acquire an address from fpga that doesn't exist")) + + // Acquire an address that doesn't exist + val e1 = intercept[SparkException] { + availableExecResAmounts.acquire(toInternalResourceMap(Map("gpu" -> Map("6" -> 1.0)))) + } + assert(e1.getMessage.contains("Try to acquire an address that doesn't exist")) + } + + test("Don't allow release resource or address that is not available") { + // Init Executor Resource. + val totalRes = Map("gpu" -> Map("0" -> 0.5, "1" -> 0.5, "2" -> 0.5, "3" -> 0.5)) + val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes) + + // Release an address from a resource that doesn't exist + val e = intercept[SparkException] { + availableExecResAmounts.release(toInternalResourceMap(Map("fpga" -> Map("1" -> 0.1)))) + } + assert(e.getMessage.contains("Try to release an address from fpga that doesn't exist")) + + // Release an address that is not assigned + val e1 = intercept[SparkException] { + availableExecResAmounts.release(toInternalResourceMap(Map("gpu" -> Map("6" -> 0.1)))) + } + assert(e1.getMessage.contains("Try to release an address that is not assigned")) + } + +} diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala index b36363d0f4cd..5aaacd2ec1c3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala @@ -24,7 +24,7 @@ import java.util.Properties import scala.collection.mutable.HashMap import org.apache.spark.{JobArtifactSet, SparkFunSuite} -import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.ResourceAmountUtils import org.apache.spark.resource.ResourceUtils.GPU class TaskDescriptionSuite extends SparkFunSuite { @@ -59,8 +59,10 @@ class TaskDescriptionSuite extends SparkFunSuite { } } - val originalResources = - Map(GPU -> new ResourceInformation(GPU, Array("1", "2", "3"))) + val originalResources = Map(GPU -> + Map("1" -> ResourceAmountUtils.toInternalResource(0.2), + "2" -> ResourceAmountUtils.toInternalResource(0.5), + "3" -> ResourceAmountUtils.toInternalResource(0.1))) // Create a dummy byte buffer for the task. 
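+    // (its content is arbitrary; the test only checks that it and the internal Long
+    // resource amounts round-trip unchanged through encode/decode)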
val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) @@ -99,17 +101,8 @@ class TaskDescriptionSuite extends SparkFunSuite { assert(decodedTaskDescription.artifacts.equals(artifacts)) assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties)) assert(decodedTaskDescription.cpus.equals(originalTaskDescription.cpus)) - assert(equalResources(decodedTaskDescription.resources, originalTaskDescription.resources)) + assert(decodedTaskDescription.resources === originalTaskDescription.resources) assert(decodedTaskDescription.serializedTask.equals(taskBuffer)) - - def equalResources(original: Map[String, ResourceInformation], - target: Map[String, ResourceInformation]): Boolean = { - original.size == target.size && original.forall { case (name, info) => - target.get(name).exists { targetInfo => - info.name.equals(targetInfo.name) && - info.addresses.sameElements(targetInfo.addresses) - } - } - } } + } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 2ab7df0d9cfd..72d0354c5577 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -22,8 +22,10 @@ import java.util.Properties import java.util.concurrent.{CountDownLatch, ExecutorService, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.duration._ +import scala.language.implicitConversions import scala.language.reflectiveCalls import org.mockito.ArgumentMatchers.{any, anyInt, anyString, eq => meq} @@ -33,7 +35,8 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.internal.config -import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceProfile, TaskResourceRequests} +import org.apache.spark.resource.{ExecutorResourceRequests, ResourceAmountUtils, ResourceProfile, TaskResourceProfile, TaskResourceRequests} +import org.apache.spark.resource.ResourceAmountUtils.{ONE_ENTIRE_RESOURCE} import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.status.api.v1.ThreadStackTrace @@ -145,6 +148,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler } + // Convert resources to ExecutorResourcesAmounts automatically + implicit def toExecutorResourcesAmounts(resources: Map[String, mutable.Buffer[String]]): + ExecutorResourcesAmounts = { + // convert the old resources to ExecutorResourcesAmounts + new ExecutorResourcesAmounts(resources.map { case (rName, addresses) => + rName -> addresses.map(address => address -> ONE_ENTIRE_RESOURCE).toMap + }) + } + test("SPARK-32653: Decommissioned host/executor should be considered as inactive") { val scheduler = setupScheduler() val exec0 = "exec0" @@ -1738,7 +1750,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val singleCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None, resources)) val zeroGpuWorkerOffers = - IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None, Map.empty)) + IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None)) taskScheduler.submitTasks(taskSet) // WorkerOffer doesn't contain GPU resource, don't launch any 
task. var taskDescriptions = taskScheduler.resourceOffers(zeroGpuWorkerOffers).flatten @@ -1748,8 +1760,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten assert(2 === taskDescriptions.length) assert(!failedTaskSet) - assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.addresses) - assert(ArrayBuffer("1") === taskDescriptions(1).resources.get(GPU).get.addresses) + assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.keys.toArray.sorted) + assert(ArrayBuffer("1") === taskDescriptions(1).resources.get(GPU).get.keys.toArray.sorted) } test("Scheduler correctly accounts for GPUs per task with fractional amount") { @@ -1775,9 +1787,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten assert(3 === taskDescriptions.length) assert(!failedTaskSet) - assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.addresses) - assert(ArrayBuffer("0") === taskDescriptions(1).resources.get(GPU).get.addresses) - assert(ArrayBuffer("0") === taskDescriptions(2).resources.get(GPU).get.addresses) + assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.keys.toArray.sorted) + assert(ArrayBuffer("0") === taskDescriptions(1).resources.get(GPU).get.keys.toArray.sorted) + assert(ArrayBuffer("0") === taskDescriptions(2).resources.get(GPU).get.keys.toArray.sorted) } test("Scheduler works with multiple ResourceProfiles and gpus") { @@ -1815,10 +1827,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext var has1Gpu = 0 for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - if (tDesc.resources(GPU).addresses.length == 2) { + if (tDesc.resources(GPU).keys.size == 2) { has2Gpus += 1 } - if (tDesc.resources(GPU).addresses.length == 1) { + if (tDesc.resources(GPU).keys.size == 1) { has1Gpu += 1 } } @@ -1830,13 +1842,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext // clear the first 2 worker offers so they don't have any room and add a third // for the resource profile val workerOffers3 = IndexedSeq( - new WorkerOffer("executor0", "host0", 0, None, Map.empty), - new WorkerOffer("executor1", "host1", 0, None, Map.empty, rp.id), + new WorkerOffer("executor0", "host0", 0, None), + new WorkerOffer("executor1", "host1", 0, None, ExecutorResourcesAmounts.empty, rp.id), new WorkerOffer("executor2", "host2", 6, None, resources3, rp.id)) taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten assert(2 === taskDescriptions.length) assert(taskDescriptions.head.resources.contains(GPU)) - assert(2 == taskDescriptions.head.resources(GPU).addresses.length) + assert(2 == taskDescriptions.head.resources(GPU).keys.size) } test("Scheduler works with task resource profiles") { @@ -1875,10 +1887,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext var has1Gpu = 0 for (tDesc <- taskDescriptions) { assert(tDesc.resources.contains(GPU)) - if (tDesc.resources(GPU).addresses.length == 2) { + if (tDesc.resources(GPU).keys.size == 2) { has2Gpus += 1 } - if (tDesc.resources(GPU).addresses.length == 1) { + if (tDesc.resources(GPU).keys.size == 1) { has1Gpu += 1 } } @@ -1890,13 +1902,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext // clear the first 2 worker offers so they don't have any room and add a third // for the resource 
profile val workerOffers3 = IndexedSeq( - WorkerOffer("executor0", "host0", 0, None, Map.empty), - WorkerOffer("executor1", "host1", 0, None, Map.empty), + WorkerOffer("executor0", "host0", 0, None), + WorkerOffer("executor1", "host1", 0, None), WorkerOffer("executor2", "host2", 4, None, resources3)) taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten assert(2 === taskDescriptions.length) assert(taskDescriptions.head.resources.contains(GPU)) - assert(2 == taskDescriptions.head.resources(GPU).addresses.length) + assert(2 == taskDescriptions.head.resources(GPU).keys.size) } test("Calculate available tasks slots for task resource profiles") { @@ -1922,11 +1934,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext val workerOffers = IndexedSeq(WorkerOffer("executor0", "host0", 4, None, resources0), WorkerOffer("executor1", "host1", 4, None, resources1)) - val availableResourcesAmount = workerOffers.map(_.resources).map { resourceMap => - // available addresses already takes into account if there are fractional - // task resource requests - resourceMap.map { case (name, addresses) => (name, addresses.length) } - } + val availableResourcesAmount = workerOffers.map(_.resources).map { resAmounts => + // available addresses already takes into account if there are fractional + // task resource requests + resAmounts.resourceAddressAmount + } val taskSlotsForRp = TaskSchedulerImpl.calculateAvailableSlots( taskScheduler, taskScheduler.conf, rp.id, workerOffers.map(_.resourceProfileId).toArray, @@ -2283,4 +2295,425 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext taskScheduler.handleFailedTask(tsm, tid, state, reason) } + private implicit def toInternalResource(resources: Map[String, Double]): Map[String, Long] = + resources.map { case (k, v) => k -> ResourceAmountUtils.toInternalResource(v) } + + // 1 executor with 4 GPUs + Seq(true, false).foreach { barrierMode => + val barrier = if (barrierMode) "barrier" else "" + (1 to 20).foreach { taskNum => + val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum) + test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can " + + s"restrict $taskNum $barrier tasks run in the same executor") { + val taskCpus = 1 + val executorCpus = 100 // cpu will not limit the number of concurrent tasks + val executorGpus = 1 + + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> gpuTaskAmount.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString) + + val taskSet = if (barrierMode) { + FakeTask.createBarrierTaskSet(4 * taskNum) + } else { + FakeTask.createTaskSet(100) + } + + val resources = new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)))) + + val workerOffers = + IndexedSeq(WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources)) + + taskScheduler.submitTasks(taskSet) + // Launch tasks on executor that satisfies resource requirements. 
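+        // With gpuTaskAmount = 1.0 / taskNum, each of the 4 GPU addresses can host
+        // exactly taskNum concurrent tasks, so all 4 * taskNum tasks should launch,
+        // and addresses are expected to be handed out in order ("0", "1", "2", "3").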
+ val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(4 * taskNum === taskDescriptions.length) + assert(!failedTaskSet) + var gpuAddress = -1 + for (taskId <- 0 until 4 * taskNum) { + if (taskId % taskNum == 0) { + gpuAddress += 1 + } + assert(ArrayBuffer(gpuAddress.toString) === + taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted) + } + } + } + } + + // 4 executors, each of which has 1 GPU + Seq(true, false).foreach { barrierMode => + val barrier = if (barrierMode) "barrier" else "" + (1 to 20).foreach { taskNum => + val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum) + test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can " + + s"restrict $taskNum $barrier tasks run on the different executor") { + val taskCpus = 1 + val executorCpus = 100 // cpu will not limit the number of concurrent tasks + val executorGpus = 1 + + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> gpuTaskAmount.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString) + + val taskSet = if (barrierMode) { + FakeTask.createBarrierTaskSet(4 * taskNum) + } else { + FakeTask.createTaskSet(100) + } + + val workerOffers = + IndexedSeq( + WorkerOffer("executor0", "host0", executorCpus, Some("host0"), + new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("0" -> 1.0))))), + WorkerOffer("executor1", "host1", executorCpus, Some("host1"), + new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("1" -> 1.0))))), + WorkerOffer("executor2", "host2", executorCpus, Some("host2"), + new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("2" -> 1.0))))), + WorkerOffer("executor3", "host3", executorCpus, Some("host3"), + new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("3" -> 1.0)))))) + + taskScheduler.submitTasks(taskSet) + // Launch tasks on executor that satisfies resource requirements + + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(4 * taskNum === taskDescriptions.length) + assert(!failedTaskSet) + val assignedGpus: HashMap[String, Int] = HashMap.empty + for (taskId <- 0 until 4 * taskNum) { + val gpus = taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted + assert(gpus.length == 1) + val addr = gpus(0) + if (!assignedGpus.contains(addr)) { + assignedGpus(addr) = 1 + } else { + assignedGpus(addr) += 1 + } + } + assert(assignedGpus.toMap === + Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum)) + } + } + } + + // 1 executor with 4 GPUs + Seq(true, false).foreach { barrierMode => + val barrier = if (barrierMode) "barrier" else "" + (1 to 20).foreach { taskNum => + val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum) + test(s"SPARK-45527 TaskResourceProfile with task.gpu.amount=${gpuTaskAmount} can " + + s"restrict $taskNum $barrier tasks run in the same executor") { + val executorCpus = 100 // cpu will not limit the number of concurrent tasks + + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> "1", + TASK_GPU_ID.amountConf -> "0.1", + EXECUTOR_GPU_ID.amountConf -> "4", + config.EXECUTOR_CORES.key -> executorCpus.toString) + + val treqs = new TaskResourceRequests().cpus(1).resource(GPU, gpuTaskAmount) + val rp = new TaskResourceProfile(treqs.requests) + 
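+        // Register the profile so that resourceOffers can resolve rp.id back to the
+        // task resource requests when matching offers.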
taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + + val taskSet = if (barrierMode) { + FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id) + } else { + FakeTask.createTaskSet(100, 0, 1, 1, rp.id) + } + val resources = new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)))) + + val workerOffers = IndexedSeq( + WorkerOffer("executor0", "host0", executorCpus, Some("host0"), resources, rp.id) + ) + + taskScheduler.submitTasks(taskSet) + // Launch tasks on executor that satisfies resource requirements. + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(4 * taskNum === taskDescriptions.length) + assert(!failedTaskSet) + var gpuAddress = -1 + for (taskId <- 0 until 4 * taskNum) { + if (taskId % taskNum == 0) { + gpuAddress += 1 + } + assert(ArrayBuffer(gpuAddress.toString) === + taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted) + } + } + } + } + + // 4 executors, each of which has 1 GPU + Seq(true, false).foreach { barrierMode => + val barrier = if (barrierMode) "barrier" else "" + (1 to 20).foreach { taskNum => + val gpuTaskAmount = ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum) + test(s"SPARK-45527 TaskResourceProfile with task.gpu.amount=${gpuTaskAmount} can " + + s"restrict $taskNum $barrier tasks run on the different executor") { + val executorCpus = 100 // cpu will not limit the number of concurrent tasks + + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> "1", + TASK_GPU_ID.amountConf -> "0.1", + EXECUTOR_GPU_ID.amountConf -> "1", + config.EXECUTOR_CORES.key -> executorCpus.toString) + + val treqs = new TaskResourceRequests().cpus(1).resource(GPU, gpuTaskAmount) + val rp = new TaskResourceProfile(treqs.requests) + taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + + val taskSet = if (barrierMode) { + FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id) + } else { + FakeTask.createTaskSet(100, 0, 1, 1, rp.id) + } + + val workerOffers = + IndexedSeq( + WorkerOffer("executor0", "host0", executorCpus, Some("host0"), + new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("0" -> 1.0)))), + rp.id), + WorkerOffer("executor1", "host1", executorCpus, Some("host1"), + new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("1" -> 1.0)))), + rp.id), + WorkerOffer("executor2", "host2", executorCpus, Some("host2"), + new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("2" -> 1.0)))), + rp.id), + WorkerOffer("executor3", "host3", executorCpus, Some("host3"), + new ExecutorResourcesAmounts(Map(GPU -> toInternalResource(Map("3" -> 1.0)))), + rp.id) + ) + + taskScheduler.submitTasks(taskSet) + // Launch tasks on executor that satisfies resource requirements + + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(4 * taskNum === taskDescriptions.length) + assert(!failedTaskSet) + val assignedGpus: HashMap[String, Int] = HashMap.empty + for (taskId <- 0 until 4 * taskNum) { + val gpus = taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted + assert(gpus.length == 1) + val addr = gpus(0) + if (!assignedGpus.contains(addr)) { + assignedGpus(addr) = 1 + } else { + assignedGpus(addr) += 1 + } + } + assert(assignedGpus.toMap === + Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum)) + } + } + } + + test("SPARK-45527 TaskResourceProfile: the remaining gpu resources on 1 executor " + + "can be assigned to another 
taskset") { + val taskCpus = 1 + val taskGpus = 0.3 + val executorGpus = 4 + val executorCpus = 1000 + + // each tasks require 0.3 gpu + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> taskGpus.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString + ) + val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + + // each task require 0.7 gpu + val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7) + val rp = new TaskResourceProfile(treqs.requests) + taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + + val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, stageAttemptId = 0, + priority = 0, rpId = rp.id) + + val workerOffers = + IndexedSeq( + // cpu won't be a problem + WorkerOffer("executor0", "host0", 1000, None, new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))))) + ) + + taskScheduler.submitTasks(lowerTaskSet) + taskScheduler.submitTasks(higherRpTaskSet) + + // should have 3 for default profile and 2 for additional resource profile + val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(8 === taskDescriptions.length) + var index = 0 + for (tDesc <- taskDescriptions) { + assert(tDesc.resources.contains(GPU)) + val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted + assert(addresses.length == 1) + if (index < 4) { // the first 4 tasks will grab 0.7 gpu + assert(addresses(0) == index.toString) + assert(ResourceAmountUtils.toFractionalResource( + tDesc.resources.get(GPU).get(index.toString)) == 0.7) + } else { + assert(addresses(0) == (index - 4).toString) + assert(ResourceAmountUtils.toFractionalResource( + tDesc.resources.get(GPU).get((index - 4).toString)) == 0.3) + } + index += 1 + } + } + + test("SPARK-45527 TaskResourceProfile: the left gpu resources on multiple executors " + + "can assign to other taskset") { + val taskCpus = 1 + val taskGpus = 0.3 + val executorGpus = 4 + val executorCpus = 1000 + + // each tasks require 0.3 gpu + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> taskGpus.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString + ) + val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1, + ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) + + // each task require 0.7 gpu + val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7) + val rp = new TaskResourceProfile(treqs.requests) + taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + + val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, stageAttemptId = 0, + priority = 0, rpId = rp.id) + + val workerOffers = + IndexedSeq( + // cpu won't be a problem + WorkerOffer("executor0", "host0", 1000, None, new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("0" -> 1.0))))), + WorkerOffer("executor1", "host1", 1000, None, new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("1" -> 1.0))))), + WorkerOffer("executor2", "host2", 1000, None, new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("2" -> 1.0))))), + WorkerOffer("executor3", "host3", 1000, None, new ExecutorResourcesAmounts( + Map(GPU -> toInternalResource(Map("3" -> 1.0))))) + ) + + taskScheduler.submitTasks(lowerTaskSet) + 
+    taskScheduler.submitTasks(higherRpTaskSet)
+
+    // expect 8 tasks: on each executor's single GPU, one task takes 0.7 and
+    // another takes the remaining 0.3
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(8 === taskDescriptions.length)
+
+    var index = 0
+    val higherAssignedExecutorsGpus = ArrayBuffer[(String, String)]()
+    val lowerAssignedExecutorsGpus = ArrayBuffer[(String, String)]()
+
+    for (tDesc <- taskDescriptions) {
+      assert(tDesc.resources.contains(GPU))
+      val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+      assert(addresses.length == 1)
+      val address = addresses(0)
+
+      // Executor 0           executor 1           executor 2           executor 3
+      // task_0.7, task_0.3   task_0.7, task_0.3   task_0.7, task_0.3   task_0.7, task_0.3
+      if (index % 2 == 0) {
+        higherAssignedExecutorsGpus.append(
+          (tDesc.executorId, address))
+        assert(ResourceAmountUtils.toFractionalResource(
+          tDesc.resources.get(GPU).get(address)) == 0.7)
+      } else {
+        lowerAssignedExecutorsGpus.append(
+          (tDesc.executorId, address))
+        assert(ResourceAmountUtils.toFractionalResource(
+          tDesc.resources.get(GPU).get(address)) == 0.3)
+      }
+      index += 1
+    }
+
+    assert(higherAssignedExecutorsGpus.sorted sameElements
+      ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), ("executor3", "3")
+      ))
+    assert(lowerAssignedExecutorsGpus.sorted sameElements
+      ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), ("executor3", "3")
+      ))
+  }
+
+  test("SPARK-45527 TaskResourceProfile: the remaining GPU resources on one executor " +
+    "can't be assigned to another taskset due to insufficient GPU resources") {
+    val taskCpus = 1
+    val taskGpus = 0.4
+    val executorGpus = 4
+    val executorCpus = 4
+
+    // each task requires 0.4 gpu
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+    val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+
+    // each task requires 0.7 gpu
+    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, stageAttemptId = 0,
+      priority = 0, rpId = rp.id)
+
+    val workerOffers =
+      IndexedSeq(
+        // cpu won't be a problem
+        WorkerOffer("executor0", "host0", 1000, None, new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0)))))
+      )
+
+    taskScheduler.submitTasks(lowerTaskSet)
+    taskScheduler.submitTasks(higherRpTaskSet)
+
+    // only the higher-priority taskset gets resources: after each GPU gives
+    // 0.7 away, the remaining 0.3 cannot satisfy a 0.4 default-profile task
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(4 === taskDescriptions.length)
+    var index = 0
+    for (tDesc <- taskDescriptions) {
+      assert(tDesc.resources.contains(GPU))
+      val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+      assert(addresses.length == 1)
+      assert(addresses(0) == index.toString)
+      assert(ResourceAmountUtils.toFractionalResource(
+        tDesc.resources.get(GPU).get(index.toString)) == 0.7)
+      index += 1
+    }
+  }
+
+  test("SPARK-45527 schedule tasks for a barrier taskSet if all tasks can be launched together") {
+    val taskCpus = 2
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
+
+    val numFreeCores = 3
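+    // 3 executors with 3 free cores each and 2 CPUs per task leaves room for
+    // exactly one task per executor: just enough to launch all 3 barrier
+    // tasks at once.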
+    val workerOffers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625")),
+      new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627")),
+      new WorkerOffer("executor2", "host2", numFreeCores, Some("192.168.0.101:49629")))
+    val attempt1 = FakeTask.createBarrierTaskSet(3)
+
+    // submit attempt 1, offer some resources, all tasks get launched together
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(3 === taskDescriptions.length)
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 2f8b6df8beac..26b38bfcc9ab 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -38,7 +38,8 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.{SKIP_VALIDATE_CORES_TESTING, TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED}
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -1825,7 +1826,8 @@ class TaskSetManagerSuite
     val taskSet = FakeTask.createTaskSet(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
-    val taskResourceAssignments = Map(GPU -> new ResourceInformation(GPU, Array("0", "1")))
+    val taskResourceAssignments = Map(
+      GPU -> Map("0" -> ONE_ENTIRE_RESOURCE, "1" -> ONE_ENTIRE_RESOURCE))
     val taskOption =
       manager.resourceOffer("exec1", "host1", NO_PREF, 2, taskResourceAssignments)._1
     assert(taskOption.isDefined)
@@ -1833,7 +1835,9 @@ class TaskSetManagerSuite
     val allocatedCpus = taskOption.get.cpus
     val allocatedResources = taskOption.get.resources
     assert(allocatedCpus == 2)
     assert(allocatedResources.size == 1)
-    assert(allocatedResources(GPU).addresses sameElements Array("0", "1"))
+    assert(allocatedResources(GPU).keys.toArray.sorted sameElements Array("0", "1"))
+    assert(allocatedResources === taskResourceAssignments)
+
   }
 
   test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") {