Closed
25 commits
ee71e46
Use fraction to do the resource calculation
wbo4958 Oct 15, 2023
2d68843
comments
wbo4958 Oct 27, 2023
fbd647d
comments
wbo4958 Oct 30, 2023
58899b2
comments
wbo4958 Nov 2, 2023
cd1c0ef
keep Long internal
wbo4958 Nov 2, 2023
51ac5cb
remove duplicated resources
wbo4958 Nov 6, 2023
d02a3be
fix CI
wbo4958 Nov 6, 2023
07c42b3
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Nov 13, 2023
2127a7b
validate the task.amount in the ResourceProfile and TaskResourceProfile
wbo4958 Nov 13, 2023
61bcb34
add tests
wbo4958 Nov 13, 2023
eb7f918
add warnOnWastedResources for TaskResourceProfile
wbo4958 Nov 14, 2023
18084e6
change the rp corresponding tests
wbo4958 Nov 14, 2023
e9e7a26
Comments
wbo4958 Nov 16, 2023
877cacf
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Nov 16, 2023
8657837
comments
wbo4958 Nov 20, 2023
6ca3567
comments
wbo4958 Nov 22, 2023
1e590bc
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Nov 22, 2023
0be3176
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Nov 27, 2023
3b08d1e
toIndexedSeq
wbo4958 Nov 27, 2023
c772b33
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Dec 4, 2023
c2da6e3
comments
wbo4958 Dec 4, 2023
196e11b
comment
wbo4958 Dec 4, 2023
7c11b6c
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Dec 4, 2023
347196f
resolve comments
wbo4958 Dec 20, 2023
ab4c48e
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Dec 20, 2023
core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import scala.collection.mutable

import org.apache.spark.resource.{ResourceAllocator, ResourceInformation, ResourceRequirement}
import org.apache.spark.resource.ResourceAmountUtils.RESOURCE_TOTAL_AMOUNT
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

@@ -28,12 +29,20 @@ private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String

override protected def resourceName = this.name
override protected def resourceAddresses = this.addresses
override protected def slotsPerAddress: Int = 1

/**
* Acquire the resources.
* @param amount How many addresses are requesting.
* @return ResourceInformation
*/
def acquire(amount: Int): ResourceInformation = {
val allocated = availableAddrs.take(amount)
acquire(allocated)
new ResourceInformation(resourceName, allocated.toArray)
// Any available address from availableAddrs must be a whole resource,
// since the worker always assigns whole resources to executors.
val addresses = availableAddrs.take(amount)
assert(addresses.length == amount)

acquire(addresses.map(addr => addr -> RESOURCE_TOTAL_AMOUNT).toMap)
new ResourceInformation(resourceName, addresses.toArray)
}
}
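A minimal sketch (editor's illustration, assumed values) of the map handed to the new acquire(addressesAmounts) overload when a worker grants two whole addresses:

```scala
// Every granted address is charged at the full scaled amount, matching the
// assertion above that only whole resources are handed out by the worker.
val RESOURCE_TOTAL_AMOUNT = 10000000000000000L
val availableAddrs = Seq("0", "1", "2", "3")
val granted = availableAddrs.take(2)
val acquired: Map[String, Long] = granted.map(addr => addr -> RESOURCE_TOTAL_AMOUNT).toMap
// acquired == Map("0" -> 10000000000000000L, "1" -> 10000000000000000L)
```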

@@ -162,7 +171,7 @@ private[spark] class WorkerInfo(
*/
def recoverResources(expected: Map[String, ResourceInformation]): Unit = {
expected.foreach { case (rName, rInfo) =>
resources(rName).acquire(rInfo.addresses)
resources(rName).acquire(rInfo.addresses.map(addr => addr -> RESOURCE_TOTAL_AMOUNT).toMap)
}
}

@@ -172,7 +181,7 @@ private[spark] class WorkerInfo(
*/
private def releaseResources(allocated: Map[String, ResourceInformation]): Unit = {
allocated.foreach { case (rName, rInfo) =>
resources(rName).release(rInfo.addresses)
resources(rName).release(rInfo.addresses.map(addrs => addrs -> RESOURCE_TOTAL_AMOUNT).toMap)
}
}
}
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -191,7 +191,10 @@ private[spark] class CoarseGrainedExecutorBackend(
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources.put(taskDesc.taskId, taskDesc.resources)
// Convert resource amounts into ResourceInformation
val resources = taskDesc.resources.map { case (rName, addressesAmounts) =>
rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray)}
taskResources.put(taskDesc.taskId, resources)
Contributor:

I don't think taskResources is needed at all anymore. Let's remove it unless you see it being used for something I'm missing. It was used in the statusUpdate call below, which you removed. I actually think it wasn't needed even before that (it changed in Spark 3.4), since taskDescription and runningTasks carry the same information and are now accessible.

Contributor:

So taskResources originally was required and provided useful functionality, because the extra resources from taskDesc.resources weren't exposed as public here. Since a previous change, taskResources is only used by the tests. But the tests were only validating with taskResources because that is how it functionally worked; the fact that it isn't actually used means we are testing something that could functionally be wrong.

We should update the tests to stop using taskResources (and remove taskResources), and instead use backend.executor.runningTasks.get(taskId).taskDescription.resources, which is what we really use to track the extra resources.

Contributor Author:

Sounds good. The new commits have removed taskResources.
executor.launchTask(this, taskDesc)
}

@@ -271,7 +274,7 @@ private[spark] class CoarseGrainedExecutorBackend(
}

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation])
val resources = executor.runningTasks.get(taskId).taskDescription.resources
val cpus = executor.runningTasks.get(taskId).taskDescription.cpus
val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
if (TaskState.isFinished(state)) {
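Following the review thread above, a hypothetical test snippet (editor's sketch; backend and taskId are assumed to come from the surrounding test) that asserts on the TaskDescription directly instead of the removed taskResources map:

```scala
// With taskResources removed, read the extra resources from the running task's
// TaskDescription, as suggested in the review discussion above.
val taskResources = backend.executor.runningTasks.get(taskId).taskDescription.resources
// The resources map is now Map[resourceName -> Map[address -> scaled amount]].
assert(taskResources("gpu").keys.toSeq.sorted == Seq("0"))
```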
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -610,13 +610,17 @@ private[spark] class Executor(
threadMXBean.getCurrentThreadCpuTime
} else 0L
var threwException = true
// Convert resource amounts into ResourceInformation
val resources = taskDescription.resources.map { case (rName, addressesAmounts) =>
rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray)
}
val value = Utils.tryWithSafeFinally {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem,
cpus = taskDescription.cpus,
resources = taskDescription.resources,
resources = resources,
plugins = plugins)
threwException = false
res
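The conversion performed above can be shown with a small self-contained sketch (editor's example data): the per-address amounts are collapsed into a plain ResourceInformation that keeps only the sorted addresses.

```scala
import org.apache.spark.resource.ResourceInformation

// Example data: TaskDescription.resources now maps
// resource name -> (address -> scaled amount).
val taskResources: Map[String, Map[String, Long]] =
  Map("gpu" -> Map("1" -> 5000000000000000L, "0" -> 5000000000000000L))

val resources: Map[String, ResourceInformation] = taskResources.map {
  case (rName, addressesAmounts) =>
    rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray)
}
// resources("gpu").addresses is Array("0", "1"); the fractional amounts are dropped here.
```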
111 changes: 73 additions & 38 deletions core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -20,6 +20,42 @@ package org.apache.spark.resource
import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.resource.ResourceAmountUtils.RESOURCE_TOTAL_AMOUNT

private[spark] object ResourceAmountUtils {
/**
* Using "double" to do the resource calculation may encounter a problem of precision loss. Eg
*
* scala> val taskAmount = 1.0 / 9
* taskAmount: Double = 0.1111111111111111
*
* scala> var total = 1.0
* total: Double = 1.0
*
* scala> for (i <- 1 to 9 ) {
* | if (total >= taskAmount) {
* | total -= taskAmount
* | println(s"assign $taskAmount for task $i, total left: $total")
* | } else {
* | println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
* | }
* | }
* assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
* assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
* assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
* assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
* assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
* assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
* assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
* assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
* ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
*
* So we multiply by RESOURCE_TOTAL_AMOUNT to convert the double into a long and avoid this
* limitation. A double carries roughly 16 significant decimal digits, so we set the factor to
* 10,000,000,000,000,000L.
*/
final val RESOURCE_TOTAL_AMOUNT: Long = 10000000000000000L
Contributor:

I think we should rename this ONE_ENTIRE_RESOURCE, or something that indicates it is the entire amount of a single resource.

Contributor Author:

Really good suggestion. Done.
}
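To make the fix concrete, here is a standalone sketch (editor's illustration, not part of the PR) that repeats the 1/9 example from the comment above using scaled-long arithmetic; the constant is written under the reviewer's suggested name ONE_ENTIRE_RESOURCE. Every subtraction is now exact, so all nine tasks get their share.

```scala
object FixedPointDemo {
  // Same factor as RESOURCE_TOTAL_AMOUNT above, i.e. 1.0 of a single resource.
  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

  def main(args: Array[String]): Unit = {
    val taskAmount: Long = (ONE_ENTIRE_RESOURCE * (1.0 / 9)).toLong
    var total: Long = ONE_ENTIRE_RESOURCE
    for (i <- 1 to 9) {
      assert(total >= taskAmount, s"Can't assign $taskAmount for task $i, total left: $total")
      total -= taskAmount
      println(s"assign ${taskAmount.toDouble / ONE_ENTIRE_RESOURCE} for task $i, " +
        s"total left: ${total.toDouble / ONE_ENTIRE_RESOURCE}")
    }
  }
}
```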

/**
* Trait used to help executor/worker allocate resources.
@@ -29,59 +65,53 @@ private[spark] trait ResourceAllocator {

protected def resourceName: String
protected def resourceAddresses: Seq[String]
protected def slotsPerAddress: Int

/**
* Map from an address to its availability, a value > 0 means the address is available,
* while value of 0 means the address is fully assigned.
*
* For task resources ([[org.apache.spark.scheduler.ExecutorResourceInfo]]), this value
* can be a multiple, such that each address can be allocated up to [[slotsPerAddress]]
* times.
* Map from an address to its availability, which defaults to 1.0 (multiplied by
* RESOURCE_TOTAL_AMOUNT to avoid precision errors). A value > 0 means the address is
* available, while a value of 0 means the address is fully assigned.
*/
private lazy val addressAvailabilityMap = {
mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*)
mutable.HashMap(resourceAddresses.map(address => address -> RESOURCE_TOTAL_AMOUNT): _*)
}

/**
* Sequence of currently available resource addresses.
*
* With [[slotsPerAddress]] greater than 1, [[availableAddrs]] can contain duplicate addresses
* e.g. with [[slotsPerAddress]] == 2, availableAddrs for addresses 0 and 1 can look like
* Seq("0", "0", "1"), where address 0 has two assignments available, and 1 has one.
* Get the per-address amounts, each scaled by RESOURCE_TOTAL_AMOUNT.
* @return the resource amounts
*/
def resourcesAmounts: Map[String, Long] = addressAvailabilityMap.toMap

/**
* Sequence of currently available resource addresses which are not fully assigned.
*/
def availableAddrs: Seq[String] = addressAvailabilityMap
.flatMap { case (addr, available) =>
(0 until available).map(_ => addr)
}.toSeq.sorted
.filter(addresses => addresses._2 > 0).keys.toSeq.sorted

/**
* Sequence of currently assigned resource addresses.
*
* With [[slotsPerAddress]] greater than 1, [[assignedAddrs]] can contain duplicate addresses
* e.g. with [[slotsPerAddress]] == 2, assignedAddrs for addresses 0 and 1 can look like
* Seq("0", "1", "1"), where address 0 was assigned once, and 1 was assigned twice.
*/
private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap
.flatMap { case (addr, available) =>
(0 until slotsPerAddress - available).map(_ => addr)
}.toSeq.sorted
.filter(addresses => addresses._2 < RESOURCE_TOTAL_AMOUNT).keys.toSeq.sorted

/**
* Acquire a sequence of resource addresses (to a launched task), these addresses must be
* available. When the task finishes, it will return the acquired resource addresses.
* Throw an Exception if an address is not available or doesn't exist.
*/
def acquire(addrs: Seq[String]): Unit = {
addrs.foreach { address =>
val isAvailable = addressAvailabilityMap.getOrElse(address,
def acquire(addressesAmounts: Map[String, Long]): Unit = {
addressesAmounts.foreach { case (address, amount) =>
val prevAmount = addressAvailabilityMap.getOrElse(address,
throw new SparkException(s"Try to acquire an address that doesn't exist. $resourceName " +
s"address $address doesn't exist."))
if (isAvailable > 0) {
addressAvailabilityMap(address) -= 1
s"address $address doesn't exist."))

val left = addressAvailabilityMap(address) - amount

if (left < 0) {
throw new SparkException(s"Try to acquire $resourceName address $address " +
s"amount: ${amount.toDouble / RESOURCE_TOTAL_AMOUNT}, but only " +
s"${prevAmount.toDouble / RESOURCE_TOTAL_AMOUNT} left.")
} else {
throw new SparkException("Try to acquire an address that is not available. " +
s"$resourceName address $address is not available.")
addressAvailabilityMap(address) = left
}
}
}
@@ -91,16 +121,21 @@
* addresses are released when a task has finished.
* Throw an Exception if an address is not assigned or doesn't exist.
*/
def release(addrs: Seq[String]): Unit = {
addrs.foreach { address =>
val isAvailable = addressAvailabilityMap.getOrElse(address,
def release(addressesAmounts: Map[String, Long]): Unit = {
addressesAmounts.foreach { case (address, amount) =>
val prevAmount = addressAvailabilityMap.getOrElse(address,
throw new SparkException(s"Try to release an address that doesn't exist. $resourceName " +
s"address $address doesn't exist."))
if (isAvailable < slotsPerAddress) {
addressAvailabilityMap(address) += 1

val total = prevAmount + amount

if (total > RESOURCE_TOTAL_AMOUNT) {
throw new SparkException(s"Try to release $resourceName address $address " +
s"amount: ${amount.toDouble / RESOURCE_TOTAL_AMOUNT}. But the total amount: " +
s"${total.toDouble / RESOURCE_TOTAL_AMOUNT} " +
s"after release should be <= 1")
} else {
throw new SparkException(s"Try to release an address that is not assigned. $resourceName " +
s"address $address is not assigned.")
addressAvailabilityMap(address) = total
}
}
}
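A minimal usage sketch of the Map-based acquire/release (editor's illustration; the toy subclass below assumes access to the private[spark] trait, which the real subclasses such as WorkerResourceInfo have):

```scala
import org.apache.spark.resource.ResourceAllocator

// Toy allocator for illustration only.
class DemoGpuAllocator(addrs: Seq[String]) extends ResourceAllocator {
  override protected def resourceName: String = "gpu"
  override protected def resourceAddresses: Seq[String] = addrs
}

val gpus = new DemoGpuAllocator(Seq("0", "1"))
val half = 10000000000000000L / 2          // 0.5 of a resource in scaled units

gpus.acquire(Map("0" -> half))             // first task takes half of GPU "0"
gpus.acquire(Map("0" -> half))             // second task takes the other half
assert(!gpus.availableAddrs.contains("0")) // "0" is now fully assigned
gpus.release(Map("0" -> half))             // one task finishes
assert(gpus.availableAddrs.contains("0"))  // half of "0" is free again
```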
core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -170,16 +170,16 @@ private[spark] object ResourceUtils extends Logging {
// integer amount and the number of slots per address. For instance, if the amount is 0.5,
// then we get (1, 2) back out. This indicates that for each 1 address, it has 2 slots per
// address, which allows you to put 2 tasks on that address. Note if amount is greater
// than 1, then the number of slots per address has to be 1. This would indicate that a
// than 1, then the number of parts per address has to be 1. This would indicate that a
// task would have multiple addresses assigned. This can be used for calculating
// the number of tasks per executor -> (executorAmount * numParts) / (integer amount).
// Returns tuple of (integer amount, numParts)
def calculateAmountAndPartsForFraction(doubleAmount: Double): (Int, Int) = {
val parts = if (doubleAmount <= 0.5) {
val parts = if (doubleAmount <= 1.0) {
Math.floor(1.0 / doubleAmount).toInt
} else if (doubleAmount % 1 != 0) {
throw new SparkException(
s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole number.")
s"The resource amount ${doubleAmount} must be either <= 1.0, or a whole number.")
} else {
1
}
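As an illustration of the relaxed bound, an editor's re-statement of the calculation with a few example inputs; the returned integer amount is assumed to be the ceiling of the requested amount, consistent with the comment above, since that line is outside the hunk.

```scala
// Sketch of calculateAmountAndPartsForFraction after this change.
def amountAndParts(doubleAmount: Double): (Int, Int) = {
  val parts = if (doubleAmount <= 1.0) {
    Math.floor(1.0 / doubleAmount).toInt
  } else if (doubleAmount % 1 != 0) {
    throw new IllegalArgumentException(
      s"The resource amount ${doubleAmount} must be either <= 1.0, or a whole number.")
  } else {
    1
  }
  (Math.ceil(doubleAmount).toInt, parts)
}

// amountAndParts(0.5) == (1, 2)  -> two tasks share one address
// amountAndParts(0.7) == (1, 1)  -> previously rejected by the <= 0.5 check
// amountAndParts(2.0) == (2, 1)  -> each task needs two whole addresses
// amountAndParts(1.5)            -> throws: neither <= 1.0 nor a whole number
```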
core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{Evolving, Since}
*
* @param resourceName Resource name
* @param amount Amount requesting as a Double to support fractional resource requests.
* Valid values are less than or equal to 0.5 or whole numbers. This essentially
* Valid values are less than or equal to 1.0 or whole numbers. This essentially
* lets you configure X number of tasks to run on a single resource,
* ie amount equals 0.5 translates into 2 tasks per resource address.
*/
@@ -37,8 +37,8 @@ import org.apache.spark.annotation.{Evolving, Since}
class TaskResourceRequest(val resourceName: String, val amount: Double)
extends Serializable {

assert(amount <= 0.5 || amount % 1 == 0,
s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
assert(amount <= 1.0 || amount % 1 == 0,
s"The resource amount ${amount} must be either <= 1.0, or a whole number.")

override def equals(obj: Any): Boolean = {
obj match {
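For context, a request that relies on the relaxed bound could be built with the standard ResourceProfile builder API (editor's sketch; the 0.7 amount and executor sizes are arbitrary example values):

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Each executor advertises 2 GPUs; each task asks for 0.7 of one GPU address,
// an amount the previous <= 0.5 check would have rejected.
val execReqs = new ExecutorResourceRequests().cores(8).resource("gpu", 2)
val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.7)
val rp = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build()
```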
core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -25,16 +25,27 @@ import org.apache.spark.resource.{ResourceAllocator, ResourceInformation}
* information.
* @param name Resource name
* @param addresses Resource addresses provided by the executor
* @param numParts Number of ways each resource is subdivided when scheduling tasks
*/
private[spark] class ExecutorResourceInfo(
name: String,
addresses: Seq[String],
numParts: Int)
addresses: Seq[String])
extends ResourceInformation(name, addresses.toArray) with ResourceAllocator {

override protected def resourceName = this.name
override protected def resourceAddresses = this.addresses
override protected def slotsPerAddress: Int = numParts
def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress

/**
* Calculate how many parts the executor can offer for the given per-task resource amount.
* @param taskAmount the resource amount required by each task
* @return the total parts
*/
def totalParts(taskAmount: Double): Int = {
assert(taskAmount > 0.0)
if (taskAmount >= 1.0) {
addresses.length / taskAmount.ceil.toInt
} else {
addresses.length * Math.floor(1.0 / taskAmount).toInt
}
}

}
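A standalone sketch of the new totalParts arithmetic (editor's illustration, assuming an executor with four GPU addresses):

```scala
// Mirrors the two branches of totalParts above.
val addresses = Seq("0", "1", "2", "3")

def totalParts(taskAmount: Double): Int = {
  require(taskAmount > 0.0)
  if (taskAmount >= 1.0) {
    addresses.length / taskAmount.ceil.toInt               // whole addresses per task
  } else {
    addresses.length * Math.floor(1.0 / taskAmount).toInt  // tasks sharing each address
  }
}

// totalParts(0.25) == 16  -> four tasks per GPU across four GPUs
// totalParts(1.0)  == 4   -> one task per GPU
// totalParts(3.0)  == 1   -> a task needing 3 GPUs fits only once (integer division)
```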