Skip to content

Commit a3ae7f6

Browse files
ruanwenjunruanwenjun
authored andcommitted
[SPARK-54553][VOLCANO] Supports receiving podgroup JSON format configurations when using Volcano.
1 parent a1e6285 commit a3ae7f6

File tree

3 files changed

+89
-14
lines changed

3 files changed

+89
-14
lines changed

docs/running-on-kubernetes.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1954,9 +1954,7 @@ Note that currently only driver/job level PodGroup is supported in Volcano Featu
19541954
Volcano defines PodGroup spec using [CRD yaml](https://volcano.sh/en/docs/podgroup/#example).
19551955

19561956
Similar to [Pod template](#pod-template), Spark users can use Volcano PodGroup Template to define the PodGroup spec configurations.
1957-
To do so, specify the Spark property `spark.kubernetes.scheduler.volcano.podGroupTemplateFile` to point to files accessible to the `spark-submit` process.
19581957
Below is an example of PodGroup template:
1959-
19601958
```yaml
19611959
apiVersion: scheduling.volcano.sh/v1beta1
19621960
kind: PodGroup
@@ -1975,6 +1973,17 @@ spec:
19751973
queue: default
19761974
```
19771975
1976+
You have two options to provide the PodGroup template in spark.
1977+
1. Use `spark.kubernetes.scheduler.volcano.podGroupTemplateFile` to point to files accessible to the `spark-submit` process
1978+
```bash
1979+
--conf spark.kubernetes.scheduler.volcano.podGroupTemplateFile=/path/to/podgroup
1980+
```
1981+
1982+
2. Use `spark.kubernetes.scheduler.volcano.podGroupTemplateJson` to provide the template in json format.
1983+
```bash
1984+
--conf spark.kubernetes.scheduler.volcano.podGroupTemplateJson={"spec": {"minMember": 1,"minResources": {"cpu": "2","memory": "3Gi"},"priorityClassName": "system-node-critical","queue": "default"}}
1985+
```
1986+
19781987
#### Using Apache YuniKorn as Customized Scheduler for Spark on Kubernetes
19791988

19801989
[Apache YuniKorn](https://yunikorn.apache.org/) is a resource scheduler for Kubernetes that provides advanced batch scheduling

resource-managers/kubernetes/core/volcano/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,19 @@
1616
*/
1717
package org.apache.spark.deploy.k8s.features
1818

19+
import com.fasterxml.jackson.databind.ObjectMapper
20+
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
1921
import io.fabric8.kubernetes.api.model._
2022
import io.fabric8.volcano.api.model.scheduling.v1beta1.{PodGroup, PodGroupSpec}
2123
import io.fabric8.volcano.client.DefaultVolcanoClient
22-
24+
import org.apache.spark.SparkException
2325
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}
2426
import org.apache.spark.internal.Logging
2527

26-
private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep
27-
with KubernetesExecutorCustomFeatureConfigStep with Logging {
28+
private[spark] class VolcanoFeatureStep
29+
extends KubernetesDriverCustomFeatureConfigStep
30+
with KubernetesExecutorCustomFeatureConfigStep
31+
with Logging {
2832
import VolcanoFeatureStep._
2933

3034
private var kubernetesConf: KubernetesConf = _
@@ -42,13 +46,15 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
4246

4347
override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
4448
if (kubernetesConf.isInstanceOf[KubernetesExecutorConf]) {
45-
logWarning("VolcanoFeatureStep#getAdditionalPreKubernetesResources() is not supported " +
46-
"for executor.")
49+
logWarning(
50+
"VolcanoFeatureStep#getAdditionalPreKubernetesResources() is not supported " +
51+
"for executor.")
4752
return Seq.empty
4853
}
4954
lazy val client = new DefaultVolcanoClient
50-
val template = kubernetesConf.getOption(POD_GROUP_TEMPLATE_FILE_KEY)
51-
val pg = template.map(client.podGroups.load(_).item).getOrElse(new PodGroup())
55+
val pg = getPodGroupConfigByTemplateFile(client)
56+
.orElse(getPodGroupByTemplateJson(client))
57+
.getOrElse(new PodGroup())
5258
var metadata = pg.getMetadata
5359
if (metadata == null) metadata = new ObjectMeta
5460
metadata.setName(podGroupName)
@@ -59,13 +65,42 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
5965
if (spec == null) spec = new PodGroupSpec
6066
pg.setSpec(spec)
6167

68+
logDebug(s"Volcano PodGroup configuration: $pg")
6269
Seq(pg)
6370
}
6471

72+
private def getPodGroupConfigByTemplateFile(
73+
volcanoClient: DefaultVolcanoClient): Option[PodGroup] = {
74+
kubernetesConf.getOption(POD_GROUP_TEMPLATE_FILE_KEY).map { templateFile =>
75+
logDebug("Loading Volcano PodGroup configuration from template file")
76+
volcanoClient.podGroups.load(templateFile).item
77+
}
78+
}
79+
80+
private def getPodGroupByTemplateJson(volcanoClient: DefaultVolcanoClient): Option[PodGroup] = {
81+
kubernetesConf.getOption(POD_GROUP_TEMPLATE_JSON_KEY).map { templateJson =>
82+
logDebug("Loading Volcano PodGroup configuration from template json")
83+
try {
84+
val templateYaml = new ObjectMapper()
85+
.writerWithDefaultPrettyPrinter()
86+
.writeValueAsString(new ObjectMapper(new YAMLFactory()).readTree(templateJson))
87+
volcanoClient
88+
.podGroups()
89+
.load(new java.io.ByteArrayInputStream(templateYaml.getBytes()))
90+
.item()
91+
} catch {
92+
case e: Exception =>
93+
throw new SparkException(
94+
f"The ${POD_GROUP_TEMPLATE_JSON_KEY} provided is not validated",
95+
e)
96+
}
97+
}
98+
}
99+
65100
override def configurePod(pod: SparkPod): SparkPod = {
66101
val k8sPodBuilder = new PodBuilder(pod.pod)
67102
.editMetadata()
68-
.addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
103+
.addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
69104
.endMetadata()
70105
val k8sPod = k8sPodBuilder.build()
71106
SparkPod(k8sPod, pod.container)
@@ -75,4 +110,5 @@ private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureCon
75110
private[spark] object VolcanoFeatureStep {
76111
val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name"
77112
val POD_GROUP_TEMPLATE_FILE_KEY = "spark.kubernetes.scheduler.volcano.podGroupTemplateFile"
113+
val POD_GROUP_TEMPLATE_JSON_KEY = "spark.kubernetes.scheduler.volcano.podGroupTemplateJson"
78114
}

resource-managers/kubernetes/core/volcano/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ class VolcanoFeatureStepSuite extends SparkFunSuite {
3434

3535
val annotations = configuredPod.pod.getMetadata.getAnnotations
3636

37-
assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
37+
assert(
38+
annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
3839
val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
3940
assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
4041
}
@@ -46,12 +47,13 @@ class VolcanoFeatureStepSuite extends SparkFunSuite {
4647
step.init(kubernetesConf)
4748
val configuredPod = step.configurePod(SparkPod.initialPod())
4849
val annotations = configuredPod.pod.getMetadata.getAnnotations
49-
assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
50+
assert(
51+
annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
5052
}
5153

5254
test("SPARK-38455: Support driver podgroup template") {
53-
val templatePath = new File(
54-
getClass.getResource("/driver-podgroup-template.yml").getFile).getAbsolutePath
55+
val templatePath =
56+
new File(getClass.getResource("/driver-podgroup-template.yml").getFile).getAbsolutePath
5557
val sparkConf = new SparkConf()
5658
.set(VolcanoFeatureStep.POD_GROUP_TEMPLATE_FILE_KEY, templatePath)
5759
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
@@ -67,6 +69,34 @@ class VolcanoFeatureStepSuite extends SparkFunSuite {
6769
assert(podGroup.getSpec.getQueue == "driver-queue")
6870
}
6971

72+
test("SPARK-38455: Support create driver podgroup by attributes") {
73+
val sparkConf = new SparkConf()
74+
.set(
75+
VolcanoFeatureStep.POD_GROUP_TEMPLATE_JSON_KEY,
76+
"{\n" +
77+
" \"spec\": {\n" +
78+
" \"minMember\": 1,\n" +
79+
" \"minResources\": {\n" +
80+
" \"cpu\": \"2\",\n" +
81+
" \"memory\": \"2048Mi\"\n" +
82+
" },\n" +
83+
" \"priorityClassName\": \"driver-priority\",\n" +
84+
" \"queue\": \"driver-queue\"\n" +
85+
" }\n" +
86+
"}")
87+
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
88+
val step = new VolcanoFeatureStep()
89+
step.init(kubernetesConf)
90+
step.configurePod(SparkPod.initialPod())
91+
val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
92+
assert(podGroup.getSpec.getMinMember == 1)
93+
assert(podGroup.getSpec.getMinResources.get("cpu").getAmount == "2")
94+
assert(podGroup.getSpec.getMinResources.get("memory").getAmount == "2048")
95+
assert(podGroup.getSpec.getMinResources.get("memory").getFormat == "Mi")
96+
assert(podGroup.getSpec.getPriorityClassName == "driver-priority")
97+
assert(podGroup.getSpec.getQueue == "driver-queue")
98+
}
99+
70100
test("SPARK-38503: return empty for executor pre resource") {
71101
val kubernetesConf = KubernetesTestConf.createExecutorConf(new SparkConf())
72102
val step = new VolcanoFeatureStep()

0 commit comments

Comments
 (0)