[SPARK-45527][CORE] Use fraction to do the resource calculation #44690
Conversation
What changes were proposed in this pull request?
This PR introduces the use of fractions instead of slots, similar to the CPU strategy,
for determining whether a worker offer can provide the necessary resources to tasks.
For instance, when an executor reports to the driver with [gpu, ["1", "2"]], the driver constructs an executor data map.
The keys in the map are the GPU addresses, and their default values are set to 1.0, indicating one whole GPU.
Consequently, the available resource amounts for the executor are: { "1" -> 1.0, "2" -> 1.0 }.
When offering resources to a task that requires 1 CPU and 0.08 GPU, the worker offer examines the available resource amounts.
It identifies that the available amount of GPU address "1" is at least the task's GPU requirement (1.0 >= 0.08).
Therefore, Spark assigns GPU address "1" to this task. After the assignment, the available resource amounts
for this executor are updated to { "1" -> 0.92, "2" -> 1.0 }, ensuring that the remaining resources can be allocated to other tasks.
In scenarios where other tasks, using different task resource profiles, request varying GPU amounts
while dynamic allocation is disabled, Spark applies the same comparison approach: it compares the task's GPU requirement with
the available resource amounts to determine whether the resources can be assigned to the task.
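To make the bookkeeping described above concrete, here is a minimal, self-contained sketch (illustrative only; the map layout and the tryAssign helper are assumptions for this example, not Spark's internal API):

```scala
import scala.collection.mutable

// Illustrative sketch of per-address fraction bookkeeping. Each GPU address
// starts at 1.0 (one whole GPU); a task fits if some address still has at
// least the requested fraction available, and that fraction is subtracted.
val available = mutable.LinkedHashMap("1" -> 1.0, "2" -> 1.0)

def tryAssign(taskGpuAmount: Double): Option[String] =
  available.collectFirst { case (addr, amt) if amt >= taskGpuAmount => addr }
    .map { addr =>
      available(addr) -= taskGpuAmount
      addr
    }

tryAssign(0.08)  // Some("1"); available is now ("1" -> 0.92, "2" -> 1.0)
```

Internally, the PR keeps these per-address amounts in a Long-based fixed-point form rather than raw doubles; see the ONE_ENTIRE_RESOURCE discussion further down.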
Why are the changes needed?
The existing resource offering for resources such as GPU and FPGA is based on "slots per address", which is defined by the default resource profile
and is a fixed number across all resource profiles when dynamic allocation is disabled.
Consider the test case below:
```scala
withTempDir { dir =>
  val scriptPath = createTempScriptWithExpectedOutput(dir, "gpuDiscoveryScript",
    """{"name": "gpu","addresses":["0"]}""")
  val conf = new SparkConf()
    .setAppName("test")
    .setMaster("local-cluster[1, 12, 1024]")
    .set("spark.executor.cores", "12")
  conf.set("spark.worker.resource.gpu.amount", "1")
  conf.set("spark.worker.resource.gpu.discoveryScript", scriptPath)
  conf.set("spark.executor.resource.gpu.amount", "1")
  conf.set("spark.task.resource.gpu.amount", "0.08")
  sc = new SparkContext(conf)
  val rdd = sc.range(0, 100, 1, 4)
  var rdd1 = rdd.repartition(3)
  val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
  val rp = new ResourceProfileBuilder().require(treqs).build
  rdd1 = rdd1.withResources(rp)
  assert(rdd1.collect().size === 100)
}
```
During the initial stages, Spark generates a default resource profile based on the configurations. The
slots per GPU address are calculated as "spark.executor.resource.gpu.amount / spark.task.resource.gpu.amount",
resulting in a value of 12 (1 / 0.08, rounded down). This means that Spark can accommodate up to 12 tasks running on each GPU address simultaneously.
The job is then divided into two stages. The first stage, which consists of 4 tasks, runs concurrently based on
the default resource profile. The second stage, comprising 3 tasks, is expected to run sequentially using a new task
resource profile that specifies 1 CPU and 1.0 full GPU per task.
In reality, the tasks in the second stage run in parallel, which is the underlying issue.
The problem lies in the line `new TaskResourceRequests().cpus(1).resource("gpu", 1.0)`. The value of 1.0
for the GPU, or any value in (0, 0.5] (which is rounded up to 1.0; Spark throws an exception if the value is in (0.5, 1)),
merely requests a number of slots, in this case only 1 slot. Consequently, each task
needs 1 CPU core and 1 GPU slot, so all three tasks run simultaneously.
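For contrast, a small worked sketch of the old slot-based accounting that produces this behavior (the variable names are ours; the numbers come from the test above):

```scala
// Old slot-based accounting, using the numbers from the test case above.
val executorGpuAmount = 1.0   // spark.executor.resource.gpu.amount
val taskGpuAmount     = 0.08  // spark.task.resource.gpu.amount (default profile)

// Slots per GPU address, fixed by the default profile for the whole application.
val slotsPerAddress = (executorGpuAmount / taskGpuAmount).toInt  // 12

// Under the old scheme, resource("gpu", 1.0) in a task resource profile asks for
// one *slot*, i.e. 1/12 of the GPU, not one whole GPU, so the 3 tasks of the
// second stage are all scheduled at once instead of running one after another.
val slotsRequestedPerTask = 1
```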
Does this PR introduce any user-facing change?
No

How was this patch tested?
Made sure all tests pass.

Was this patch authored or co-authored using generative AI tooling?
No
Closes apache#43494 from wbo4958/SPARK-45527.
Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
|
I will work on the manual tests. |
Manual test on Spark Yarn Cluster

Environment

The internal Spark Yarn environment, which supports GPU resources.

With dynamic allocation off

1 GPU
Create a gpu_discovery_1_gpu.sh:

```bash
cat <<EOF
{"name": "gpu","addresses":["0"]}
EOF
```
```bash
spark-shell --master yarn \
  --num-executors=1 \
  --conf spark.executor.cores=8 \
  --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 \
  --conf spark.task.resource.gpu.amount=0.125 \
  --conf spark.executor.resource.gpu.discoveryScript=./gpu_discovery_1_gpu.sh \
  --files `pwd`/gpu_discovery_1_gpu.sh \
  --conf spark.dynamicAllocation.enabled=false
```

The above spark-submit configuration launches a single executor with 8 CPU cores and 1 GPU. Each task requires 1 CPU core and 0.125 GPUs, allowing 8 tasks to run concurrently.
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 12).mapPartitions { iter =>
  val tc = TaskContext.get()
  val tid = tc.partitionId()
  assert(tc.resources()("gpu").addresses sameElements Array("0"))
  iter
}
val rdd1 = rdd.repartition(2)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
val rp = new ResourceProfileBuilder().require(treqs).build
val rdd2 = rdd1.withResources(rp).mapPartitions { iter =>
  val tc = TaskContext.get()
  val tid = tc.partitionId()
  assert(tc.resources()("gpu").addresses sameElements Array("0"))
  iter
}
rdd2.collect()
```

The provided Spark job is split into two stages. The first stage comprises 12 tasks, each requiring 1 CPU core and 0.125 GPUs, so the first 8 tasks run concurrently, followed by the remaining 4. The second stage consists of 2 tasks, each requiring 1 CPU core and 0.6 GPUs, so only one task runs at a time and the 2 tasks execute sequentially.

2 GPUs
Create a gpu_discovery_2_gpus.sh:

```bash
cat <<EOF
{"name": "gpu","addresses":["0", "1"]}
EOF
```
```bash
spark-shell --master yarn \
  --num-executors=1 \
  --conf spark.executor.cores=8 \
  --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=2 \
  --conf spark.task.resource.gpu.amount=0.25 \
  --conf spark.executor.resource.gpu.discoveryScript=./gpu_discovery_2_gpus.sh \
  --files `pwd`/gpu_discovery_2_gpus.sh \
  --conf spark.dynamicAllocation.enabled=false
```

The above spark-submit configuration launches a single executor with 8 CPU cores and 2 GPUs. Each task requires 1 CPU core and 0.25 GPUs, allowing 8 tasks to run concurrently. The first 4 tasks grab GPU 0, while the remaining 4 tasks grab GPU 1 due to the round-robin manner in which resources are offered.
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 8).mapPartitions { iter =>
  val tc = TaskContext.get()
  val tid = tc.partitionId()
  if (tid >= 4) {
    assert(tc.resources()("gpu").addresses sameElements Array("1"))
  } else {
    assert(tc.resources()("gpu").addresses sameElements Array("0"))
  }
  iter
}
val rdd1 = rdd.repartition(2)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
val rp = new ResourceProfileBuilder().require(treqs).build
val rdd2 = rdd1.withResources(rp).mapPartitions { iter =>
  val tc = TaskContext.get()
  val tid = tc.partitionId()
  if (tid > 0) {
    assert(tc.resources()("gpu").addresses sameElements Array("1"))
  } else {
    assert(tc.resources()("gpu").addresses sameElements Array("0"))
  }
  iter
}
rdd2.collect()
```

The provided Spark job is split into two stages. The first stage comprises 8 tasks, each requiring 1 CPU core and 0.25 GPUs, so all 8 tasks run concurrently; the first 4 tasks grab GPU 0 while the remaining 4 grab GPU 1 due to the round-robin manner in which resources are offered, and the assert lines verify this. The second stage consists of 2 tasks, each requiring 1 CPU core and 0.6 GPUs; since there are 2 GPUs available, both tasks run concurrently, each grabbing a different GPU, which the assert line also verifies.

Concurrent Spark jobs

This test case ensures that another Spark job can still grab the remaining GPU resources and run alongside the first Spark job.
Create a gpu_discovery_2_gpus.sh:

```bash
cat <<EOF
{"name": "gpu","addresses":["0", "1"]}
EOF
```
```bash
spark-shell --master yarn \
  --num-executors=1 \
  --conf spark.executor.cores=8 \
  --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=2 \
  --conf spark.task.resource.gpu.amount=0.25 \
  --conf spark.executor.resource.gpu.discoveryScript=./gpu_discovery_2_gpus.sh \
  --files `pwd`/gpu_discovery_2_gpus.sh \
  --conf spark.dynamicAllocation.enabled=false
```

The above spark-submit configuration launches a single executor with 8 CPU cores and 2 GPUs. Each task requires 1 CPU core and 0.25 GPUs, allowing 8 tasks to run concurrently. The first 4 tasks grab GPU 0, while the remaining 4 tasks grab GPU 1 due to the round-robin manner in which resources are offered.
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Submit Spark Job 0 in thread1.
val thread1 = new Thread(() => {
  val rdd = sc.range(0, 8, 1, 8).mapPartitions { iter =>
    val tc = TaskContext.get()
    val tid = tc.partitionId()
    if (tid >= 4) {
      assert(tc.resources()("gpu").addresses sameElements Array("1"))
    } else {
      assert(tc.resources()("gpu").addresses sameElements Array("0"))
    }
    iter
  }
  val rdd1 = rdd.repartition(2)
  val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.6)
  val rp = new ResourceProfileBuilder().require(treqs).build
  val rdd2 = rdd1.withResources(rp).mapPartitions { iter =>
    val tc = TaskContext.get()
    val tid = tc.partitionId()
    assert(tc.resources()("gpu").addresses sameElements Array(tid.toString))
    println("sleeping 20s")
    Thread.sleep(20000)
    iter
  }
  rdd2.collect()
})
thread1.start()

// Sleep 5s in the main thread to make sure the result tasks launched in thread1 are running.
Thread.sleep(5000)

// Submit Spark Job 1 in the main thread.
// Each result task in thread1 takes 0.6 GPUs, so there is only 0.4 of each GPU left.
// Since the default task gpu amount = 0.25, the number of concurrent tasks in Spark Job 1
// will be 1 (0.4 / 0.25, rounded down) * 2 (2 GPUs) = 2.
val rdd = sc.range(0, 4, 1, 2).mapPartitions(iter => {
  Thread.sleep(10000)
  val tc = TaskContext.get()
  val tid = tc.partitionId()
  if (tid % 2 == 1) {
    assert(tc.resources()("gpu").addresses sameElements Array("1"))
  } else {
    assert(tc.resources()("gpu").addresses sameElements Array("0"))
  }
  iter
})
rdd.collect()
thread1.join()
```

The given Spark application consists of two Spark jobs. Spark job 0 is submitted in thread1, while Spark job 1 is submitted in the main thread. To guarantee that Spark job 0 starts running before Spark job 1, the main thread sleeps for 5 seconds before submitting its job.

(Screenshots of the event timeline for Spark job 1 omitted.)

If we change the second stage's task GPU amount from 0.6 to a whole GPU (1):
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Submit Spark Job 0 in thread1.
val thread1 = new Thread(() => {
  val rdd = sc.range(0, 8, 1, 8).mapPartitions { iter =>
    val tc = TaskContext.get()
    val tid = tc.partitionId()
    if (tid >= 4) {
      assert(tc.resources()("gpu").addresses sameElements Array("1"))
    } else {
      assert(tc.resources()("gpu").addresses sameElements Array("0"))
    }
    iter
  }
  val rdd1 = rdd.repartition(2)
  val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
  val rp = new ResourceProfileBuilder().require(treqs).build
  val rdd2 = rdd1.withResources(rp).mapPartitions { iter =>
    val tc = TaskContext.get()
    val tid = tc.partitionId()
    assert(tc.resources()("gpu").addresses sameElements Array(tid.toString))
    println("sleeping 20s")
    Thread.sleep(20000)
    iter
  }
  rdd2.collect()
})
thread1.start()

// Sleep 5s in the main thread to make sure the result tasks launched in thread1 are running.
Thread.sleep(5000)

// Submit Spark Job 1 in the main thread.
// Each result task in thread1 takes 1 GPU, so there are no GPUs left for Spark Job 1.
// Spark Job 1 will run after Spark Job 0 finishes, but we can't ensure which GPU each task will grab.
val rdd = sc.range(0, 4, 1, 2).mapPartitions(iter => {
  Thread.sleep(10000)
  val tc = TaskContext.get()
  assert(tc.resources().contains("gpu"))
  iter
})
rdd.collect()
thread1.join()
```

From the event timeline screenshot, we can see that Spark job 1 was submitted while Spark job 0 was running, but its tasks did not run because of a lack of GPU resources. After Spark job 0 finished and released the GPUs, the tasks of Spark job 1 grabbed the GPUs and ran. |
With dynamic allocation on

1 GPU
Create a gpu_discovery_1_gpu.sh:

```bash
cat <<EOF
{"name": "gpu","addresses":["0"]}
EOF
```
```bash
spark-shell --master yarn \
  --num-executors=1 \
  --conf spark.executor.cores=4 \
  --conf spark.task.cpus=1 \
  --files `pwd`/gpu_discovery_1_gpu.sh \
  --conf spark.dynamicAllocation.maxExecutors=1 \
  --conf spark.dynamicAllocation.enabled=true
```

With the above spark-submit configuration, dynamic allocation is enabled and an initial executor with 4 CPU cores is launched. As each task requires 1 CPU core, this setup allows 4 tasks to run simultaneously.
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 6).mapPartitions { iter =>
  val tc = TaskContext.get()
  assert(!tc.resources().contains("gpu"))
  iter
}
val ereqs = new ExecutorResourceRequests().cores(8).resource("gpu", 1, "./gpu_discovery_1_gpu.sh")
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.125)
val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build
val rdd1 = rdd.repartition(12).withResources(rp).mapPartitions { iter =>
  Thread.sleep(1000)
  val tc = TaskContext.get()
  assert(tc.resources()("gpu").addresses sameElements Array("0"))
  iter
}
rdd1.collect()
```

The provided Spark job is split into two stages. The first stage comprises 6 tasks, each requiring 1 CPU core under the default resource profile, so the first 4 tasks run concurrently, followed by the remaining 2. The second stage has 12 tasks with a distinct resource profile that requires executors with 8 cores and 1 GPU, each task needing 1 CPU core and 0.125 GPUs. As a result, the first 8 tasks execute concurrently, followed by the remaining 4.

2 GPUs
Create a gpu_discovery_2_gpus.sh:

```bash
cat <<EOF
{"name": "gpu","addresses":["0", "1"]}
EOF
```
```bash
spark-shell --master yarn \
  --num-executors=1 \
  --conf spark.executor.cores=4 \
  --conf spark.task.cpus=1 \
  --files `pwd`/gpu_discovery_2_gpus.sh \
  --conf spark.dynamicAllocation.maxExecutors=1 \
  --conf spark.dynamicAllocation.enabled=true
```

With the above spark-submit configuration, dynamic allocation is enabled and an initial executor with 4 CPU cores is launched. As each task requires 1 CPU core, this setup allows 4 tasks to run simultaneously.
```scala
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 6).mapPartitions { iter =>
  val tc = TaskContext.get()
  assert(!tc.resources().contains("gpu"))
  iter
}
val ereqs = new ExecutorResourceRequests().cores(8).resource("gpu", 2, "./gpu_discovery_2_gpus.sh")
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.25)
val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build
val rdd1 = rdd.repartition(8).withResources(rp).mapPartitions { iter =>
  Thread.sleep(1000)
  val tc = TaskContext.get()
  val tid = tc.partitionId()
  if (tid >= 4) {
    assert(tc.resources()("gpu").addresses sameElements Array("1"))
  } else {
    assert(tc.resources()("gpu").addresses sameElements Array("0"))
  }
  iter
}
rdd1.collect()
```

The provided Spark job is split into two stages. The first stage comprises 6 tasks, each requiring 1 CPU core under the default resource profile, so the first 4 tasks run concurrently, followed by the remaining 2. The second stage has 8 tasks with a distinct resource profile that requires executors with 8 cores and 2 GPUs, each task needing 1 CPU core and 0.25 GPUs. As a result, all 8 tasks execute concurrently; the first 4 tasks grab GPU 0 while the remaining 4 grab GPU 1, and the assert lines verify that. |
|
Hi @tgravescs, Could you help to review it? Thx very much. |
### What changes were proposed in this pull request?

When cherry-picking #43494 back to branch 3.5 (#44690), I ran into the issue that some tests for Scala 2.12 failed when comparing two maps. It turned out that the function [compareMaps](https://github.com/apache/spark/pull/43494/files#diff-f205431247dd9446f4ce941e5a4620af438c242b9bdff6e7faa7df0194db49acR129) is not robust across Scala 2.12 and Scala 2.13.

- Scala 2.13

```scala
Welcome to Scala 2.13.12 (OpenJDK 64-Bit Server VM, Java 17.0.9).
Type in expressions for evaluation. Or try :help.

scala> def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double],
     |     eps: Double = 0.00000001): Boolean = {
     |   lhs.size == rhs.size &&
     |     lhs.zip(rhs).forall { case ((lName, lAmount), (rName, rAmount)) =>
     |       lName == rName && (lAmount - rAmount).abs < eps
     |     }
     | }
     |
     | import scala.collection.mutable.HashMap
     | val resources = Map("gpu" -> Map("a" -> 1.0, "b" -> 2.0, "c" -> 3.0, "d" -> 4.0))
     | val mapped = resources.map { case (rName, addressAmounts) =>
     |   rName -> HashMap(addressAmounts.toSeq.sorted: _*)
     | }
     |
     | compareMaps(resources("gpu"), mapped("gpu").toMap)
def compareMaps(lhs: Map[String,Double], rhs: Map[String,Double], eps: Double): Boolean
import scala.collection.mutable.HashMap
val resources: scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,Double]] = Map(gpu -> Map(a -> 1.0, b -> 2.0, c -> 3.0, d -> 4.0))
val mapped: scala.collection.immutable.Map[String,scala.collection.mutable.HashMap[String,Double]] = Map(gpu -> HashMap(a -> 1.0, b -> 2.0, c -> 3.0, d -> 4.0))
val res0: Boolean = true
```

- Scala 2.12

```scala
Welcome to Scala 2.12.14 (OpenJDK 64-Bit Server VM, Java 17.0.9).
Type in expressions for evaluation. Or try :help.

scala> def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double],
     |     eps: Double = 0.00000001): Boolean = {
     |   lhs.size == rhs.size &&
     |     lhs.zip(rhs).forall { case ((lName, lAmount), (rName, rAmount)) =>
     |       lName == rName && (lAmount - rAmount).abs < eps
     |     }
     | }
compareMaps: (lhs: Map[String,Double], rhs: Map[String,Double], eps: Double)Boolean

scala> import scala.collection.mutable.HashMap
import scala.collection.mutable.HashMap

scala> val resources = Map("gpu" -> Map("a" -> 1.0, "b" -> 2.0, "c" -> 3.0, "d" -> 4.0))
resources: scala.collection.immutable.Map[String,scala.collection.immutable.Map[String,Double]] = Map(gpu -> Map(a -> 1.0, b -> 2.0, c -> 3.0, d -> 4.0))

scala> val mapped = resources.map { case (rName, addressAmounts) =>
     |   rName -> HashMap(addressAmounts.toSeq.sorted: _*)
     | }
mapped: scala.collection.immutable.Map[String,scala.collection.mutable.HashMap[String,Double]] = Map(gpu -> Map(b -> 2.0, d -> 4.0, a -> 1.0, c -> 3.0))

scala> compareMaps(resources("gpu"), mapped("gpu").toMap)
res0: Boolean = false
```

The same code got different results for Scala 2.12 and Scala 2.13 because compareMaps zips the two maps and therefore depends on their iteration order. This PR reworks compareMaps so the tests pass on both Scala 2.12 and Scala 2.13.

### Why are the changes needed?

Some users may back-port #43494 to an older branch built with Scala 2.12 and run into the same issue. It is trivial work to make the GPU fraction tests compatible with both Scala 2.12 and Scala 2.13.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Make sure all the CI pipelines pass.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44735 from wbo4958/gpu-fraction-tests.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Yi Wu <[email protected]>
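For illustration, an iteration-order-independent comparison could look like the sketch below (our assumption of one possible rework; the actual change in #44735 may differ). It looks values up by key instead of zipping the two maps, so HashMap ordering differences between Scala 2.12 and 2.13 no longer matter:

```scala
// Sketch: compare by key lookup rather than by zipping, so the result does not
// depend on the maps' iteration order.
def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double],
    eps: Double = 0.00000001): Boolean = {
  lhs.size == rhs.size &&
    lhs.forall { case (name, lAmount) =>
      rhs.get(name).exists(rAmount => (lAmount - rAmount).abs < eps)
    }
}
```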
```scala
 * Double can display up to 16 decimal places, so we set the factor to
 * 10,000,000,000,000,000L.
 */
final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L
```
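For context, here is a minimal sketch of the fixed-point idea behind this constant (illustrative only, not the code under review): the user-facing double amount is converted to a Long once, and all later bookkeeping stays in Long arithmetic.

```scala
// Sketch only: convert the double amount to a Long once, then do all further
// accounting in Long so no additional floating-point error accumulates.
val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L  // 1E16

def toInternal(amount: Double): Long = (amount * ONE_ENTIRE_RESOURCE).toLong

var availableGpu0: Long = ONE_ENTIRE_RESOURCE  // one whole GPU address
val taskAmount: Long = toInternal(1.0 / 11.0)  // 909090909090909

// Admission check and release are plain Long comparisons and subtractions.
if (availableGpu0 >= taskAmount) availableGpu0 -= taskAmount
```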
This is way too hacky.
I get it, but the right solution in your example is to ask for something like 0.111 GPUs to get 9. This works fine.
Hi @srowen,
Thank you for your review. You've suggested a straightforward solution to the double-precision issue by deliberately sacrificing some accuracy, and it's certainly doable. However, this raises a question about the precision that Spark should define. For instance, if a user sets the value to 0.1111111111111, which value should Spark use for the calculation: 0.111, 0.1111, or something else? If we simply select 4 decimal places, what happens if a user wants to set the value to 0.000001? In that case, the value would be converted to 0 and might raise an exception. But it seems that a configuration option could resolve this problem.
My initial thought was that the resource fraction calculation should work regardless of the value set by the user, and the proposed approach in the pull request to convert the double to a Long can achieve this goal.
Nevertheless, if you and other committers believe that a change is necessary, I am open to submitting a pull request for the master branch.
I agree it also doesn't feel wholly satisfying. In this example all of those values work, as 9 times even 0.11 leaves you with less than 0.11 remaining, so you schedule 9. It would also imply there is 0.01 GPU left when that isn't the intent. In practice, I strongly doubt anyone is ever scheduling, let's say, more than 100 tasks on one GPU.
(But what about non-GPU resources? There aren't any now. Are there resources you'd schedule very very small fractions of? I can't think of any even in the future.)
Going down the rabbit hole of floating-point precision, I think we hit that no matter what. If I ask for 1e-16 resources, any way we interpret that probably is slightly imprecise as it's interpreted as float somewhere. But these are unrealistic use cases.
@srowen what is your major concern here? Are you seeing performance issues? Is it just more complex than you think is necessary? I don't think it's hacky; it might be overkill, and I do agree it's likely more than we need for precision. I would go to more than 100 though, as 100 tasks per some resource seems unlikely but not impossible; 1000 - 10000 is much more unlikely.
I don't think we can solve floating-point accuracy here in the general case, and this will virtually never arise anyway, except in one important class of case -- n GPUs where n < 10 and n is relatively prime to 10. Like, 3 even. A person writing down the resource utilization will almost surely write "0.333", but one can imagine supplying str(1./3.) programmatically. And then this issue could arise. Some string like "0.333333333" may end up as a float that has a value just over 1/3.
The other issue is approaching it this way by just multiplying by a long. I guess instead this can be done properly with BigDecimal at least? And I don't think this should touch so many parts of the code. Surely this just affects how the resource request string is parsed and compared to available resources.
Hi @srowen,
The Long approach is essentially the same as using BigDecimal. The default scale of BigDecimal is 16, so this PR chooses ONE_ENTIRE_RESOURCE = 1E16.toLong:

```scala
scala> val ONE_ENTIRE_RESOURCE: Long = 1E16.toLong
     | val taskAmount = 0.3333333333333334
     |
     | val usingLong = (taskAmount * ONE_ENTIRE_RESOURCE).toLong
     |
     | val bigDec = BigDecimal(taskAmount).toDouble
val ONE_ENTIRE_RESOURCE: Long = 10000000000000000
val taskAmount: Double = 0.3333333333333334
val usingLong: Long = 3333333333333334
val bigDec: Double = 0.3333333333333334
```

So if we need to ensure the input is small enough (< 1/n), we can set the scale to something like 14 for BigDecimal; similarly, to keep aligned with BigDecimal, we can set ONE_ENTIRE_RESOURCE = 1E14.toLong:
```scala
scala> import scala.math.BigDecimal.RoundingMode
     |
     | val ONE_ENTIRE_RESOURCE: Long = 1E14.toLong
     | val taskAmount = 0.3333333333333334
     |
     | val usingLong = (taskAmount * ONE_ENTIRE_RESOURCE).toLong
     |
     | val bigDec = BigDecimal(taskAmount).setScale(14, RoundingMode.DOWN).toDouble
import scala.math.BigDecimal.RoundingMode
val ONE_ENTIRE_RESOURCE: Long = 100000000000000
val taskAmount: Double = 0.3333333333333334
val usingLong: Long = 33333333333333
val bigDec: Double = 0.33333333333333
```
No. The point of BigDecimal is that you never have to use double or float. This isn't actually doing math with BigDecimal.
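To illustrate the point being made here, a BigDecimal-only version of the accounting might look like the following sketch (hypothetical, not code from this PR):

```scala
// Hypothetical sketch: parse the configured amount string straight into
// BigDecimal and keep all accounting in BigDecimal, so no double enters the math.
val requested = BigDecimal("0.333")  // e.g. the spark.task.resource.gpu.amount string
var available = BigDecimal(1)        // one whole GPU address

if (available >= requested) {        // exact decimal comparison
  available -= requested             // exact decimal subtraction, now 0.667
}
```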
Hi @srowen. Actually, this PR converts the taskAmount, which is a double/float value, to a Long by multiplying by 1E16.toLong, and the following calculation is then based on Long instead of double/float. You can see that all the APIs of ExecutorResourcesAmount use Long; even the assigned resource of a task is still kept as a Long, you can refer to it there.
Yeah, but if you think we should use BigDecimal, I'm okay with that; I can make a PR for the master branch first and then cherry-pick it to the 3.5 branch.
I know. Above I give an example where multiplying by long doesn't work. I'm referring to your example of BigDecimal above, which does not use (only) BigDecimal. Please just try it.
Hi @srowen. Sure, let me have a PR using BD for master branch. Thx
|
I'm not an expert and can't say anything about the code change, but a general comment - is there any specific reason this huge change has to be ported back to a released version line? This clearly sounds like a feature, not a bugfix. As the release manager of Spark 3.5.1, I would like to exclude this change at least for 3.5.1 to speed up the release process with the existing bunch of critical/blocker bugfixes. (Committers+, please DO NOT MERGE this PR till 3.5.1 has been released.) And thinking out loud, my understanding of the policy would be that this shouldn't go to the 3.5 version line - at least it needs discussion. |
|
Hi @HeartSaVioR, In essence, it's actually a genuine bug in Spark rather than a feature request. If we attempt to restrict execution to only one task per executor by modifying the GPU resources, it won't work as expected. This scenario is quite common in ETL (Extract, Transform, Load) + ML (Machine Learning) workflows. |
|
No, he's asking why it's opened against the 3.5 branch rather than master. It is not going into master and so is not to be back-ported either. The main reason is that this is not a fix. |
|
@srowen No, this was merged to master via #43494. I'm just speculating, as the change does not look trivial enough to be worth taking the risk of adopting it in a maintenance version. For me it doesn't seem to be a simple bugfix. Experts on this subject can make the decision, e.g. I won't object if you or @tgravescs decide to port it back. @wbo4958 You can correct me if I'm mistaken, but if you are generally talking about a limitation, it is still not technically a bug for me. |
|
Wait, this was merged to master? Let's not merge to 3.5, no, because I don't think this actually fixes the problem (see the thread here - I didn't grok that this was a backport). @tgravescs not sure about this one in master |
|
I'm not sure what you mean; it fixes the issue it was intended to fix - https://issues.apache.org/jira/browse/SPARK-45527. The fact that there are some issues with rounding of doubles shouldn't normally be seen and is not related to this particular change; that was an API choice made when the feature originally went in a long time ago. |
|
It looks like there are many not-quite-the-same changes related to this JIRA. The change I'm concerned about is in #43494 - the whole trying to do floating-point math with a long thing. That does not fix the particular issue that was claimed, it just moves around the issue. It sounds like you're saying there were other issues that are resolved by this change? that's more reasonable if so, but I still am uncomfortable with the change. That said I haven't thought through all the changes attached to this JIRA. The core issue of floating-point inaccuracy doesn't go away, and then I'm not sure why bother with the long, and that complexity |
|
The description on the PR could be better. I also updated the issue to point to the issue that broke this, or really didn't fully implement it. Essentially, https://issues.apache.org/jira/browse/SPARK-39853 added support to change resources with dynamic allocation off. That PR did not handle extra resources like gpu/fpga, etc. This issue is fixing that handling within the constraints of the API, and I think it is adequate for the majority of use cases. If you have issues with the API then please file a separate issue fully describing what your issues are. |
|
Ok, I think there are then multiple related but different changes going on here. I'm only questioning the handling of the floating-point part, which doesn't really work. Ideally that would be excised. I'm sure the rest is ok. In any event this shouldn't go into 3.5 |
|
Hi @srowen, I appreciate your review, but I have some confusion regarding your statements. You mentioned that |
|
I'm confused too, didn't we have a long conversation about this? The essence of your fix is this: |
|
Hi @srowen, For #44690 (comment) I deployed a Spark standalone cluster, launched a spark-shell, and ran the test code with:

```bash
spark-shell --master spark://192.168.0.106:7077 --conf spark.executor.cores=20 --conf spark.task.cpus=1 \
  --conf spark.executor.resource.gpu.amount=1 --conf spark.task.resource.gpu.amount=0.05 \
  --conf spark.dynamicAllocation.enabled=false
```

```scala
24/02/28 11:08:10 WARN Utils: Your hostname, xxx resolves to a loopback address: 127.0.1.1; using 192.168.0.106 instead (on interface wlp0s20f3)
24/02/28 11:08:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.0.0-SNAPSHOT
      /_/

Using Scala version 2.13.12 (OpenJDK 64-Bit Server VM, Java 17.0.9)
Type in expressions to have them evaluated.
Type :help for more information.
24/02/28 11:08:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://192.168.0.106:4040
Spark context available as 'sc' (master = spark://192.168.0.106:7077, app id = app-20240228110813-0000).
Spark session available as 'spark'.

scala> import org.apache.spark.TaskContext
     | import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
     |
     | val rdd = sc.range(0, 100, 1, 20).mapPartitions { iter => {
     |   val tc = TaskContext.get()
     |   val tid = tc.partitionId()
     |   assert(tc.resources()("gpu").addresses sameElements Array("0"))
     |   iter
     | }}
     |
     | val rdd1 = rdd.repartition(16)
     | val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0/11.0)
     | val rp = new ResourceProfileBuilder().require(treqs).build
     | val rdd2 = rdd1.withResources(rp).mapPartitions { iter => {
     |   val tc = TaskContext.get()
     |   val tid = tc.partitionId()
     |   assert(tc.resources()("gpu").addresses sameElements Array("0"))
     |   iter
     | }
     | }
     | rdd2.collect()
warning: 1 deprecation (since 2.13.3); for details, enable `:setting -deprecation` or `:replay -deprecation`
24/02/28 11:09:36 WARN ResourceUtils: The configuration of cores (exec = 20 task = 1, runnable tasks = 20) will result in wasted resources due to resource gpu limiting the number of runnable tasks per executor to: 11. Please adjust your configuration.
import org.apache.spark.TaskContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}
val rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[2] at mapPartitions at <console>:4
val rdd1: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[6] at repartition at <console>:11
val treqs: org.apache.spark.resource.TaskResourceRequests = Task resource requests: {cpus=name: cpus, amount: 1.0, gpu=name: gpu, amount: 0.09090909090909091}
val rp: org.apache.spark.resource.ResourceProfile = Profile: id = 1, executor resources: , task resources: cpus -> name: cpus, amount: 1.0,gpu -> name: gpu, amount: 0.09090909090909091
val rdd2: org.apache.sp...
```

As you can see, I used 1.0/11.0 for the GPU resource amount. Here are the stages (screenshots omitted): in the first stage, a total of 20 tasks run at the same time; in the second stage, a total of 11 tasks run at the same time. It worked as expected. |
|
So, this doesn't work: But I think what you're doing is converting If that's the explanation, then I think that's pretty fine actually. But the comments suggest that somehow you avoid precision loss; in fact you're getting more precision loss, just erring in a direction that works because you are under-allocating by a tiiiiiny amount that won't matter in any real world scenario |
|
Hi @srowen, Sorry for my bad example in #44690 (comment). Just like you said, the double has been converted to a Long by the multiplication shown below:

```scala
scala> val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L
     | val taskAmount = ((1.0/11.0) * ONE_ENTIRE_RESOURCE).toLong
     | var total = ONE_ENTIRE_RESOURCE
     | for (i <- 1 to 11) {
     |   if (total >= taskAmount) {
     |     total -= taskAmount
     |     println(s"assign $taskAmount for task $i, total left: ${total.toDouble / ONE_ENTIRE_RESOURCE}")
     |   } else {
     |     println(s"ERROR Can't assign $taskAmount for task $i, total left: ${total.toDouble / ONE_ENTIRE_RESOURCE}")
     |   }
     | }
assign 909090909090909 for task 1, total left: 0.9090909090909092
assign 909090909090909 for task 2, total left: 0.8181818181818182
assign 909090909090909 for task 3, total left: 0.7272727272727273
assign 909090909090909 for task 4, total left: 0.6363636363636364
assign 909090909090909 for task 5, total left: 0.5454545454545455
assign 909090909090909 for task 6, total left: 0.4545454545454546
assign 909090909090909 for task 7, total left: 0.3636363636363637
assign 909090909090909 for task 8, total left: 0.2727272727272728
assign 909090909090909 for task 9, total left: 0.1818181818181819
assign 909090909090909 for task 10, total left: 0.090909090909091
assign 909090909090909 for task 11, total left: 1.0E-16
val ONE_ENTIRE_RESOURCE: Long = 10000000000000000
val taskAmount: Long = 909090909090909
var total: Long = 1
```

This PR has already converted the double to a Long when doing the GPU resource calculation, so the internal calculation is entirely based on Long instead of double; please refer to here. |
Round down scenario

```scala
scala> val ONE_ENTIRE_RESOURCE: Long = 100000000000000000L
val ONE_ENTIRE_RESOURCE: Long = 100000000000000000

scala> val ONE_ENTIRE_RESOURCE: Long = (1E16).toLong
val ONE_ENTIRE_RESOURCE: Long = 10000000000000000

scala> val taskAmount = 0.1000000000000123456789
val taskAmount: Double = 0.10000000000001234

scala> (taskAmount * ONE_ENTIRE_RESOURCE).toLong
val res11: Long = 1000000000000123
```

No round down

```scala
scala> val ONE_ENTIRE_RESOURCE: Long = (1E17).toLong
val ONE_ENTIRE_RESOURCE: Long = 100000000000000000

scala> val taskAmount = 0.1000000000000123456789
val taskAmount: Double = 0.10000000000001234

scala> (taskAmount * ONE_ENTIRE_RESOURCE).toLong
val res13: Long = 10000000000001234
```
|
|
float -> integer conversion in the JVM always truncates so yes (for positive numbers) you are rounding down by doing this. I think my point is, the fix actually has nothing to do with working in integers; it is the 'rounding down' by 1 ulp to make sure that (for example) 1.0/11.0 is <= 1/11th. I think this would have worked as well if you shaved 1 ulp off the inputs, for instance. It may not be worth pursuing this any further though. |
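For reference, the "shave 1 ulp off the inputs" alternative mentioned above could be sketched as follows (our illustration of the suggestion, not something this PR does):

```scala
// nextDown returns the greatest double strictly less than its argument, i.e.
// the input lowered by one ulp, so for a non-representable fraction such as
// 1/11 the adjusted request is guaranteed to sit at or below the exact value.
val requested = 1.0 / 11.0                          // nearest double to 1/11
val adjusted  = java.lang.Math.nextDown(requested)  // one ulp lower
```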
|
I know that if we directly convert a float to an integer there will be some rounding down, but since we have multiplied by a big number, I think there is no round-down anymore; please correct me if I'm wrong by giving a concrete example. Thx. Just like I said before, why should we care about the real value of (1.0/11.0)? The value comes from the user end; if it's a little bigger than the real (1.0/11.0), all Spark can do is allow 10 tasks to run at the same time instead of 11. Spark can't do anything about the value (1.0/11.0) passed from the user; it's a precision error on the user end, not in Spark itself. |
|
Well, long * double is always carried out in double precision. Casting it to long doesn't make the math somehow exact. You will always have some truncation when casting to an integer. I actually can't think of a case where a double is exactly equal to an integer except for powers of two. (When printing a double, it will appear to be equal to an integer and end in ".0" if the double is within 1 ulp of an integer, but it isn't quite) I agree on the issue, I'm saying that simply subtracting 1 ulp (= the smallest double value) would have done the same thing; the longs are a roundabout way of achieving that that I think is misleadingly represented in the comments. |
|
"simply subtracting 1 ulp" may cause the precision error to be accumulated when doing the double calculation each time, and may finally result in unexpected behavior. Anyway, "simply subtracting 1 ulp" may be another kind of doable way, but just as you said, "It may not be worth pursuing this any further though." So, let's get back to this PR itself, is it ok to be merged into spark 3.5 branch? |
|
This is how your current change works though, by rounding down by (more than) 1 ulp. Yes, error accumulates, and yes you still end up with the 'wrong' total resource left (it is positive when should be 0) but in any practical case, it does not cause the wrong scheduling. As per other comments, no this should not go into 3.5. |