Skip to content

Commit 5984614

Browse files
Yikundongjoon-hyun
authored andcommitted
[SPARK-36061][K8S] Add volcano module and feature step
### What changes were proposed in this pull request? This patch added volcano feature step to help user integrate spark with Volcano Scheduler. - Add a VolcanoFeatureStep, it can be used in driver and executor side. After this patch, users can enable this featurestep by submiting job by using ```shell --conf spark.kubernete.driver.scheduler.name=volcano \ --conf spark.kubernetes.driver.pod.featureSteps=org.apache.spark.deploy.k8s.features.scheduler.VolcanoFeatureStep ``` A PodGroup will be created before driver started, annotations will be set to driver pod to added driver pod to this pod group. Then, Volcano scheduler will help driver pod scheduling instead of deafult kubernetes scheduler. ### Why are the changes needed? This PR help user integrate Spark with Volcano Scheduler. See also: [SPARK-36057](https://issues.apache.org/jira/browse/SPARK-36057) ### Does this PR introduce _any_ user-facing change? Yes, introduced a user feature step. These are used by `VolcanoFeatureStep`, and also will be used by `YunikornFeatureStep` in future. ### How was this patch tested? - UT - Integration test: Test without -Pvolcano (make sure exsiting integration test passed) ```bash # 1. Test without -Pvolcano (make sure exsiting integration test passed) # SBT build/sbt -Pkubernetes -Pkubernetes-integration-tests -Dtest.exclude.tags=minikube,r "kubernetes-integration-tests/test" # Maven resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh --exclude-tags minikube,r ``` - Integration test: Test all VolcanoSuite (all kubernetes test with volcano + a new podgroup test) and KubernetesSuite ```bash # Deploy Volcano (x86) kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/master/installer/volcano-development.yaml # Deploy Volcano (arm64) kubectl apply -f https://raw.githubusercontent.com/volcano-sh/volcano/master/installer/volcano-development-arm64.yaml # Test all VolcanoSuite (all kubernetes test with volcano + a new podgroup test) and KubernetesSuite build/sbt -Pvolcano -Pkubernetes -Pkubernetes-integration-tests -Dtest.exclude.tags=minikube,r "kubernetes-integration-tests/test" ``` Closes #35422 from Yikun/SPARK-36061-vc-step. Authored-by: Yikun Jiang <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent c410408 commit 5984614

File tree

10 files changed

+295
-4
lines changed

10 files changed

+295
-4
lines changed

.github/workflows/build_and_test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ jobs:
614614
export MAVEN_CLI_OPTS="--no-transfer-progress"
615615
export JAVA_VERSION=${{ matrix.java }}
616616
# It uses Maven's 'install' intentionally, see https://github.com/apache/spark/pull/26414.
617-
./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
617+
./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
618618
rm -rf ~/.m2/repository/org/apache/spark
619619
620620
scala-213:
@@ -660,7 +660,7 @@ jobs:
660660
- name: Build with SBT
661661
run: |
662662
./dev/change-scala-version.sh 2.13
663-
./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile test:compile
663+
./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile test:compile
664664
665665
tpcds-1g:
666666
needs: [configure-jobs, precondition]

dev/scalastyle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
# limitations under the License.
1818
#
1919

20-
SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"}
20+
SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive -Pvolcano"}
2121

2222
# NOTE: echo "q" is needed because SBT prompts the user for input on encountering a build file
2323
# with failure (either resolution or compilation); the "q" makes SBT quit.

project/SparkBuild.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,11 @@ object SparkBuild extends PomBuild {
421421
// SPARK-14738 - Remove docker tests from main Spark build
422422
// enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
423423

424+
if (!profiles.contains("volcano")) {
425+
enable(Volcano.settings)(kubernetes)
426+
enable(Volcano.settings)(kubernetesIntegrationTests)
427+
}
428+
424429
enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests)
425430

426431
enable(YARN.settings)(yarn)
@@ -956,6 +961,13 @@ object SparkR {
956961
)
957962
}
958963

