Skip to content

Commit e35be2b

Browse files
ruanwenjunruanwenjun
authored andcommitted
[SPARK-54553][VOLCANO] Supports receiving podgroup JSON format configurations when using Volcano.
1 parent a1e6285 commit e35be2b

File tree

3 files changed

+89
-13
lines changed

3 files changed

+89
-13
lines changed

docs/running-on-kubernetes.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1954,9 +1954,7 @@ Note that currently only driver/job level PodGroup is supported in Volcano Featu
19541954
Volcano defines PodGroup spec using [CRD yaml](https://volcano.sh/en/docs/podgroup/#example).
19551955

19561956
Similar to [Pod template](#pod-template), Spark users can use Volcano PodGroup Template to define the PodGroup spec configurations.
1957-
To do so, specify the Spark property `spark.kubernetes.scheduler.volcano.podGroupTemplateFile` to point to files accessible to the `spark-submit` process.
19581957
Below is an example of PodGroup template:
1959-
19601958
```yaml
19611959
apiVersion: scheduling.volcano.sh/v1beta1
19621960
kind: PodGroup
@@ -1975,6 +1973,17 @@ spec:
19751973
queue: default
19761974
```
19771975
1976+
You have two options to provide the PodGroup template in spark.
1977+
1. Use `spark.kubernetes.scheduler.volcano.podGroupTemplateFile` to point to files accessible to the `spark-submit` process
1978+
```bash
1979+
--conf spark.kubernetes.scheduler.volcano.podGroupTemplateFile=/path/to/podgroup
1980+
```
1981+
1982+
2. Use `spark.kubernetes.scheduler.volcano.podGroupTemplateJson` to provide the template in json format.
1983+
```bash
1984+
--conf spark.kubernetes.scheduler.volcano.podGroupTemplateJson={"spec": {"minMember": 1,"minResources": {"cpu": "2","memory": "3Gi"},"priorityClassName": "system-node-critical","queue": "default"}}
1985+
```
1986+
19781987
#### Using Apache YuniKorn as Customized Scheduler for Spark on Kubernetes
19791988

19801989
[Apache YuniKorn](https://yunikorn.apache.org/) is a resource scheduler for Kubernetes that provides advanced batch scheduling

resource-managers/kubernetes/core/volcano/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,20 @@
1616
*/
1717
package org.apache.spark.deploy.k8s.features
1818

19+
import com.fasterxml.jackson.databind.ObjectMapper
20+
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
1921
import io.fabric8.kubernetes.api.model._
2022
import io.fabric8.volcano.api.model.scheduling.v1beta1.{PodGroup, PodGroupSpec}
2123
import io.fabric8.volcano.client.DefaultVolcanoClient
2224

25+
import org.apache.spark.SparkException
2326
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}
2427
import org.apache.spark.internal.Logging
2528

26-
private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep
27-
with KubernetesExecutorCustomFeatureConfigStep with Logging {
29+
private[spark] class VolcanoFeatureStep
30+
extends KubernetesDriverCustomFeatureConfigStep
31+
with KubernetesExecutorCustomFeatureConfigStep
32+
with Logging {
2833
import VolcanoFeatureStep._
2934

3035
private var kubernetesConf: KubernetesConf = _
@@ -42,13 +47,15 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
4247

4348
override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
4449
if (kubernetesConf.isInstanceOf[KubernetesExecutorConf]) {
45-
logWarning("VolcanoFeatureStep#getAdditionalPreKubernetesResources() is not supported " +
46-
"for executor.")
50+
logWarning(
51+
"VolcanoFeatureStep#getAdditionalPreKubernetesResources() is not supported " +
52+
"for executor.")
4753
return Seq.empty
4854
}
4955
lazy val client = new DefaultVolcanoClient
50-
val template = kubernetesConf.getOption(POD_GROUP_TEMPLATE_FILE_KEY)
51-
val pg = template.map(client.podGroups.load(_).item).getOrElse(new PodGroup())
56+
val pg = getPodGroupConfigByTemplateFile(client)
57+
.orElse(getPodGroupByTemplateJson(client))
58+
.getOrElse(new PodGroup())
5259
var metadata = pg.getMetadata
5360
if (metadata == null) metadata = new ObjectMeta
5461
metadata.setName(podGroupName)
@@ -59,13 +66,42 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
5966
if (spec == null) spec = new PodGroupSpec
6067
pg.setSpec(spec)
6168

69+
logDebug(s"Volcano PodGroup configuration: $pg")
6270
Seq(pg)
6371
}
6472

73+
private def getPodGroupConfigByTemplateFile(
74+
volcanoClient: DefaultVolcanoClient): Option[PodGroup] = {
75+
kubernetesConf.getOption(POD_GROUP_TEMPLATE_FILE_KEY).map { templateFile =>
76+
logDebug("Loading Volcano PodGroup configuration from template file")
77+
volcanoClient.podGroups.load(templateFile).item
78+
}
79+
}
80+
81+
private def getPodGroupByTemplateJson(volcanoClient: DefaultVolcanoClient): Option[PodGroup] = {
82+
kubernetesConf.getOption(POD_GROUP_TEMPLATE_JSON_KEY).map { templateJson =>
83+
logDebug("Loading Volcano PodGroup configuration from template json")
84+
try {
85+
val templateYaml = new ObjectMapper()
86+
.writerWithDefaultPrettyPrinter()
87+
.writeValueAsString(new ObjectMapper(new YAMLFactory()).readTree(templateJson))
88+
volcanoClient
89+
.podGroups()
90+
.load(new java.io.ByteArrayInputStream(templateYaml.getBytes()))
91+
.item()
92+
} catch {
93+
case e: Exception =>
94+
throw new SparkException(
95+
f"The ${POD_GROUP_TEMPLATE_JSON_KEY} provided is not validated",
96+
e)
97+
}
98+
}
99+
}
100+
65101
override def configurePod(pod: SparkPod): SparkPod = {
66102
val k8sPodBuilder = new PodBuilder(pod.pod)
67103
.editMetadata()
68-
.addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
104+
.addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
69105
.endMetadata()
70106
val k8sPod = k8sPodBuilder.build()
71107
SparkPod(k8sPod, pod.container)
@@ -75,4 +111,5 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
75111
private[spark] object VolcanoFeatureStep {
76112
val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name"
77113
val POD_GROUP_TEMPLATE_FILE_KEY = "spark.kubernetes.scheduler.volcano.podGroupTemplateFile"
114+
val POD_GROUP_TEMPLATE_JSON_KEY = "spark.kubernetes.scheduler.volcano.podGroupTemplateJson"
78115
}

resource-managers/kubernetes/core/volcano/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ class VolcanoFeatureStepSuite extends SparkFunSuite {
3434

3535
val annotations = configuredPod.pod.getMetadata.getAnnotations
3636

37-
assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
37+
assert(
38+
annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
3839
val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
3940
assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
4041
}
@@ -46,12 +47,13 @@ class VolcanoFeatureStepSuite extends SparkFunSuite {
4647
step.init(kubernetesConf)
4748
val configuredPod = step.configurePod(SparkPod.initialPod())
4849
val annotations = configuredPod.pod.getMetadata.getAnnotations
49-
assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
50+
assert(
51+
annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
5052
}
5153

5254
test("SPARK-38455: Support driver podgroup template") {
53-
val templatePath = new File(
54-
getClass.getResource("/driver-podgroup-template.yml").getFile).getAbsolutePath
55+
val templatePath =
56+
new File(getClass.getResource("/driver-podgroup-template.yml").getFile).getAbsolutePath
5557
val sparkConf = new SparkConf()
5658
.set(VolcanoFeatureStep.POD_GROUP_TEMPLATE_FILE_KEY, templatePath)
5759
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
@@ -67,6 +69,34 @@ class VolcanoFeatureStepSuite extends SparkFunSuite {
6769
assert(podGroup.getSpec.getQueue == "driver-queue")
6870
}
6971

72+
test("SPARK-38455: Support create driver podgroup by attributes") {
73+
val sparkConf = new SparkConf()
74+
.set(
75+
VolcanoFeatureStep.POD_GROUP_TEMPLATE_JSON_KEY,
76+
"{\n" +
77+
" \"spec\": {\n" +
78+
" \"minMember\": 1,\n" +
79+
" \"minResources\": {\n" +
80+
" \"cpu\": \"2\",\n" +
81+
" \"memory\": \"2048Mi\"\n" +
82+
" },\n" +
83+
" \"priorityClassName\": \"driver-priority\",\n" +
84+
" \"queue\": \"driver-queue\"\n" +
85+
" }\n" +
86+
"}")
87+
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
88+
val step = new VolcanoFeatureStep()
89+
step.init(kubernetesConf)
90+
step.configurePod(SparkPod.initialPod())
91+
val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
92+
assert(podGroup.getSpec.getMinMember == 1)
93+
assert(podGroup.getSpec.getMinResources.get("cpu").getAmount == "2")
94+
assert(podGroup.getSpec.getMinResources.get("memory").getAmount == "2048")
95+
assert(podGroup.getSpec.getMinResources.get("memory").getFormat == "Mi")
96+
assert(podGroup.getSpec.getPriorityClassName == "driver-priority")
97+
assert(podGroup.getSpec.getQueue == "driver-queue")
98+
}
99+
70100
test("SPARK-38503: return empty for executor pre resource") {
71101
val kubernetesConf = KubernetesTestConf.createExecutorConf(new SparkConf())
72102
val step = new VolcanoFeatureStep()

0 commit comments

Comments
 (0)