core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -20,48 +20,15 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
-import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+import org.apache.spark.resource.ResourceAmountUtils.{ONE_ENTIRE_RESOURCE, ZERO_RESOURCE}
 
 private[spark] object ResourceAmountUtils {
-  /**
-   * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg
-   *
-   * scala> val taskAmount = 1.0 / 9
-   * taskAmount: Double = 0.1111111111111111
-   *
-   * scala> var total = 1.0
-   * total: Double = 1.0
-   *
-   * scala> for (i <- 1 to 9 ) {
-   *      |   if (total >= taskAmount) {
-   *      |     total -= taskAmount
-   *      |     println(s"assign $taskAmount for task $i, total left: $total")
-   *      |   } else {
-   *      |     println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
-   *      |   }
-   *      | }
-   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
-   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
-   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
-   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
-   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
-   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
-   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
-   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
-   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
-   *
-   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation.
-   * Double can display up to 16 decimal places, so we set the factor to
-   * 10, 000, 000, 000, 000, 000L.
-   */
-  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L
-
-  def isOneEntireResource(amount: Long): Boolean = amount == ONE_ENTIRE_RESOURCE
-
-  def toInternalResource(amount: Double): Long = (amount * ONE_ENTIRE_RESOURCE).toLong
-
-  def toFractionalResource(amount: Long): Double = amount.toDouble / ONE_ENTIRE_RESOURCE
+  final val SCALE = 14
+
+  final val ONE_ENTIRE_RESOURCE: BigDecimal = BigDecimal(1.0).setScale(SCALE)
+
+  final val ZERO_RESOURCE: BigDecimal = BigDecimal(0).setScale(SCALE)
 }
 
 /**
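The deleted scaladoc above is the whole rationale for the old `Long` encoding: repeated `Double` subtraction drifts, so the ninth of nine 1/9 shares no longer fits. The replacement keeps exactness by pinning `BigDecimal` to a fixed 14-digit scale instead. Below is a self-contained sketch of the same nine-way split; only `SCALE = 14` comes from this patch, the object and variable names are illustrative:

```scala
import scala.math.BigDecimal.RoundingMode

object PrecisionDemo extends App {
  val SCALE = 14 // mirrors ResourceAmountUtils.SCALE introduced above

  // Double: accumulated binary error makes the ninth assignment fail.
  val taskD = 1.0 / 9
  var totalD = 1.0
  val doubleOk = (1 to 9).forall { _ =>
    val fits = totalD >= taskD
    if (fits) totalD -= taskD
    fits
  }

  // BigDecimal at scale 14: truncate the share once, then every
  // comparison and subtraction is exact decimal arithmetic.
  val taskB = BigDecimal(1.0 / 9).setScale(SCALE, RoundingMode.DOWN)
  var totalB = BigDecimal(1.0).setScale(SCALE)
  val bigOk = (1 to 9).forall { _ =>
    val fits = totalB >= taskB
    if (fits) totalB -= taskB
    fits
  }

  println(s"Double fits 9 tasks: $doubleOk")    // false
  println(s"BigDecimal fits 9 tasks: $bigOk")   // true
}
```

Truncating with `RoundingMode.DOWN` rather than rounding up matters here: it guarantees that n truncated shares can never sum above the true total, so the last share always fits.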
@@ -86,13 +53,13 @@ private[spark] trait ResourceAllocator {
    * Get the amounts of resources that have been multiplied by ONE_ENTIRE_RESOURCE.
    * @return the resources amounts
    */
-  def resourcesAmounts: Map[String, Long] = addressAvailabilityMap.toMap
+  def resourcesAmounts: Map[String, BigDecimal] = addressAvailabilityMap.toMap
 
   /**
    * Sequence of currently available resource addresses which are not fully assigned.
    */
   def availableAddrs: Seq[String] = addressAvailabilityMap
-    .filter(addresses => addresses._2 > 0).keys.toSeq.sorted
+    .filter(addresses => addresses._2 > ZERO_RESOURCE).keys.toSeq.sorted
 
   /**
    * Sequence of currently assigned resource addresses.
@@ -105,18 +72,17 @@ private[spark] trait ResourceAllocator {
    * available. When the task finishes, it will return the acquired resource addresses.
    * Throw an Exception if an address is not available or doesn't exist.
    */
-  def acquire(addressesAmounts: Map[String, Long]): Unit = {
+  def acquire(addressesAmounts: Map[String, BigDecimal]): Unit = {
     addressesAmounts.foreach { case (address, amount) =>
-      val prevAmount = addressAvailabilityMap.getOrElse(address,
+      val prev = addressAvailabilityMap.getOrElse(address,
         throw new SparkException(s"Try to acquire an address that doesn't exist. $resourceName " +
           s"address $address doesn't exist."))
 
-      val left = prevAmount - amount
+      val left = prev - amount
 
-      if (left < 0) {
+      if (left < ZERO_RESOURCE) {
         throw new SparkException(s"Try to acquire $resourceName address $address " +
-          s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}, but only " +
-          s"${ResourceAmountUtils.toFractionalResource(prevAmount)} left.")
+          s"amount: ${amount}, but only ${prev} left.")
       } else {
         addressAvailabilityMap(address) = left
       }
@@ -128,19 +94,17 @@ private[spark] trait ResourceAllocator {
    * addresses are released when a task has finished.
    * Throw an Exception if an address is not assigned or doesn't exist.
    */
-  def release(addressesAmounts: Map[String, Long]): Unit = {
+  def release(addressesAmounts: Map[String, BigDecimal]): Unit = {
     addressesAmounts.foreach { case (address, amount) =>
-      val prevAmount = addressAvailabilityMap.getOrElse(address,
+      val prev = addressAvailabilityMap.getOrElse(address,
         throw new SparkException(s"Try to release an address that doesn't exist. $resourceName " +
           s"address $address doesn't exist."))
 
-      val total = prevAmount + amount
+      val total = prev + amount
 
       if (total > ONE_ENTIRE_RESOURCE) {
         throw new SparkException(s"Try to release $resourceName address $address " +
-          s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}. But the total amount: " +
-          s"${ResourceAmountUtils.toFractionalResource(total)} " +
-          s"after release should be <= 1")
+          s"amount: ${amount}. But the total amount: ${total} after release should be <= 1")
       } else {
         addressAvailabilityMap(address) = total
       }
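Taken together, `acquire` and `release` keep a simple per-address invariant: the available amount stays between `ZERO_RESOURCE` and `ONE_ENTIRE_RESOURCE`, and a full acquire/release cycle lands back on exactly 1 with no drift. A minimal stand-in for the trait's bookkeeping that runs outside a Spark build (toy names, not the Spark API):

```scala
import scala.collection.mutable
import scala.math.BigDecimal.RoundingMode

// Illustrative mirror of the patched ResourceAllocator bookkeeping.
object MiniAllocator extends App {
  val SCALE = 14
  val ONE: BigDecimal = BigDecimal(1.0).setScale(SCALE)
  val ZERO: BigDecimal = BigDecimal(0).setScale(SCALE)

  // Two GPU addresses, both fully available.
  val avail = mutable.HashMap("0" -> ONE, "1" -> ONE)

  def acquire(addr: String, amount: BigDecimal): Unit = {
    val left = avail(addr) - amount
    require(left >= ZERO, s"only ${avail(addr)} left on $addr") // acquire bound
    avail(addr) = left
  }

  def release(addr: String, amount: BigDecimal): Unit = {
    val total = avail(addr) + amount
    require(total <= ONE, s"releasing $amount would exceed 1 on $addr") // release bound
    avail(addr) = total
  }

  val share = BigDecimal(1.0 / 9).setScale(SCALE, RoundingMode.DOWN)
  (1 to 9).foreach(_ => acquire("0", share)) // nine tasks fit exactly
  (1 to 9).foreach(_ => release("0", share)) // and hand it all back
  println(avail("0") == ONE)                 // true: no accumulated error
}
```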

core/src/main/scala/org/apache/spark/scheduler/ExecutorResourcesAmounts.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.mutable.HashMap
+import scala.collection.mutable
+import scala.math.BigDecimal.RoundingMode
 
 import org.apache.spark.SparkException
 import org.apache.spark.resource.{ResourceAmountUtils, ResourceProfile}
-import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+import org.apache.spark.resource.ResourceAmountUtils.{ONE_ENTIRE_RESOURCE, ZERO_RESOURCE}
 
 /**
  * Class to hold information about a series of resources belonging to an executor.
@@ -32,68 +33,56 @@ import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
  * One example is GPUs, where the addresses would be the indices of the GPUs
  *
  * @param resources The executor available resources and amount. eg,
- *                  Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.2),
- *                                   "1" -> ResourceAmountUtils.toInternalResource(1.0)),
- *                  "fpga" -> Map("a" -> ResourceAmountUtils.toInternalResource(0.3),
- *                                "b" -> ResourceAmountUtils.toInternalResource(0.9))
+ *                  Map("gpu" -> Map("0" -> BigDecimal(0.2),
+ *                                   "1" -> BigDecimal(1.0)),
+ *                  "fpga" -> Map("a" -> BigDecimal(0.3),
+ *                                "b" -> BigDecimal(0.9))
  *                  )
  */
 private[spark] class ExecutorResourcesAmounts(
-    private val resources: Map[String, Map[String, Long]]) extends Serializable {
+    private val resources: Map[String, Map[String, BigDecimal]]) extends Serializable {
 
   /**
-   * convert the resources to be mutable HashMap
+   * The current available resources.
    */
-  private val internalResources: Map[String, HashMap[String, Long]] = {
+  private[spark] val availableResources: Map[String, mutable.HashMap[String, BigDecimal]] = {
     resources.map { case (rName, addressAmounts) =>
-      rName -> HashMap(addressAmounts.toSeq: _*)
+      rName -> mutable.HashMap(addressAmounts.toSeq: _*)
     }
   }
 
   /**
-   * The total address count of each resource. Eg,
-   * Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.5),
-   *                  "1" -> ResourceAmountUtils.toInternalResource(0.5),
-   *                  "2" -> ResourceAmountUtils.toInternalResource(0.5)),
-   *     "fpga" -> Map("a" -> ResourceAmountUtils.toInternalResource(0.5),
-   *                   "b" -> ResourceAmountUtils.toInternalResource(0.5)))
+   * The total address count of each resource. Eg, assume availableResources is
+   * Map("gpu" -> Map("0" -> BigDecimal(0.5),
+   *                  "1" -> BigDecimal(0.5),
+   *                  "2" -> BigDecimal(0.5)),
+   *     "fpga" -> Map("a" -> BigDecimal(0.5),
+   *                   "b" -> BigDecimal(0.5))),
+   * the resourceAmount will be Map("gpu" -> 3, "fpga" -> 2)
    */
-  lazy val resourceAddressAmount: Map[String, Int] = internalResources.map {
+  private[spark] lazy val resourceAddressAmount: Map[String, Int] = availableResources.map {
     case (rName, addressMap) => rName -> addressMap.size
   }
 
-  /**
-   * For testing purpose. convert internal resources back to the "fraction" resources.
-   */
-  private[spark] def availableResources: Map[String, Map[String, Double]] = {
-    internalResources.map { case (rName, addressMap) =>
-      rName -> addressMap.map { case (address, amount) =>
-        address -> ResourceAmountUtils.toFractionalResource(amount)
-      }.toMap
-    }
-  }
-
   /**
    * Acquire the resource.
    * @param assignedResource the assigned resource information
   */
-  def acquire(assignedResource: Map[String, Map[String, Long]]): Unit = {
+  def acquire(assignedResource: Map[String, Map[String, BigDecimal]]): Unit = {
     assignedResource.foreach { case (rName, taskResAmounts) =>
-      val availableResourceAmounts = internalResources.getOrElse(rName,
+      val availableResourceAmounts = availableResources.getOrElse(rName,
         throw new SparkException(s"Try to acquire an address from $rName that doesn't exist"))
       taskResAmounts.foreach { case (address, amount) =>
-        val prevInternalTotalAmount = availableResourceAmounts.getOrElse(address,
+        val prev = availableResourceAmounts.getOrElse(address,
          throw new SparkException(s"Try to acquire an address that doesn't exist. $rName " +
            s"address $address doesn't exist."))
 
-        val left = prevInternalTotalAmount - amount
-        if (left < 0) {
+        val left = prev - amount
+        if (left < ZERO_RESOURCE) {
          throw new SparkException(s"The total amount " +
-            s"${ResourceAmountUtils.toFractionalResource(left)} " +
-            s"after acquiring $rName address $address should be >= 0")
+            s"${left.toDouble} after acquiring $rName address $address should be >= 0")
        }
-        internalResources(rName)(address) = left
+        availableResources(rName)(address) = left
      }
    }
  }
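With the `toInternalResource`/`toFractionalResource` helpers gone, callers hand plain `BigDecimal` fractions straight to `ExecutorResourcesAmounts`. A usage sketch against the constructor and `acquire` exactly as this diff defines them; the demo object is hypothetical, and since the members are `private[spark]` it assumes it compiles inside `org.apache.spark.scheduler` with this PR's spark-core on the classpath:

```scala
package org.apache.spark.scheduler

// Sketch only: exercises the API as shown in the diff above.
object ExecutorResourcesAmountsDemo extends App {
  val executorAmounts = new ExecutorResourcesAmounts(Map(
    "gpu" -> Map("0" -> BigDecimal(0.2), "1" -> BigDecimal(1.0))))

  // Hand one task 0.2 of gpu "0" and 0.5 of gpu "1".
  executorAmounts.acquire(Map(
    "gpu" -> Map("0" -> BigDecimal(0.2), "1" -> BigDecimal(0.5))))

  println(executorAmounts.availableResources("gpu")("1")) // 0.5
  println(executorAmounts.resourceAddressAmount)          // Map(gpu -> 2)

  // A further acquire on the drained address "0" now throws SparkException:
  // "The total amount ... after acquiring gpu address 0 should be >= 0"
}
```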
@@ -102,21 +91,20 @@ private[spark] class ExecutorResourcesAmounts(
    * Release the assigned resources to the resource pool
    * @param assignedResource resource to be released
    */
-  def release(assignedResource: Map[String, Map[String, Long]]): Unit = {
+  def release(assignedResource: Map[String, Map[String, BigDecimal]]): Unit = {
     assignedResource.foreach { case (rName, taskResAmounts) =>
-      val availableResourceAmounts = internalResources.getOrElse(rName,
+      val availableResourceAmounts = availableResources.getOrElse(rName,
         throw new SparkException(s"Try to release an address from $rName that doesn't exist"))
       taskResAmounts.foreach { case (address, amount) =>
-        val prevInternalTotalAmount = availableResourceAmounts.getOrElse(address,
+        val prev = availableResourceAmounts.getOrElse(address,
          throw new SparkException(s"Try to release an address that is not assigned. $rName " +
            s"address $address is not assigned."))
-        val total = prevInternalTotalAmount + amount
+        val total = prev + amount
         if (total > ONE_ENTIRE_RESOURCE) {
           throw new SparkException(s"The total amount " +
-            s"${ResourceAmountUtils.toFractionalResource(total)} " +
-            s"after releasing $rName address $address should be <= 1.0")
+            s"${total.toDouble} after releasing $rName address $address should be <= 1.0")
         }
-        internalResources(rName)(address) = total
+        availableResources(rName)(address) = total
       }
     }
   }
@@ -141,50 +129,50 @@ private[spark] class ExecutorResourcesAmounts(
    * of the task requests for resources aren't met.
    */
   def assignAddressesCustomResources(taskSetProf: ResourceProfile):
-      Option[Map[String, Map[String, Long]]] = {
+      Option[Map[String, Map[String, BigDecimal]]] = {
     // only look at the resource other than cpus
     val tsResources = taskSetProf.getCustomTaskResources()
     if (tsResources.isEmpty) {
       return Some(Map.empty)
     }
 
-    val allocatedAddresses = HashMap[String, Map[String, Long]]()
+    val allocatedAddresses = mutable.HashMap[String, Map[String, BigDecimal]]()
 
     // Go through all resources here so that we can make sure they match and also get what the
     // assignments are for the next task
     for ((rName, taskReqs) <- tsResources) {
-      // TaskResourceRequest checks the task amount should be in (0, 1] or a whole number
-      var taskAmount = taskReqs.amount
+      // taskReqs.amount must be in (0, 1] or a whole number
+      var taskAmount = BigDecimal(taskReqs.amount)
+        .setScale(ResourceAmountUtils.SCALE, RoundingMode.DOWN)
 
-      internalResources.get(rName) match {
+      availableResources.get(rName) match {
         case Some(addressesAmountMap) =>
-          val allocatedAddressesMap = HashMap[String, Long]()
+          val allocatedAddressesMap = mutable.HashMap[String, BigDecimal]()
 
           // Always sort the addresses
           val addresses = addressesAmountMap.keys.toSeq.sorted
 
           // task.amount is a whole number
-          if (taskAmount >= 1.0) {
-            for (address <- addresses if taskAmount > 0) {
+          if (taskAmount >= ONE_ENTIRE_RESOURCE) {
+            for (address <- addresses if taskAmount > ZERO_RESOURCE) {
               // The address is still a whole resource
-              if (ResourceAmountUtils.isOneEntireResource(addressesAmountMap(address))) {
-                taskAmount -= 1.0
+              if (addressesAmountMap(address) == ONE_ENTIRE_RESOURCE) {
+                taskAmount -= ONE_ENTIRE_RESOURCE
                 // Assign the full resource of the address
                 allocatedAddressesMap(address) = ONE_ENTIRE_RESOURCE
               }
             }
-          } else if (taskAmount > 0.0) { // 0 < task.amount < 1.0
-            val internalTaskAmount = ResourceAmountUtils.toInternalResource(taskAmount)
-            for (address <- addresses if taskAmount > 0) {
-              if (addressesAmountMap(address) >= internalTaskAmount) {
+          } else if (taskAmount > ZERO_RESOURCE) { // 0 < task.amount < 1.0
+            for (address <- addresses if taskAmount > ZERO_RESOURCE) {
+              if (addressesAmountMap(address) >= taskAmount) {
                 // Assign the part of the address.
-                allocatedAddressesMap(address) = internalTaskAmount
-                taskAmount = 0
+                allocatedAddressesMap(address) = taskAmount
+                taskAmount = ZERO_RESOURCE
               }
             }
           }
 
-          if (taskAmount == 0 && allocatedAddressesMap.size > 0) {
+          if (taskAmount == ZERO_RESOURCE && allocatedAddressesMap.nonEmpty) {
             allocatedAddresses.put(rName, allocatedAddressesMap.toMap)
           } else {
             return None
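The key move in this method is truncating the task's fractional request once, up front, with `setScale(SCALE, RoundingMode.DOWN)`; every later comparison against an address is then exact, first-fit over the sorted addresses. A self-contained mirror of the fractional branch (all names illustrative, not the Spark API):

```scala
import scala.collection.mutable
import scala.math.BigDecimal.RoundingMode

// Mirrors the 0 < task.amount < 1.0 branch of assignAddressesCustomResources.
object AssignDemo extends App {
  val SCALE = 14
  val ZERO = BigDecimal(0).setScale(SCALE)

  // Available fraction per address of one resource, e.g. "gpu".
  val addresses = mutable.HashMap(
    "0" -> BigDecimal(0.05),
    "1" -> BigDecimal(0.7))

  // A task asking for 1/3 of one address: truncate once, compare exactly after.
  var taskAmount = BigDecimal(1.0 / 3).setScale(SCALE, RoundingMode.DOWN)

  val allocated = mutable.HashMap[String, BigDecimal]()
  for (address <- addresses.keys.toSeq.sorted if taskAmount > ZERO) {
    if (addresses(address) >= taskAmount) {
      allocated(address) = taskAmount // take the whole fraction from this address
      taskAmount = ZERO               // request fully satisfied
    }
  }

  // Address "0" (0.05 free) is skipped; "1" (0.7 free) hosts the fraction.
  println(allocated) // HashMap(1 -> 0.33333333333333)
}
```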

core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -57,9 +57,9 @@ private[spark] class TaskDescription(
     val properties: Properties,
     val cpus: Int,
     // resources is the total resources assigned to the task
-    // Eg, Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.7))):
+    // Eg, Map("gpu" -> Map("0" -> BigDecimal(0.7))):
     // assign 0.7 of the gpu address "0" to this task
-    val resources: immutable.Map[String, immutable.Map[String, Long]],
+    val resources: immutable.Map[String, immutable.Map[String, BigDecimal]],
     val serializedTask: ByteBuffer) {
 
   assert(cpus > 0, "CPUs per task should be > 0")
@@ -76,15 +76,15 @@ private[spark] object TaskDescription {
     }
   }
 
-  private def serializeResources(map: immutable.Map[String, immutable.Map[String, Long]],
+  private def serializeResources(map: immutable.Map[String, immutable.Map[String, BigDecimal]],
       dataOut: DataOutputStream): Unit = {
     dataOut.writeInt(map.size)
     map.foreach { case (rName, addressAmountMap) =>
       dataOut.writeUTF(rName)
       dataOut.writeInt(addressAmountMap.size)
       addressAmountMap.foreach { case (address, amount) =>
         dataOut.writeUTF(address)
-        dataOut.writeLong(amount)
+        dataOut.writeUTF(amount.toString())
       }
     }
   }
@@ -176,19 +176,19 @@ private[spark] object TaskDescription {
   }
 
   private def deserializeResources(dataIn: DataInputStream):
-      immutable.Map[String, immutable.Map[String, Long]] = {
-    val map = new HashMap[String, immutable.Map[String, Long]]()
+      immutable.Map[String, immutable.Map[String, BigDecimal]] = {
+    val map = new HashMap[String, immutable.Map[String, BigDecimal]]()
     val mapSize = dataIn.readInt()
     var i = 0
     while (i < mapSize) {
       val resType = dataIn.readUTF()
-      val addressAmountMap = new HashMap[String, Long]()
+      val addressAmountMap = new HashMap[String, BigDecimal]()
       val addressAmountSize = dataIn.readInt()
       var j = 0
       while (j < addressAmountSize) {
         val address = dataIn.readUTF()
-        val amount = dataIn.readLong()
-        addressAmountMap(address) = amount
+        val amount = dataIn.readUTF()
+        addressAmountMap(address) = BigDecimal(amount)
         j += 1
       }
       map.put(resType, addressAmountMap.toMap)
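On the wire, each amount now travels as its decimal string instead of a long, so `BigDecimal(dataIn.readUTF())` rebuilds exactly the value that `dataOut.writeUTF(amount.toString())` wrote, with no scale or precision loss. A standalone round-trip over the same stream layout (hypothetical demo object, plain `java.io`):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Round-trip mirroring the patched serializeResources/deserializeResources.
object ResourceWireDemo extends App {
  val resources = Map("gpu" -> Map("0" -> BigDecimal("0.33333333333333")))

  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  out.writeInt(resources.size)
  resources.foreach { case (rName, addressAmountMap) =>
    out.writeUTF(rName)
    out.writeInt(addressAmountMap.size)
    addressAmountMap.foreach { case (address, amount) =>
      out.writeUTF(address)
      out.writeUTF(amount.toString()) // exact decimal text, not a lossy long/double
    }
  }
  out.flush()

  val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
  val numResources = in.readInt()       // 1
  val rName = in.readUTF()              // "gpu"
  val numAddresses = in.readInt()       // 1
  val address = in.readUTF()            // "0"
  val amount = BigDecimal(in.readUTF()) // exact: 0.33333333333333

  println(amount == BigDecimal("0.33333333333333")) // true
}
```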

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -455,7 +455,8 @@ private[spark] class TaskSchedulerImpl(
   private def resourcesMeetTaskRequirements(
       taskSet: TaskSetManager,
       availCpus: Int,
-      availWorkerResources: ExecutorResourcesAmounts): Option[Map[String, Map[String, Long]]] = {
+      availWorkerResources: ExecutorResourcesAmounts
+  ): Option[Map[String, Map[String, BigDecimal]]] = {
     val rpId = taskSet.taskSet.resourceProfileId
     val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
     val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, conf)