964+
object Volcano {
965+
// Exclude all volcano file for Compile and Test
966+
lazy val settings = Seq(
967+
unmanagedSources / excludeFilter := HiddenFileFilter || "*Volcano*.scala"
968+
)
969+
}
970+
959971
object Unidoc {
960972

961973
import BuildCommons._

resource-managers/kubernetes/core/pom.xml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,25 @@
2929
<name>Spark Project Kubernetes</name>
3030
<properties>
3131
<sbt.project.name>kubernetes</sbt.project.name>
32+
<volcano.exclude>**/*Volcano*.scala</volcano.exclude>
3233
</properties>
3334

35+
<profiles>
36+
<profile>
37+
<id>volcano</id>
38+
<properties>
39+
<volcano.exclude></volcano.exclude>
40+
</properties>
41+
<dependencies>
42+
<dependency>
43+
<groupId>io.fabric8</groupId>
44+
<artifactId>volcano-model-v1beta1</artifactId>
45+
<version>${kubernetes-client.version}</version>
46+
</dependency>
47+
</dependencies>
48+
</profile>
49+
</profiles>
50+
3451
<dependencies>
3552
<dependency>
3653
<groupId>org.apache.spark</groupId>
@@ -103,6 +120,19 @@
103120

104121

105122
<build>
123+
<pluginManagement>
124+
<plugins>
125+
<plugin>
126+
<groupId>net.alchim31.maven</groupId>
127+
<artifactId>scala-maven-plugin</artifactId>
128+
<configuration>
129+
<excludes>
130+
<exclude>${volcano.exclude}</exclude>
131+
</excludes>
132+
</configuration>
133+
</plugin>
134+
</plugins>
135+
</pluginManagement>
106136
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
107137
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
108138
</build>
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.features
18+
19+
import io.fabric8.kubernetes.api.model._
20+
import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder
21+
22+
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}
23+
24+
private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep
25+
with KubernetesExecutorCustomFeatureConfigStep {
26+
27+
private var kubernetesConf: KubernetesConf = _
28+
29+
private val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name"
30+
31+
private lazy val podGroupName = s"${kubernetesConf.appId}-podgroup"
32+
private lazy val namespace = kubernetesConf.namespace
33+
34+
override def init(config: KubernetesDriverConf): Unit = {
35+
kubernetesConf = config
36+
}
37+
38+
override def init(config: KubernetesExecutorConf): Unit = {
39+
kubernetesConf = config
40+
}
41+
42+
override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
43+
val podGroup = new PodGroupBuilder()
44+
.editOrNewMetadata()
45+
.withName(podGroupName)
46+
.withNamespace(namespace)
47+
.endMetadata()
48+
.build()
49+
Seq(podGroup)
50+
}
51+
52+
override def configurePod(pod: SparkPod): SparkPod = {
53+
val k8sPodBuilder = new PodBuilder(pod.pod)
54+
.editMetadata()
55+
.addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
56+
.endMetadata()
57+
val k8sPod = k8sPodBuilder.build()
58+
SparkPod(k8sPod, pod.container)
59+
}
60+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.features
18+
19+
import io.fabric8.volcano.scheduling.v1beta1.PodGroup
20+
21+
import org.apache.spark.{SparkConf, SparkFunSuite}
22+
import org.apache.spark.deploy.k8s._
23+
24+
class VolcanoFeatureStepSuite extends SparkFunSuite {
25+
26+
test("SPARK-36061: Driver Pod with Volcano PodGroup") {
27+
val sparkConf = new SparkConf()
28+
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
29+
val step = new VolcanoFeatureStep()
30+
step.init(kubernetesConf)
31+
val configuredPod = step.configurePod(SparkPod.initialPod())
32+
33+
val annotations = configuredPod.pod.getMetadata.getAnnotations
34+
35+
assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
36+
val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
37+
assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
38+
}
39+
40+
test("SPARK-36061: Executor Pod with Volcano PodGroup") {
41+
val sparkConf = new SparkConf()
42+
val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
43+
val step = new VolcanoFeatureStep()
44+
step.init(kubernetesConf)
45+
val configuredPod = step.configurePod(SparkPod.initialPod())
46+
val annotations = configuredPod.pod.getMetadata.getAnnotations
47+
assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
48+
}
49+
}

resource-managers/kubernetes/integration-tests/pom.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747

4848
<test.exclude.tags></test.exclude.tags>
4949
<test.include.tags></test.include.tags>
50+
<volcano.exclude>**/*Volcano*.scala</volcano.exclude>
5051
</properties>
5152
<packaging>jar</packaging>
5253
<name>Spark Project Kubernetes Integration Tests</name>
@@ -74,9 +75,28 @@
7475
<artifactId>spark-tags_${scala.binary.version}</artifactId>
7576
<type>test-jar</type>
7677
</dependency>
78+
<dependency>
79+
<groupId>org.apache.spark</groupId>
80+
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
81+
<version>${project.version}</version>
82+
<scope>test</scope>
83+
</dependency>
7784
</dependencies>
7885

7986
<build>
87+
<pluginManagement>
88+
<plugins>
89+
<plugin>
90+
<groupId>net.alchim31.maven</groupId>
91+
<artifactId>scala-maven-plugin</artifactId>
92+
<configuration>
93+
<excludes>
94+
<exclude>${volcano.exclude}</exclude>
95+
</excludes>
96+
</configuration>
97+
</plugin>
98+
</plugins>
99+
</pluginManagement>
80100
<plugins>
81101
<plugin>
82102
<groupId>org.codehaus.mojo</groupId>
@@ -209,5 +229,18 @@
209229
</dependency>
210230
</dependencies>
211231
</profile>
232+
<profile>
233+
<id>volcano</id>
234+
<properties>
235+
<volcano.exclude></volcano.exclude>
236+
</properties>
237+
<dependencies>
238+
<dependency>
239+
<groupId>io.fabric8</groupId>
240+
<artifactId>volcano-client</artifactId>
241+
<version>${kubernetes-client.version}</version>
242+
</dependency>
243+
</dependencies>
244+
</profile>
212245
</profiles>
213246
</project>

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ class KubernetesSuite extends SparkFunSuite
181181
}
182182
}
183183

