Commits (25)
ee71e46  Use fraction to do the resource calculation (wbo4958, Oct 15, 2023)
2d68843  comments (wbo4958, Oct 27, 2023)
fbd647d  comments (wbo4958, Oct 30, 2023)
58899b2  comments (wbo4958, Nov 2, 2023)
cd1c0ef  keep Long internal (wbo4958, Nov 2, 2023)
51ac5cb  remove duplicated resources (wbo4958, Nov 6, 2023)
d02a3be  fix CI (wbo4958, Nov 6, 2023)
07c42b3  Merge remote-tracking branch 'upstream/master' into SPARK-45527 (wbo4958, Nov 13, 2023)
2127a7b  validate the task.amount in the ResourceProfile and TaskResourceProfile (wbo4958, Nov 13, 2023)
61bcb34  add tests (wbo4958, Nov 13, 2023)
eb7f918  add warnOnWastedResources for TaskResourceProfile (wbo4958, Nov 14, 2023)
18084e6  change the rp corresponding tests (wbo4958, Nov 14, 2023)
e9e7a26  Comments (wbo4958, Nov 16, 2023)
877cacf  Merge remote-tracking branch 'upstream/master' into SPARK-45527 (wbo4958, Nov 16, 2023)
8657837  comments (wbo4958, Nov 20, 2023)
6ca3567  comments (wbo4958, Nov 22, 2023)
1e590bc  Merge remote-tracking branch 'upstream/master' into SPARK-45527 (wbo4958, Nov 22, 2023)
0be3176  Merge remote-tracking branch 'upstream/master' into SPARK-45527 (wbo4958, Nov 27, 2023)
3b08d1e  toIndexedSeq (wbo4958, Nov 27, 2023)
c772b33  Merge remote-tracking branch 'upstream/master' into SPARK-45527 (wbo4958, Dec 4, 2023)
c2da6e3  comments (wbo4958, Dec 4, 2023)
196e11b  comment (wbo4958, Dec 4, 2023)
7c11b6c  Merge remote-tracking branch 'upstream/master' into SPARK-45527 (wbo4958, Dec 4, 2023)
347196f  resolve comments (wbo4958, Dec 20, 2023)
ab4c48e  Merge remote-tracking branch 'upstream/master' into SPARK-45527 (wbo4958, Dec 20, 2023)
@@ -28,12 +28,21 @@ private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String]
 
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
-  override protected def slotsPerAddress: Int = 1
 
+  /**
+   * Acquire the resources.
+   * @param amount How many addresses are requested.
+   * @return ResourceInformation
+   */
   def acquire(amount: Int): ResourceInformation = {
-    val allocated = availableAddrs.take(amount)
-    acquire(allocated)
-    new ResourceInformation(resourceName, allocated.toArray)
+    // Any available address from availableAddrs must be a whole resource,
+    // since the worker only assigns full resources to executors.
+    val addresses = availableAddrs.take(amount)
+    assert(addresses.length == amount)
+
+    acquire(addresses.map(addr => addr -> 1.0).toMap)
+    new ResourceInformation(resourceName, addresses.toArray)
   }
 }

@@ -162,7 +171,7 @@ private[spark] class WorkerInfo(
    */
  def recoverResources(expected: Map[String, ResourceInformation]): Unit = {
    expected.foreach { case (rName, rInfo) =>
-      resources(rName).acquire(rInfo.addresses)
+      resources(rName).acquire(rInfo.addresses.map(addr => addr -> 1.0).toMap)
    }
  }

@@ -172,7 +181,7 @@
    */
  private def releaseResources(allocated: Map[String, ResourceInformation]): Unit = {
    allocated.foreach { case (rName, rInfo) =>
-      resources(rName).release(rInfo.addresses)
+      resources(rName).release(rInfo.addresses.map(addrs => addrs -> 1.0).toMap)
    }
  }
}
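Both call sites above hand the allocator a map of address -> 1.0, because at the worker level every address is always one whole resource. A minimal standalone sketch of that shape (the addresses are invented):

val addresses = Seq("0", "1", "2")          // e.g. rInfo.addresses, hypothetical values
val wholeAmounts: Map[String, Double] =
  addresses.map(addr => addr -> 1.0).toMap  // Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0)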
@@ -273,7 +273,8 @@ private[spark] class CoarseGrainedExecutorBackend(
  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
    val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation])
Contributor: I think resources here is essentially unused now, which means taskResources is likely not used.

Contributor Author: Yeah, you're right. I'd like to keep the change in this PR minimal; could we clean them up in a follow-up?

Contributor: No.

Contributor Author: Got it. I will fix them in this PR.

Contributor Author: Done.

    val cpus = executor.runningTasks.get(taskId).taskDescription.cpus
-    val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
+    val resourcesAmounts = executor.runningTasks.get(taskId).taskDescription.resourcesAmounts
+    val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources, resourcesAmounts)
    if (TaskState.isFinished(state)) {
      taskResources.remove(taskId)
    }
112 changes: 74 additions & 38 deletions core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -20,6 +20,42 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.RESOURCE_TOTAL_AMOUNT
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of precision loss,
+   * e.g.:
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9) {
+   *      |   if (total >= taskAmount) {
+   *      |     total -= taskAmount
+   *      |     println(s"assign $taskAmount for task $i, total left: $total")
+   *      |   } else {
+   *      |     println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
+   *      |   }
+   *      | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
+   *
+   * So we multiply by RESOURCE_TOTAL_AMOUNT to convert the double into a long, avoiding
+   * the precision loss. A Double carries about 16 significant decimal digits, so we set
+   * the factor to 10,000,000,000,000,000L (10^16).
+   */
+  final val RESOURCE_TOTAL_AMOUNT: Long = 10000000000000000L
Contributor: I think we should rename this ONE_ENTIRE_RESOURCE or something that indicates this is the entire amount of a single resource.

Contributor Author: Really good suggestion. Done.

+}
 
 /**
  * Trait used to help executor/worker allocate resources.
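To see why the Long scaling fixes the 1.0 / 9 example from the comment above, here is a small self-contained sketch (our own demo code with invented names, not part of the PR):

object FixedPointDemo extends App {
  // One whole resource in internal units, mirroring RESOURCE_TOTAL_AMOUNT (10^16).
  val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

  def toInternal(amount: Double): Long = (amount * ONE_ENTIRE_RESOURCE).toLong

  val taskAmount = toInternal(1.0 / 9)   // 1111111111111111 internal units
  var total = ONE_ENTIRE_RESOURCE
  for (i <- 1 to 9) {
    assert(total >= taskAmount, s"can't assign task $i")   // never fires
    total -= taskAmount
  }
  // All nine tasks fit. Only 1 internal unit (1e-16 of the resource) remains as
  // rounding dust, instead of the ~0.111 residue left by the Double version.
  println(s"internal units left: $total")
}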
@@ -29,59 +65,54 @@ private[spark] trait ResourceAllocator {

  protected def resourceName: String
  protected def resourceAddresses: Seq[String]
-  protected def slotsPerAddress: Int
 
  /**
-   * Map from an address to its availability, a value > 0 means the address is available,
-   * while value of 0 means the address is fully assigned.
-   *
-   * For task resources ([[org.apache.spark.scheduler.ExecutorResourceInfo]]), this value
-   * can be a multiple, such that each address can be allocated up to [[slotsPerAddress]]
-   * times.
+   * Map from an address to its availability, defaulting to RESOURCE_TOTAL_AMOUNT. A value > 0
+   * means the address is (at least partially) available, while a value of 0 means the address
+   * is fully assigned.
    */
  private lazy val addressAvailabilityMap = {
-    mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*)
+    mutable.HashMap(resourceAddresses.map(address => address -> RESOURCE_TOTAL_AMOUNT): _*)
  }
 
  /**
-   * Sequence of currently available resource addresses.
-   *
-   * With [[slotsPerAddress]] greater than 1, [[availableAddrs]] can contain duplicate addresses
-   * e.g. with [[slotsPerAddress]] == 2, availableAddrs for addresses 0 and 1 can look like
-   * Seq("0", "0", "1"), where address 0 has two assignments available, and 1 has one.
+   * Get the resources and their amounts.
+   * @return the resource amounts
    */
+  def resourcesAmounts: Map[String, Double] = addressAvailabilityMap.map {
Contributor: Leave these in the Long form. I think the only place this is used is in ExecutorResourcesAmount, which could store them the same way. This is a global comment: if we can store the amounts in Long format, pass that everywhere, and skip converting, I'd rather do that. Only convert back to double to display to the user and possibly in logs.

I guess we do need to be careful to make sure that these are 1.0 or less, though. If we start getting into requests where a user could ask for 250000 resources, then we could hit overflow issues, so if we are passing those requests around we might need to keep them in double format. Hopefully those are limited to the requests in the resource profiles, and we pass around the GPU index -> amount, which should be 1.0 or less.

Contributor Author: Yes, leaving them in Long form should be more efficient.

Contributor Author: Hi Tom, regarding "if we start getting into the requests where user could ask for 250000 resources then we could hit overflow issues": I couldn't understand why that would hit an overflow issue?

Contributor: I'm just saying that if we are storing things in Long format (10000000000000000L * number of resources requested), be sure that isn't going to overflow. Since it's a generic comment that applies all over, just make sure it isn't going to happen. If the values we store are always < (10000000000000000L * 1) it's not a problem.

Overflow means you end up with a value larger than a signed Long can hold (2^63 - 1), which then isn't positive anymore:

scala> 10000000000000000L * 2500L
res7: Long = 6553255926290448384

scala> 10000000000000000L * 25000L
res8: Long = -8254417031933722624

Contributor Author: Thanks for the explanation. Yeah, the values we store are always < (10000000000000000L * 1), so overflow is not going to happen.

+    case (address, internalAmount) =>
+      address -> (internalAmount.toDouble / RESOURCE_TOTAL_AMOUNT)
+  }.toMap
+
+  /**
+   * Sequence of currently available resource addresses which are not fully assigned.
+   */
  def availableAddrs: Seq[String] = addressAvailabilityMap
-    .flatMap { case (addr, available) =>
-      (0 until available).map(_ => addr)
-    }.toSeq.sorted
+    .filter(addresses => addresses._2 > 0).keys.toSeq.sorted

  /**
   * Sequence of currently assigned resource addresses.
-   *
-   * With [[slotsPerAddress]] greater than 1, [[assignedAddrs]] can contain duplicate addresses
-   * e.g. with [[slotsPerAddress]] == 2, assignedAddrs for addresses 0 and 1 can look like
-   * Seq("0", "1", "1"), where address 0 was assigned once, and 1 was assigned twice.
   */
  private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap
-    .flatMap { case (addr, available) =>
-      (0 until slotsPerAddress - available).map(_ => addr)
-    }.toSeq.sorted
+    .filter(addresses => addresses._2 < RESOURCE_TOTAL_AMOUNT).keys.toSeq.sorted

  /**
   * Acquire a sequence of resource addresses (for a launched task); these addresses must be
   * available. When the task finishes, it will return the acquired resource addresses.
   * Throws an Exception if an address is not available or doesn't exist.
   */
-  def acquire(addrs: Seq[String]): Unit = {
-    addrs.foreach { address =>
-      val isAvailable = addressAvailabilityMap.getOrElse(address,
+  def acquire(addressesAmounts: Map[String, Double]): Unit = {
+    addressesAmounts.foreach { case (address, amount) =>
+      val prevAmount = addressAvailabilityMap.getOrElse(address,
        throw new SparkException(s"Try to acquire an address that doesn't exist. $resourceName " +
          s"address $address doesn't exist."))
-      if (isAvailable > 0) {
-        addressAvailabilityMap(address) -= 1
+
+      val internalLeft = addressAvailabilityMap(address) - (amount * RESOURCE_TOTAL_AMOUNT).toLong
+
+      if (internalLeft < 0) {
+        throw new SparkException(s"Try to acquire $resourceName address $address " +
+          s"amount: $amount, but only ${prevAmount.toDouble / RESOURCE_TOTAL_AMOUNT} left.")
      } else {
-        throw new SparkException("Try to acquire an address that is not available. " +
-          s"$resourceName address $address is not available.")
+        addressAvailabilityMap(address) = internalLeft
      }
    }
  }
@@ -91,16 +122,21 @@
   * addresses are released when a task has finished.
   * Throws an Exception if an address is not assigned or doesn't exist.
   */
-  def release(addrs: Seq[String]): Unit = {
-    addrs.foreach { address =>
-      val isAvailable = addressAvailabilityMap.getOrElse(address,
+  def release(addressesAmounts: Map[String, Double]): Unit = {
+    addressesAmounts.foreach { case (address, amount) =>
+      val prevAmount = addressAvailabilityMap.getOrElse(address,
        throw new SparkException(s"Try to release an address that doesn't exist. $resourceName " +
          s"address $address doesn't exist."))
-      if (isAvailable < slotsPerAddress) {
-        addressAvailabilityMap(address) += 1
+
+      val internalTotal = prevAmount + (amount * RESOURCE_TOTAL_AMOUNT).toLong
+
+      if (internalTotal > RESOURCE_TOTAL_AMOUNT) {
+        throw new SparkException(s"Try to release $resourceName address $address " +
+          s"amount: $amount. But the total amount " +
+          s"(${internalTotal.toDouble / RESOURCE_TOTAL_AMOUNT}) " +
+          s"after release should be <= 1.")
      } else {
-        throw new SparkException(s"Try to release an address that is not assigned. $resourceName " +
-          s"address $address is not assigned.")
+        addressAvailabilityMap(address) = internalTotal
      }
    }
  }
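To experiment with this bookkeeping outside of Spark (the trait is private[spark]), the core arithmetic can be reproduced in a few lines; the resource name, addresses, and amounts below are all made up:

import scala.collection.mutable

object AllocatorSketch extends App {
  val ONE: Long = 10000000000000000L                   // one whole "gpu"
  val avail = mutable.HashMap("0" -> ONE, "1" -> ONE)  // two GPU addresses

  def acquire(addr: String, amount: Double): Unit = {
    val left = avail(addr) - (amount * ONE).toLong
    require(left >= 0, s"only ${avail(addr).toDouble / ONE} of gpu $addr left")
    avail(addr) = left
  }

  def release(addr: String, amount: Double): Unit = {
    val total = avail(addr) + (amount * ONE).toLong
    require(total <= ONE, s"releasing $amount would exceed one whole gpu $addr")
    avail(addr) = total
  }

  acquire("0", 0.5)                   // two half-GPU tasks share address "0"
  acquire("0", 0.5)
  release("0", 0.5)                   // one of them finishes
  println(avail("0").toDouble / ONE)  // 0.5
}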
@@ -170,16 +170,16 @@ private[spark] object ResourceUtils extends Logging {
  // integer amount and the number of slots per address. For instance, if the amount is 0.5,
  // then we get (1, 2) back out. This indicates that for each 1 address, it has 2 slots per
  // address, which allows you to put 2 tasks on that address. Note if amount is greater
-  // than 1, then the number of slots per address has to be 1. This would indicate that a
+  // than 1, then the number of parts per address has to be 1. This would indicate that a
  // task would have multiple addresses assigned to it. This can be used for calculating
  // the number of tasks per executor -> (executorAmount * numParts) / (integer amount).
  // Returns tuple of (integer amount, numParts)
  def calculateAmountAndPartsForFraction(doubleAmount: Double): (Int, Int) = {
-    val parts = if (doubleAmount <= 0.5) {
+    val parts = if (doubleAmount <= 1.0) {
      Math.floor(1.0 / doubleAmount).toInt
    } else if (doubleAmount % 1 != 0) {
      throw new SparkException(
-        s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole number.")
+        s"The resource amount ${doubleAmount} must be either <= 1.0, or a whole number.")
    } else {
      1
    }
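As a sanity check of the rule described in the comment, here is a standalone mirror of the method (a sketch only; we assume the method's cut-off tail returns the ceiling of the request as the integer amount, which the diff above does not show):

def amountAndParts(doubleAmount: Double): (Int, Int) = {
  val parts = if (doubleAmount <= 1.0) {
    math.floor(1.0 / doubleAmount).toInt
  } else if (doubleAmount % 1 != 0) {
    throw new IllegalArgumentException(
      s"The resource amount $doubleAmount must be either <= 1.0, or a whole number.")
  } else {
    1
  }
  (math.ceil(doubleAmount).toInt, parts)   // assumed return shape
}

amountAndParts(0.5)   // (1, 2): 2 tasks per address
amountAndParts(1.0)   // (1, 1): 1 task per address, now legal under the relaxed bound
amountAndParts(2.0)   // (2, 1): each task needs 2 whole addresses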
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{Evolving, Since}
  *
  * @param resourceName Resource name
  * @param amount Amount requesting as a Double to support fractional resource requests.
- *               Valid values are less than or equal to 0.5 or whole numbers. This essentially
+ *               Valid values are less than or equal to 1.0 or whole numbers. This essentially
  *               lets you configure X number of tasks to run on a single resource,
  *               i.e., amount equals 0.5 translates into 2 tasks per resource address.
  */
@@ -37,8 +37,8 @@ import org.apache.spark.annotation.{Evolving, Since}
 class TaskResourceRequest(val resourceName: String, val amount: Double)
   extends Serializable {
 
-  assert(amount <= 0.5 || amount % 1 == 0,
-    s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
+  assert(amount <= 1.0 || amount % 1 == 0,
+    s"The resource amount ${amount} must be either <= 1.0, or a whole number.")

override def equals(obj: Any): Boolean = {
obj match {
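The relaxed assertion accepts any fraction up to one whole resource, plus whole numbers. Spelled out with a hypothetical helper that mirrors the check:

def isValidTaskAmount(amount: Double): Boolean = amount <= 1.0 || amount % 1 == 0

isValidTaskAmount(0.7)   // true:  allowed now, was rejected under the old <= 0.5 rule
isValidTaskAmount(1.0)   // true
isValidTaskAmount(2.0)   // true:  whole numbers are still fine
isValidTaskAmount(1.5)   // false: fractions above 1.0 remain invalid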
@@ -25,16 +25,28 @@ import org.apache.spark.resource.{ResourceAllocator, ResourceInformation}
  * information.
  * @param name Resource name
  * @param addresses Resource addresses provided by the executor
- * @param numParts Number of ways each resource is subdivided when scheduling tasks
  */
 private[spark] class ExecutorResourceInfo(
     name: String,
-    addresses: Seq[String],
-    numParts: Int)
+    addresses: Seq[String])
   extends ResourceInformation(name, addresses.toArray) with ResourceAllocator {
 
   override protected def resourceName = this.name
 
   override protected def resourceAddresses = this.addresses
-  override protected def slotsPerAddress: Int = numParts
-  def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
+
+  /**
+   * Calculate how many parts the executor can offer according to the task resource amount.
+   * @param taskAmount how much of the resource the task requires
+   * @return the total parts
+   */
+  def totalParts(taskAmount: Double): Int = {
+    assert(taskAmount > 0.0)
+    if (taskAmount >= 1.0) {
+      addresses.length / taskAmount.ceil.toInt
+    } else {
+      addresses.length * Math.floor(1.0 / taskAmount).toInt
+    }
+  }
 
 }
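A standalone copy of the totalParts arithmetic, exercised with an assumed 4-GPU executor to show how fractional and multi-address requests turn into task slots:

def totalParts(numAddresses: Int, taskAmount: Double): Int = {
  require(taskAmount > 0.0)
  if (taskAmount >= 1.0) numAddresses / taskAmount.ceil.toInt
  else numAddresses * math.floor(1.0 / taskAmount).toInt
}

totalParts(4, 0.25)   // 16: each of the 4 GPUs can host four 0.25-amount tasks
totalParts(4, 1.0)    //  4
totalParts(4, 3.0)    //  1: a single task needing 3 whole GPUs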