@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import scala.collection.mutable

import org.apache.spark.resource.{ResourceAllocator, ResourceInformation, ResourceRequirement}
import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

@@ -28,12 +29,20 @@ private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String

override protected def resourceName = this.name
override protected def resourceAddresses = this.addresses
override protected def slotsPerAddress: Int = 1

/**
* Acquire resources from this worker.
* @param amount the number of resource addresses to acquire
* @return a ResourceInformation containing the acquired addresses
*/
def acquire(amount: Int): ResourceInformation = {
val allocated = availableAddrs.take(amount)
acquire(allocated)
new ResourceInformation(resourceName, allocated.toArray)
// Any available address from availableAddrs must be a whole resource,
// since the worker only assigns whole resources to executors.
val addresses = availableAddrs.take(amount)
assert(addresses.length == amount)

acquire(addresses.map(addr => addr -> ONE_ENTIRE_RESOURCE).toMap)
new ResourceInformation(resourceName, addresses.toArray)
}
}
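
As an illustrative sketch (a hypothetical three-GPU worker, not test code from this change), the new behavior looks like:

// Hypothetical worker-side state with three whole GPU addresses.
val workerGpus = WorkerResourceInfo("gpu", Seq("0", "1", "2"))

// Acquiring 2 marks addresses "0" and "1" as fully assigned (ONE_ENTIRE_RESOURCE each)
// and returns them wrapped in a ResourceInformation.
val info = workerGpus.acquire(2)
assert(info.addresses.sameElements(Array("0", "1")))
assert(workerGpus.availableAddrs == Seq("2"))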

@@ -162,7 +171,7 @@ private[spark] class WorkerInfo(
*/
def recoverResources(expected: Map[String, ResourceInformation]): Unit = {
expected.foreach { case (rName, rInfo) =>
resources(rName).acquire(rInfo.addresses)
resources(rName).acquire(rInfo.addresses.map(addr => addr -> ONE_ENTIRE_RESOURCE).toMap)
}
}

@@ -172,7 +181,7 @@ private[spark] class WorkerInfo(
*/
private def releaseResources(allocated: Map[String, ResourceInformation]): Unit = {
allocated.foreach { case (rName, rInfo) =>
resources(rName).release(rInfo.addresses)
resources(rName).release(rInfo.addresses.map(addrs => addrs -> ONE_ENTIRE_RESOURCE).toMap)
}
}
}
@@ -20,7 +20,6 @@ package org.apache.spark.executor
import java.net.URL
import java.nio.ByteBuffer
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.{Failure, Success}
@@ -68,16 +67,6 @@ private[spark] class CoarseGrainedExecutorBackend(

private var _resources = Map.empty[String, ResourceInformation]

/**
* Map each taskId to the information about the resource allocated to it, Please refer to
* [[ResourceInformation]] for specifics.
* CHM is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227)
* Exposed for testing only.
*/
private[executor] val taskResources = new ConcurrentHashMap[
Long, Map[String, ResourceInformation]
]

private var decommissioned = false

// Track the last time in ns that at least one task is running. If no task is running and all
@@ -191,7 +180,6 @@ private[spark] class CoarseGrainedExecutorBackend(
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources.put(taskDesc.taskId, taskDesc.resources)
executor.launchTask(this, taskDesc)
}

@@ -268,11 +256,10 @@ }
}

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation])
val resources = executor.runningTasks.get(taskId).taskDescription.resources
val cpus = executor.runningTasks.get(taskId).taskDescription.cpus
val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
if (TaskState.isFinished(state)) {
taskResources.remove(taskId)
lastTaskFinishTime.set(System.nanoTime())
}
driver match {
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -610,13 +610,17 @@ private[spark] class Executor(
threadMXBean.getCurrentThreadCpuTime
} else 0L
var threwException = true
// Convert the resource amounts map into ResourceInformation
val resources = taskDescription.resources.map { case (rName, addressesAmounts) =>
rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray)
}
val value = Utils.tryWithSafeFinally {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem,
cpus = taskDescription.cpus,
resources = taskDescription.resources,
resources = resources,
plugins = plugins)
threwException = false
res
118 changes: 80 additions & 38 deletions core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -20,6 +20,49 @@ package org.apache.spark.resource
import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE

private[spark] object ResourceAmountUtils {
/**
* Using "double" to do the resource calculation may encounter a problem of precision loss, e.g.:
*
* scala> val taskAmount = 1.0 / 9
* taskAmount: Double = 0.1111111111111111
*
* scala> var total = 1.0
* total: Double = 1.0
*
* scala> for (i <- 1 to 9 ) {
* | if (total >= taskAmount) {
* | total -= taskAmount
* | println(s"assign $taskAmount for task $i, total left: $total")
* | } else {
* | println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
* | }
* | }
* assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
* assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
* assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
* assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
* assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
* assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
* assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
* assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
* ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
*
* So we multiply by ONE_ENTIRE_RESOURCE to convert the double to a long and avoid this
* limitation. A double carries up to 16 significant decimal digits, so we set the factor to
* 10,000,000,000,000,000L.
*/
final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

srowen (Member):
This is way too hacky.
I get it, but the right solution in your example is to ask for something like 0.111 GPUs to get 9. This works fine.

Contributor Author:
Hi @srowen,

Thank you for your review. You've suggested a straightforward solution to address the issue of double precision by actively sacrificing some accuracy, and it's certainly doable. However, this raises a question about the precision that Spark should define. For instance, if a user sets the value to 0.1111111111111, which value should Spark use for the calculation: 0.111, 0.1111, or something else? If we simply select 4 decimal places, what happens if a user wants to set the value to 0.000001? In that case, the value would be converted to 0 and might raise an exception. But it seems that a configuration option could resolve this problem.

My initial thought was that the resource fraction calculation should work regardless of the value set by the user, and the proposed approach in the pull request to convert the double to a Long can achieve this goal.

Nevertheless, if you and other committers believe that a change is necessary, I am open to submitting a pull request for the master branch.

srowen (Member):
I agree it also doesn't feel wholly satisfying. In this example all of those values work, as nine times even 0.11 leaves you with less than 0.11 remaining, so you schedule 9. It would also imply there is 0.01 GPU left when that isn't the intent. In practice, I strongly doubt anyone is ever scheduling, let's say, more than 100 tasks on one GPU.

(But what about non-GPU resources? There aren't any now. Are there resources you'd schedule very, very small fractions of? I can't think of any, even in the future.)

Going down the rabbit hole of floating-point precision, I think we hit that no matter what. If I ask for 1e-16 resources, any way we interpret that probably is slightly imprecise as it's interpreted as float somewhere. But these are unrealistic use cases.

Contributor:
@srowen what is your major concern here? Are you seeing performance issues? Is it just more complex than you think is necessary? I don't think it's hacky; it might be overkill, but I do agree it's likely more than we need for precision. I would go beyond 100 though, as 100 tasks per resource seems unlikely but not impossible; 1,000 to 10,000 is much more unlikely.

srowen (Member):
I don't think we can solve floating-point accuracy here in the general case, and this will virtually never arise anyway, except in one important class of cases -- n GPUs where n < 10 and n is relatively prime to 10. Like, 3 even. A person writing down the resource utilization will almost surely write "0.333", but one can imagine supplying str(1./3.) programmatically. And then this issue could arise. Some string like "0.333333333" may end up as a float that has a value just over 1/3.

The other issue is approaching it this way by just multiplying by a long. I guess instead this can be done properly with BigDecimal at least? And I don't think this should touch so many parts of the code. Surely this just affects how the resource request string is parsed and compared to available resources.

Contributor Author:
Hi @srowen,

Using a Long works in much the same way as BigDecimal. The default scale of BigDecimal is 16, so this PR chooses ONE_ENTIRE_RESOURCE = 1E16.toLong:

scala>     val ONE_ENTIRE_RESOURCE: Long = 1E16.toLong
     |     val taskAmount = 0.3333333333333334
     | 
     |     val usingLong = (taskAmount * ONE_ENTIRE_RESOURCE).toLong
     | 
     |     val bigDec = BigDecimal(taskAmount).toDouble
val ONE_ENTIRE_RESOURCE: Long = 10000000000000000
val taskAmount: Double = 0.3333333333333334
val usingLong: Long = 3333333333333334
val bigDec: Double = 0.3333333333333334

So if we need to ensure the input is small enough (< 1/n), we can set the scale to something like 14 for BigDecimal; similarly, to keep aligned with BigDecimal, we can set ONE_ENTIRE_RESOURCE = 1E14.toLong:

scala>     import scala.math.BigDecimal.RoundingMode
     | 
     |     val ONE_ENTIRE_RESOURCE: Long = 1E14.toLong
     |     val taskAmount = 0.3333333333333334
     | 
     |     val usingLong = (taskAmount * ONE_ENTIRE_RESOURCE).toLong
     | 
     |     val bigDec = BigDecimal(taskAmount).setScale(14, RoundingMode.DOWN).toDouble
import scala.math.BigDecimal.RoundingMode
val ONE_ENTIRE_RESOURCE: Long = 100000000000000
val taskAmount: Double = 0.3333333333333334
val usingLong: Long = 33333333333333
val bigDec: Double = 0.33333333333333

srowen (Member):
No. The point of BigDecimal is that you never have to use double or float. This isn't actually doing math with BigDecimal.
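
For illustration, a sketch of what doing the math purely in BigDecimal looks like: parse the request string directly into BigDecimal so Double rounding never enters (a hypothetical snippet, not code from this PR):

// Illustrative sketch only: keep the amount in BigDecimal end to end, parsing the
// user's request string directly, so no Double rounding ever enters the math.
val taskAmount = BigDecimal("0.1111111111111111")
var total = BigDecimal(1)
for (i <- 1 to 9) {
  if (total >= taskAmount) {
    total -= taskAmount
    println(s"assign $taskAmount for task $i, total left: $total")
  } else {
    println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
  }
}
// All nine assignments succeed; total ends at exactly 1E-16, with no drift.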

Contributor Author:
Hi @srowen. Actually, this PR converts the taskAmount, which is a double/float value, to a Long by multiplying by 1E16.toLong, and the following calculation is then based on Long instead of double/float. As you can see, all the APIs of ExecutorResourcesAmount use Long. Even the resources assigned to a task are kept as Long; you can refer to the code there.

Yeah, but if you think we should use BigDecimal, I'm okay with that. I can make a PR for the master branch first, and then cherry-pick it to the 3.5 branch.

srowen (Member) commented on Feb 8, 2024:
I know. Above I give an example where multiplying by long doesn't work. I'm referring to your example of BigDecimal above, which does not use (only) BigDecimal. Please just try it.

Contributor Author:
Hi @srowen. Sure, let me open a PR using BigDecimal for the master branch. Thx


def isOneEntireResource(amount: Long): Boolean = amount == ONE_ENTIRE_RESOURCE

def toInternalResource(amount: Double): Long = (amount * ONE_ENTIRE_RESOURCE).toLong

def toFractionalResource(amount: Long): Double = amount.toDouble / ONE_ENTIRE_RESOURCE

}
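
Assuming these helpers, the 1/9 scenario from the scaladoc above works out once amounts are carried as Long; a minimal sketch (not a test from this PR):

// Sketch: the same 1/9 scenario as above, carried out on internal Long amounts.
val taskAmount = ResourceAmountUtils.toInternalResource(1.0 / 9) // 1111111111111111L
var total = ResourceAmountUtils.ONE_ENTIRE_RESOURCE              // 10000000000000000L
for (i <- 1 to 9) {
  assert(total >= taskAmount, s"can't assign task $i")
  total -= taskAmount
}
// All nine tasks fit; only 1 internal unit (1e-16 of the resource) remains.
assert(ResourceAmountUtils.toFractionalResource(total) < 1e-15)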

/**
* Trait used to help executor/worker allocate resources.
@@ -29,59 +72,53 @@ private[spark] trait ResourceAllocator {

protected def resourceName: String
protected def resourceAddresses: Seq[String]
protected def slotsPerAddress: Int

/**
* Map from an address to its availability, a value > 0 means the address is available,
* while value of 0 means the address is fully assigned.
*
* For task resources ([[org.apache.spark.scheduler.ExecutorResourceInfo]]), this value
* can be a multiple, such that each address can be allocated up to [[slotsPerAddress]]
* times.
* Map from an address to its availability, which defaults to 1.0 (scaled by
* ONE_ENTIRE_RESOURCE to avoid precision errors). A value > 0 means the address is
* available, while a value of 0 means the address is fully assigned.
*/
private lazy val addressAvailabilityMap = {
mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*)
mutable.HashMap(resourceAddresses.map(address => address -> ONE_ENTIRE_RESOURCE): _*)
}

/**
* Sequence of currently available resource addresses.
*
* With [[slotsPerAddress]] greater than 1, [[availableAddrs]] can contain duplicate addresses
* e.g. with [[slotsPerAddress]] == 2, availableAddrs for addresses 0 and 1 can look like
* Seq("0", "0", "1"), where address 0 has two assignments available, and 1 has one.
* Get the amount left for each address; each amount is scaled by ONE_ENTIRE_RESOURCE.
* @return the per-address resource amounts
*/
def resourcesAmounts: Map[String, Long] = addressAvailabilityMap.toMap

/**
* Sequence of currently available resource addresses which are not fully assigned.
*/
def availableAddrs: Seq[String] = addressAvailabilityMap
.flatMap { case (addr, available) =>
(0 until available).map(_ => addr)
}.toSeq.sorted
.filter(addresses => addresses._2 > 0).keys.toSeq.sorted

/**
* Sequence of currently assigned resource addresses.
*
* With [[slotsPerAddress]] greater than 1, [[assignedAddrs]] can contain duplicate addresses
* e.g. with [[slotsPerAddress]] == 2, assignedAddrs for addresses 0 and 1 can look like
* Seq("0", "1", "1"), where address 0 was assigned once, and 1 was assigned twice.
*/
private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap
.flatMap { case (addr, available) =>
(0 until slotsPerAddress - available).map(_ => addr)
}.toSeq.sorted
.filter(addresses => addresses._2 < ONE_ENTIRE_RESOURCE).keys.toSeq.sorted

/**
* Acquire a sequence of resource addresses (to a launched task), these addresses must be
* available. When the task finishes, it will return the acquired resource addresses.
* Throw an Exception if an address is not available or doesn't exist.
*/
def acquire(addrs: Seq[String]): Unit = {
addrs.foreach { address =>
val isAvailable = addressAvailabilityMap.getOrElse(address,
def acquire(addressesAmounts: Map[String, Long]): Unit = {
addressesAmounts.foreach { case (address, amount) =>
val prevAmount = addressAvailabilityMap.getOrElse(address,
throw new SparkException(s"Try to acquire an address that doesn't exist. $resourceName " +
s"address $address doesn't exist."))
if (isAvailable > 0) {
addressAvailabilityMap(address) -= 1
s"address $address doesn't exist."))

val left = prevAmount - amount

if (left < 0) {
throw new SparkException(s"Try to acquire $resourceName address $address " +
s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}, but only " +
s"${ResourceAmountUtils.toFractionalResource(prevAmount)} left.")
} else {
throw new SparkException("Try to acquire an address that is not available. " +
s"$resourceName address $address is not available.")
addressAvailabilityMap(address) = left
}
}
}
@@ -91,16 +128,21 @@
* addresses are released when a task has finished.
* Throw an Exception if an address is not assigned or doesn't exist.
*/
def release(addrs: Seq[String]): Unit = {
addrs.foreach { address =>
val isAvailable = addressAvailabilityMap.getOrElse(address,
def release(addressesAmounts: Map[String, Long]): Unit = {
addressesAmounts.foreach { case (address, amount) =>
val prevAmount = addressAvailabilityMap.getOrElse(address,
throw new SparkException(s"Try to release an address that doesn't exist. $resourceName " +
s"address $address doesn't exist."))
if (isAvailable < slotsPerAddress) {
addressAvailabilityMap(address) += 1

val total = prevAmount + amount

if (total > ONE_ENTIRE_RESOURCE) {
throw new SparkException(s"Try to release $resourceName address $address " +
s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}. But the total amount: " +
s"${ResourceAmountUtils.toFractionalResource(total)} " +
s"after release should be <= 1")
} else {
throw new SparkException(s"Try to release an address that is not assigned. $resourceName " +
s"address $address is not assigned.")
addressAvailabilityMap(address) = total
}
}
}
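
A minimal usage sketch of the reworked Long-based API (the ToyAllocator subclass is hypothetical, shown only to exercise acquire/release with fractional amounts):

// Hypothetical subclass (would live under org.apache.spark.resource), for illustration only.
class ToyAllocator extends ResourceAllocator {
  override protected def resourceName = "gpu"
  override protected def resourceAddresses = Seq("0", "1")
}

val alloc = new ToyAllocator
// Two tasks can share address "0" at 0.5 each; amounts travel as scaled Longs.
alloc.acquire(Map("0" -> ResourceAmountUtils.toInternalResource(0.5)))
alloc.acquire(Map("0" -> ResourceAmountUtils.toInternalResource(0.5)))
assert(alloc.availableAddrs == Seq("1"))      // "0" is fully assigned now
alloc.release(Map("0" -> ResourceAmountUtils.toInternalResource(0.5)))
assert(alloc.availableAddrs == Seq("0", "1")) // half of "0" is free again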
@@ -49,6 +49,8 @@ class ResourceProfile(
val executorResources: Map[String, ExecutorResourceRequest],
val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging {

validate()

// _id is only a var for testing purposes
private var _id = ResourceProfile.getNextProfileId
// This is used for any resources that use fractional amounts, the key is the resource name
@@ -59,6 +61,19 @@
private var _maxTasksPerExecutor: Option[Int] = None
private var _coresLimitKnown: Boolean = false

/**
* Validate the ResourceProfile
*/
protected def validate(): Unit = {
// The task.amount in a ResourceProfile must fall within the range of 0 to 0.5,
// or be a whole number.
for ((_, taskReq) <- taskResources) {
val taskAmount = taskReq.amount
assert(taskAmount <= 0.5 || taskAmount % 1 == 0,
s"The task resource amount ${taskAmount} must be either <= 0.5, or a whole number.")
}
}
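
For example (illustrative calls; execReqs stands in for arbitrary executor resource requests), the check admits 0.25 and 4.0 but rejects 0.7:

// Illustrative only; execReqs is a placeholder for executor resource requests.
new ResourceProfile(execReqs, Map("gpu" -> new TaskResourceRequest("gpu", 0.25))) // passes
new ResourceProfile(execReqs, Map("gpu" -> new TaskResourceRequest("gpu", 4.0)))  // passes
new ResourceProfile(execReqs, Map("gpu" -> new TaskResourceRequest("gpu", 0.7)))  // fails: 0.5 < 0.7 < 1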

/**
* A unique id of this ResourceProfile
*/
@@ -105,12 +120,8 @@

/*
* This function takes into account fractional amounts for the task resource requirement.
* Spark only supports fractional amounts < 1 to basically allow for multiple tasks
* to use the same resource address.
* The way the scheduler handles this is it adds the same address the number of slots per
* address times and then the amount becomes 1. This way it re-uses the same address
* the correct number of times. ie task requirement amount=0.25 -> addrs["0", "0", "0", "0"]
* and scheduler task amount=1. See ResourceAllocator.slotsPerAddress.
* Spark only supports fractional amounts < 1, basically to allow multiple tasks
* to use the same resource address, or a whole number to use multiple whole addresses.
*/
private[spark] def getSchedulerTaskResourceAmount(resource: String): Int = {
val taskAmount = taskResources.getOrElse(resource,
@@ -280,6 +291,11 @@ private[spark] class TaskResourceProfile(
override val taskResources: Map[String, TaskResourceRequest])
extends ResourceProfile(Map.empty, taskResources) {

// The task.amount in a TaskResourceProfile falls within the range of 0 to 1.0
// or is a whole number, and that has already been checked in TaskResourceRequest.
// Therefore, we can safely skip this check.
override protected def validate(): Unit = {}

override protected[spark] def getCustomExecutorResources()
: Map[String, ExecutorResourceRequest] = {
if (SparkEnv.get == null) {
@@ -168,18 +168,17 @@ private[spark] object ResourceUtils extends Logging {

// Used to take a fraction amount from a task resource requirement and split into a real
// integer amount and the number of slots per address. For instance, if the amount is 0.5,
// the we get (1, 2) back out. This indicates that for each 1 address, it has 2 slots per
// address, which allows you to put 2 tasks on that address. Note if amount is greater
// than 1, then the number of slots per address has to be 1. This would indicate that a
// would have multiple addresses assigned per task. This can be used for calculating
// the number of tasks per executor -> (executorAmount * numParts) / (integer amount).
// then we get (1, 2) back out. This indicates that for each 1 address, it allows you to
// put 2 tasks on that address. Note if amount is greater than 1, then the number of
// running tasks per address has to be 1. This can be used for calculating
// the number of tasks per executor = (executorAmount * numParts) / (integer amount).
// Returns tuple of (integer amount, numParts)
def calculateAmountAndPartsForFraction(doubleAmount: Double): (Int, Int) = {
val parts = if (doubleAmount <= 0.5) {
val parts = if (doubleAmount <= 1.0) {
Math.floor(1.0 / doubleAmount).toInt
} else if (doubleAmount % 1 != 0) {
throw new SparkException(
s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole number.")
s"The resource amount ${doubleAmount} must be either <= 1.0, or a whole number.")
} else {
1
}
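
Illustrative results for the contract described in the comment above (assuming the returned tuple pairs the whole-address amount with the computed parts):

// Illustrative results, per the comment above:
calculateAmountAndPartsForFraction(0.5)  // (1, 2): one address shared by two tasks
calculateAmountAndPartsForFraction(0.25) // (1, 4): one address shared by four tasks
calculateAmountAndPartsForFraction(3.0)  // (3, 1): three whole addresses per task
calculateAmountAndPartsForFraction(1.5)  // throws SparkException: neither <= 1.0 nor whole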
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{Evolving, Since}
*
* @param resourceName Resource name
* @param amount Amount requesting as a Double to support fractional resource requests.
* Valid values are less than or equal to 0.5 or whole numbers. This essentially
* Valid values are less than or equal to 1.0 or whole numbers. This essentially
* lets you configure X number of tasks to run on a single resource,
* ie amount equals 0.5 translates into 2 tasks per resource address.
*/
@@ -37,8 +37,8 @@
class TaskResourceRequest(val resourceName: String, val amount: Double)
extends Serializable {

assert(amount <= 0.5 || amount % 1 == 0,
s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
assert(amount <= 1.0 || amount % 1 == 0,
s"The resource amount ${amount} must be either <= 1.0, or a whole number.")

override def equals(obj: Any): Boolean = {
obj match {