184-
before {
184+
protected def setUpTest(): Unit = {
185185
appLocator = UUID.randomUUID().toString.replaceAll("-", "")
186186
driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "")
187187
sparkAppConf = kubernetesTestComponents.newSparkAppConf()
@@ -195,6 +195,10 @@ class KubernetesSuite extends SparkFunSuite
195195
}
196196
}
197197

198+
before {
199+
setUpTest()
200+
}
201+
198202
after {
199203
if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
200204
kubernetesTestComponents.deleteNamespace()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.integrationtest
18+
19+
import org.scalatest.Tag
20+
21+
class VolcanoSuite extends KubernetesSuite with VolcanoTestsSuite {
22+
23+
override protected def setUpTest(): Unit = {
24+
super.setUpTest()
25+
sparkAppConf
26+
.set("spark.kubernetes.driver.scheduler.name", "volcano")
27+
.set("spark.kubernetes.executor.scheduler.name", "volcano")
28+
}
29+
}
30+
31+
private[spark] object VolcanoSuite {
32+
val volcanoTag = Tag("volcano")
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.integrationtest
18+
19+
import io.fabric8.kubernetes.api.model.Pod
20+
import io.fabric8.volcano.client.VolcanoClient
21+
22+
import org.apache.spark.SparkFunSuite
23+
import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
24+
import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
25+
import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag
26+
27+
private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
28+
import VolcanoTestsSuite._
29+
30+
protected def checkScheduler(pod: Pod): Unit = {
31+
assert(pod.getSpec.getSchedulerName === "volcano")
32+
}
33+
34+
protected def checkAnnotaion(pod: Pod): Unit = {
35+
val appId = pod.getMetadata.getLabels.get("spark-app-selector")
36+
val annotations = pod.getMetadata.getAnnotations
37+
assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup")
38+
}
39+
40+
protected def checkPodGroup(pod: Pod): Unit = {
41+
val appId = pod.getMetadata.getLabels.get("spark-app-selector")
42+
val podGroupName = s"$appId-podgroup"
43+
val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
44+
val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
45+
assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName)
46+
}
47+
48+
test("Run SparkPi with volcano scheduler", k8sTestTag, volcanoTag) {
49+
sparkAppConf
50+
.set("spark.kubernetes.driver.pod.featureSteps", VOLCANO_FEATURE_STEP)
51+
.set("spark.kubernetes.executor.pod.featureSteps", VOLCANO_FEATURE_STEP)
52+
runSparkPiAndVerifyCompletion(
53+
driverPodChecker = (driverPod: Pod) => {
54+
doBasicDriverPodCheck(driverPod)
55+
checkScheduler(driverPod)
56+
checkAnnotaion(driverPod)
57+
checkPodGroup(driverPod)
58+
},
59+
executorPodChecker = (executorPod: Pod) => {
60+
doBasicExecutorPodCheck(executorPod)
61+
checkScheduler(executorPod)
62+
checkAnnotaion(executorPod)
63+
}
64+
)
65+
}
66+
}
67+
68+
private[spark] object VolcanoTestsSuite extends SparkFunSuite {
69+
val VOLCANO_FEATURE_STEP = classOf[VolcanoFeatureStep].getName
70+
}

0 commit comments

Comments
 (0)