25 commits
ee71e46
Use fraction to do the resource calculation
wbo4958 Oct 15, 2023
2d68843
comments
wbo4958 Oct 27, 2023
fbd647d
comments
wbo4958 Oct 30, 2023
58899b2
comments
wbo4958 Nov 2, 2023
cd1c0ef
keep Long internal
wbo4958 Nov 2, 2023
51ac5cb
remove duplicated resources
wbo4958 Nov 6, 2023
d02a3be
fix CI
wbo4958 Nov 6, 2023
07c42b3
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Nov 13, 2023
2127a7b
validate the task.amount in the ResourceProfile and TaskResourceProfile
wbo4958 Nov 13, 2023
61bcb34
add tests
wbo4958 Nov 13, 2023
eb7f918
add warnOnWastedResources for TaskResourceProfile
wbo4958 Nov 14, 2023
18084e6
change the rp corresponding tests
wbo4958 Nov 14, 2023
e9e7a26
Comments
wbo4958 Nov 16, 2023
877cacf
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Nov 16, 2023
8657837
comments
wbo4958 Nov 20, 2023
6ca3567
comments
wbo4958 Nov 22, 2023
1e590bc
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Nov 22, 2023
0be3176
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Nov 27, 2023
3b08d1e
toIndexedSeq
wbo4958 Nov 27, 2023
c772b33
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Dec 4, 2023
c2da6e3
comments
wbo4958 Dec 4, 2023
196e11b
comment
wbo4958 Dec 4, 2023
7c11b6c
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Dec 4, 2023
347196f
resolve comments
wbo4958 Dec 20, 2023
ab4c48e
Merge remote-tracking branch 'upstream/master' into SPARK-45527
wbo4958 Dec 20, 2023
core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -20,21 +20,29 @@ package org.apache.spark.deploy.master
import scala.collection.mutable

import org.apache.spark.resource.{ResourceAllocator, ResourceInformation, ResourceRequirement}
import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils

private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String])
extends ResourceAllocator {

override protected def resourceName = this.name
override protected def resourceAddresses = this.addresses
override protected def slotsPerAddress: Int = 1

/**
* Acquire the resources.
* @param amount How many addresses are requesting.
* @return ResourceInformation
*/
def acquire(amount: Int): ResourceInformation = {
val allocated = availableAddrs.take(amount)
acquire(allocated)
new ResourceInformation(resourceName, allocated.toArray)
// Any available address from availableAddrs must be a whole resource,
// since the worker only assigns whole resources to executors.
val addresses = availableAddrs.take(amount)
assert(addresses.length == amount)

acquire(addresses.map(addr => addr -> ONE_ENTIRE_RESOURCE).toMap)
new ResourceInformation(resourceName, addresses.toArray)
}
}
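A hedged sketch of how a worker-side caller might use this (illustrative only; the "gpu" addresses and counts below are made up, not part of this diff):

// Illustrative: a worker tracking 4 GPU addresses hands 2 whole GPUs to an executor.
val worker = WorkerResourceInfo("gpu", Seq("0", "1", "2", "3"))
val info = worker.acquire(2)   // ResourceInformation("gpu", Array("0", "1"))
worker.availableAddrs          // Seq("2", "3")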

@@ -163,7 +171,7 @@ private[spark] class WorkerInfo(
*/
def recoverResources(expected: Map[String, ResourceInformation]): Unit = {
expected.foreach { case (rName, rInfo) =>
resources(rName).acquire(rInfo.addresses.toImmutableArraySeq)
resources(rName).acquire(rInfo.addresses.map(addr => addr -> ONE_ENTIRE_RESOURCE).toMap)
}
}

@@ -173,7 +181,7 @@
*/
private def releaseResources(allocated: Map[String, ResourceInformation]): Unit = {
allocated.foreach { case (rName, rInfo) =>
resources(rName).release(rInfo.addresses.toImmutableArraySeq)
resources(rName).release(rInfo.addresses.map(addrs => addrs -> ONE_ENTIRE_RESOURCE).toMap)
}
}
}
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -20,7 +20,6 @@ package org.apache.spark.executor
import java.net.URL
import java.nio.ByteBuffer
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.{Failure, Success}
@@ -65,16 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend(

private var _resources = Map.empty[String, ResourceInformation]

/**
* Map each taskId to the information about the resource allocated to it, Please refer to
* [[ResourceInformation]] for specifics.
* CHM is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227)
* Exposed for testing only.
*/
private[executor] val taskResources = new ConcurrentHashMap[
Long, Map[String, ResourceInformation]
]

private var decommissioned = false

// Track the last time in ns that at least one task is running. If no task is running and all
@@ -192,7 +181,6 @@
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources.put(taskDesc.taskId, taskDesc.resources)
executor.launchTask(this, taskDesc)
}

@@ -272,11 +260,10 @@
}

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation])
val resources = executor.runningTasks.get(taskId).taskDescription.resources
val cpus = executor.runningTasks.get(taskId).taskDescription.cpus
val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
if (TaskState.isFinished(state)) {
taskResources.remove(taskId)
lastTaskFinishTime.set(System.nanoTime())
}
driver match {
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -618,13 +618,17 @@ private[spark] class Executor(
threadMXBean.getCurrentThreadCpuTime
} else 0L
var threwException = true
// Convert the per-address resource amounts into ResourceInformation objects
val resources = taskDescription.resources.map { case (rName, addressesAmounts) =>
rName -> new ResourceInformation(rName, addressesAmounts.keys.toSeq.sorted.toArray)
}
val value = Utils.tryWithSafeFinally {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem,
cpus = taskDescription.cpus,
resources = taskDescription.resources,
resources = resources,
plugins = plugins)
threwException = false
res
118 changes: 80 additions & 38 deletions core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -20,6 +20,49 @@ package org.apache.spark.resource
import scala.collection.mutable

import org.apache.spark.SparkException
import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE

private[spark] object ResourceAmountUtils {
/**
* Using a Double to do the resource calculation can run into precision loss. For example:
*
* scala> val taskAmount = 1.0 / 9
* taskAmount: Double = 0.1111111111111111
*
* scala> var total = 1.0
* total: Double = 1.0
*
* scala> for (i <- 1 to 9 ) {
* | if (total >= taskAmount) {
* | total -= taskAmount
* | println(s"assign $taskAmount for task $i, total left: $total")
* | } else {
* | println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
* | }
* | }
* assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
* assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
* assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
* assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
* assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
* assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
* assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
* assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
* ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
*
* So we multiply by ONE_ENTIRE_RESOURCE to convert the double to a long, avoiding this
* limitation. A Double carries roughly 16 significant decimal digits, so we set the
* factor to 10000000000000000L (10^16).
*/
final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

def isOneEntireResource(amount: Long): Boolean = amount == ONE_ENTIRE_RESOURCE

def toInternalResource(amount: Double): Long = (amount * ONE_ENTIRE_RESOURCE).toLong

def toFractionalResource(amount: Long): Double = amount.toDouble / ONE_ENTIRE_RESOURCE

}
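As a quick, illustrative sketch (not part of this diff), rerunning the 1/9 example above through these helpers shows why the Long representation holds up:

// Illustrative sketch only: the 1/9 example from the scaladoc above, redone with
// the internal Long representation. All nine assignments now succeed because the
// subtraction is exact integer arithmetic.
val taskAmount = ResourceAmountUtils.toInternalResource(1.0 / 9)  // 1111111111111111L
var total = ResourceAmountUtils.ONE_ENTIRE_RESOURCE               // 10000000000000000L
for (i <- 1 to 9) {
  assert(total >= taskAmount)  // holds for every iteration, including the 9th
  total -= taskAmount
}
// total is now 1L, i.e. 1e-16 of a resource lost to truncation at conversion
// time, rather than the 0.111... of a resource lost in the Double version.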

/**
* Trait used to help executor/worker allocate resources.
@@ -29,59 +72,53 @@ private[spark] trait ResourceAllocator {

protected def resourceName: String
protected def resourceAddresses: Seq[String]
protected def slotsPerAddress: Int

/**
* Map from an address to its availability, a value > 0 means the address is available,
* while value of 0 means the address is fully assigned.
*
* For task resources ([[org.apache.spark.scheduler.ExecutorResourceInfo]]), this value
* can be a multiple, such that each address can be allocated up to [[slotsPerAddress]]
* times.
* Map from an address to its availability, defaulting to ONE_ENTIRE_RESOURCE (1.0 scaled
* up to a Long to avoid precision error). A value > 0 means the address is still
* (at least partially) available, while a value of 0 means it is fully assigned.
*/
private lazy val addressAvailabilityMap = {
mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*)
mutable.HashMap(resourceAddresses.map(address => address -> ONE_ENTIRE_RESOURCE): _*)
}

/**
* Sequence of currently available resource addresses.
*
* With [[slotsPerAddress]] greater than 1, [[availableAddrs]] can contain duplicate addresses
* e.g. with [[slotsPerAddress]] == 2, availableAddrs for addresses 0 and 1 can look like
* Seq("0", "0", "1"), where address 0 has two assignments available, and 1 has one.
* Get the remaining amount per address, in the internal representation (scaled by
* ONE_ENTIRE_RESOURCE).
* @return the internal resource amount per address
*/
def resourcesAmounts: Map[String, Long] = addressAvailabilityMap.toMap

/**
* Sequence of currently available resource addresses, i.e. those not fully assigned.
*/
def availableAddrs: Seq[String] = addressAvailabilityMap
.flatMap { case (addr, available) =>
(0 until available).map(_ => addr)
}.toSeq.sorted
.filter(addresses => addresses._2 > 0).keys.toSeq.sorted

/**
* Sequence of currently assigned resource addresses.
*
* With [[slotsPerAddress]] greater than 1, [[assignedAddrs]] can contain duplicate addresses
* e.g. with [[slotsPerAddress]] == 2, assignedAddrs for addresses 0 and 1 can look like
* Seq("0", "1", "1"), where address 0 was assigned once, and 1 was assigned twice.
*/
private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap
.flatMap { case (addr, available) =>
(0 until slotsPerAddress - available).map(_ => addr)
}.toSeq.sorted
.filter(addresses => addresses._2 < ONE_ENTIRE_RESOURCE).keys.toSeq.sorted

/**
* Acquire resource addresses (for a launched task); the requested amounts must be
* available. When the task finishes, it will return the acquired resource addresses.
* Throws an Exception if an address is not available or doesn't exist.
*/
def acquire(addrs: Seq[String]): Unit = {
addrs.foreach { address =>
val isAvailable = addressAvailabilityMap.getOrElse(address,
def acquire(addressesAmounts: Map[String, Long]): Unit = {
addressesAmounts.foreach { case (address, amount) =>
val prevAmount = addressAvailabilityMap.getOrElse(address,
throw new SparkException(s"Try to acquire an address that doesn't exist. $resourceName " +
s"address $address doesn't exist."))
if (isAvailable > 0) {
addressAvailabilityMap(address) -= 1
s"address $address doesn't exist."))

val left = prevAmount - amount

if (left < 0) {
throw new SparkException(s"Try to acquire $resourceName address $address " +
s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}, but only " +
s"${ResourceAmountUtils.toFractionalResource(prevAmount)} left.")
} else {
throw new SparkException("Try to acquire an address that is not available. " +
s"$resourceName address $address is not available.")
addressAvailabilityMap(address) = left
}
}
}
@@ -91,16 +128,21 @@
* addresses are released when a task has finished.
* Throw an Exception if an address is not assigned or doesn't exist.
*/
def release(addrs: Seq[String]): Unit = {
addrs.foreach { address =>
val isAvailable = addressAvailabilityMap.getOrElse(address,
def release(addressesAmounts: Map[String, Long]): Unit = {
addressesAmounts.foreach { case (address, amount) =>
val prevAmount = addressAvailabilityMap.getOrElse(address,
throw new SparkException(s"Try to release an address that doesn't exist. $resourceName " +
s"address $address doesn't exist."))
if (isAvailable < slotsPerAddress) {
addressAvailabilityMap(address) += 1

val total = prevAmount + amount

if (total > ONE_ENTIRE_RESOURCE) {
throw new SparkException(s"Try to release $resourceName address $address " +
s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}. But the total amount: " +
s"${ResourceAmountUtils.toFractionalResource(total)} " +
s"after release should be <= 1")
} else {
throw new SparkException(s"Try to release an address that is not assigned. $resourceName " +
s"address $address is not assigned.")
addressAvailabilityMap(address) = total
}
}
}
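To make the new Map-based acquire/release contract concrete, here is a minimal, hypothetical allocator built on this trait (the GpuAllocator name and the addresses are made up for illustration, not part of this diff):

// Hypothetical allocator over two GPU addresses, to illustrate the new API.
class GpuAllocator extends ResourceAllocator {
  override protected def resourceName = "gpu"
  override protected def resourceAddresses = Seq("0", "1")
}

val alloc = new GpuAllocator
// Two tasks each take half of GPU "0"; the internal amounts are exact Longs.
alloc.acquire(Map("0" -> ResourceAmountUtils.toInternalResource(0.5)))
alloc.acquire(Map("0" -> ResourceAmountUtils.toInternalResource(0.5)))
alloc.availableAddrs  // Seq("1"): address "0" is fully assigned
alloc.release(Map("0" -> ResourceAmountUtils.toInternalResource(0.5)))
alloc.availableAddrs  // Seq("0", "1") again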
core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -49,6 +49,8 @@ class ResourceProfile(
val executorResources: Map[String, ExecutorResourceRequest],
val taskResources: Map[String, TaskResourceRequest]) extends Serializable with Logging {

validate()

// _id is only a var for testing purposes
private var _id = ResourceProfile.getNextProfileId
// This is used for any resources that use fractional amounts, the key is the resource name
@@ -59,6 +61,19 @@
private var _maxTasksPerExecutor: Option[Int] = None
private var _coresLimitKnown: Boolean = false

/**
* Validate the ResourceProfile
*/
protected def validate(): Unit = {
// The task.amount in a ResourceProfile must fall within the range (0, 0.5],
// or be a whole number.
for ((_, taskReq) <- taskResources) {
val taskAmount = taskReq.amount
assert(taskAmount <= 0.5 || taskAmount % 1 == 0,
s"The task resource amount ${taskAmount} must be either <= 0.5, or a whole number.")
}
}
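A few hypothetical values to illustrate the rule (the constructor calls below are sketches, assuming the TaskResourceRequest shown later in this diff):

// Hypothetical illustrations of the ResourceProfile validation rule.
val exec = Map.empty[String, ExecutorResourceRequest]
new ResourceProfile(exec, Map("gpu" -> new TaskResourceRequest("gpu", 0.25))) // OK: 4 tasks share a GPU
new ResourceProfile(exec, Map("gpu" -> new TaskResourceRequest("gpu", 2.0)))  // OK: whole number
new ResourceProfile(exec, Map("gpu" -> new TaskResourceRequest("gpu", 0.7)))  // fails the assert above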

/**
* A unique id of this ResourceProfile
*/
@@ -105,12 +120,8 @@

/*
* This function takes into account fractional amounts for the task resource requirement.
* Spark only supports fractional amounts < 1 to basically allow for multiple tasks
* to use the same resource address.
* The way the scheduler handles this is it adds the same address the number of slots per
* address times and then the amount becomes 1. This way it re-uses the same address
* the correct number of times. ie task requirement amount=0.25 -> addrs["0", "0", "0", "0"]
* and scheduler task amount=1. See ResourceAllocator.slotsPerAddress.
* Spark only supports fractional amounts < 1 to basically allow for multiple tasks
* to use the same resource address, or a whole number to use multiple whole addresses.
*/
private[spark] def getSchedulerTaskResourceAmount(resource: String): Int = {
val taskAmount = taskResources.getOrElse(resource,
@@ -280,6 +291,11 @@ private[spark] class TaskResourceProfile(
override val taskResources: Map[String, TaskResourceRequest])
extends ResourceProfile(Map.empty, taskResources) {

// The task.amount in a TaskResourceProfile must fall within the range (0, 1.0] or be
// a whole number, and that has already been checked in TaskResourceRequest.
// Therefore, we can safely skip this check.
override protected def validate(): Unit = {}

override protected[spark] def getCustomExecutorResources()
: Map[String, ExecutorResourceRequest] = {
if (SparkEnv.get == null) {
core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -169,18 +169,17 @@ private[spark] object ResourceUtils extends Logging {

// Used to take a fraction amount from a task resource requirement and split into a real
// integer amount and the number of slots per address. For instance, if the amount is 0.5,
// the we get (1, 2) back out. This indicates that for each 1 address, it has 2 slots per
// address, which allows you to put 2 tasks on that address. Note if amount is greater
// than 1, then the number of slots per address has to be 1. This would indicate that a
// would have multiple addresses assigned per task. This can be used for calculating
// the number of tasks per executor -> (executorAmount * numParts) / (integer amount).
// then we get (1, 2) back out. This indicates that for each address, it allows you to
// put 2 tasks on that address. Note that if the amount is greater than 1, the number of
// running tasks per address has to be 1. This can be used for calculating
// the number of tasks per executor = (executorAmount * numParts) / (integer amount).
// Returns tuple of (integer amount, numParts)
def calculateAmountAndPartsForFraction(doubleAmount: Double): (Int, Int) = {
val parts = if (doubleAmount <= 0.5) {
val parts = if (doubleAmount <= 1.0) {
Math.floor(1.0 / doubleAmount).toInt
} else if (doubleAmount % 1 != 0) {
throw new SparkException(
s"The resource amount ${doubleAmount} must be either <= 0.5, or a whole number.")
s"The resource amount ${doubleAmount} must be either <= 1.0, or a whole number.")
} else {
1
}
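A standalone sketch of the resulting behavior (the ceil for the integer amount is an assumption here, since the diff truncates the function body before the return):

// Illustrative mirror of calculateAmountAndPartsForFraction (exception branch omitted).
def amountAndParts(doubleAmount: Double): (Int, Int) = {
  val parts = if (doubleAmount <= 1.0) Math.floor(1.0 / doubleAmount).toInt else 1
  (Math.ceil(doubleAmount).toInt, parts)  // assumed return shape, per the comment above
}
amountAndParts(0.5)   // (1, 2): two tasks share each address
amountAndParts(0.25)  // (1, 4): four tasks share each address
amountAndParts(4.0)   // (4, 1): each task needs four whole addresses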
core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{Evolving, Since}
*
* @param resourceName Resource name
* @param amount Amount requesting as a Double to support fractional resource requests.
* Valid values are less than or equal to 0.5 or whole numbers. This essentially
* Valid values are less than or equal to 1.0 or whole numbers. This essentially
* lets you configure X number of tasks to run on a single resource,
* i.e. amount equals 0.5 translates into 2 tasks per resource address.
*/
@@ -37,8 +37,8 @@ import org.apache.spark.annotation.{Evolving, Since}
class TaskResourceRequest(val resourceName: String, val amount: Double)
extends Serializable {

assert(amount <= 0.5 || amount % 1 == 0,
s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
assert(amount <= 1.0 || amount % 1 == 0,
s"The resource amount ${amount} must be either <= 1.0, or a whole number.")

override def equals(obj: Any): Boolean = {
obj match {