diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 4529cd9ba4c2..9edf5efd356b 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -614,7 +614,7 @@ jobs:
export MAVEN_CLI_OPTS="--no-transfer-progress"
export JAVA_VERSION=${{ matrix.java }}
# It uses Maven's 'install' intentionally, see https://github.com/apache/spark/pull/26414.
- ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
+ ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install
rm -rf ~/.m2/repository/org/apache/spark
scala-213:
@@ -660,7 +660,7 @@ jobs:
- name: Build with SBT
run: |
./dev/change-scala-version.sh 2.13
- ./build/sbt -Pyarn -Pmesos -Pkubernetes -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile test:compile
+ ./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile test:compile
tpcds-1g:
needs: [configure-jobs, precondition]
diff --git a/dev/scalastyle b/dev/scalastyle
index 212ef900eb9b..5f958b8fb0a7 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -17,7 +17,7 @@
# limitations under the License.
#
-SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"}
+SPARK_PROFILES=${1:-"-Pmesos -Pkubernetes -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive -Pvolcano"}
# NOTE: echo "q" is needed because SBT prompts the user for input on encountering a build file
# with failure (either resolution or compilation); the "q" makes SBT quit.
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 3d3a65f3d233..1b8b258af277 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -421,6 +421,11 @@ object SparkBuild extends PomBuild {
// SPARK-14738 - Remove docker tests from main Spark build
// enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
+ if (!profiles.contains("volcano")) {
+ enable(Volcano.settings)(kubernetes)
+ enable(Volcano.settings)(kubernetesIntegrationTests)
+ }
+
enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests)
enable(YARN.settings)(yarn)
@@ -956,6 +961,13 @@ object SparkR {
)
}
+object Volcano {
+ // Exclude all volcano file for Compile and Test
+ lazy val settings = Seq(
+ unmanagedSources / excludeFilter := HiddenFileFilter || "*Volcano*.scala"
+ )
+}
+
object Unidoc {
import BuildCommons._
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 0cb5e115906a..6eb357ef2490 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -29,8 +29,25 @@
Spark Project Kubernetes
kubernetes
+ **/*Volcano*.scala
+
+
+ volcano
+
+
+
+
+
+ io.fabric8
+ volcano-model-v1beta1
+ ${kubernetes-client.version}
+
+
+
+
+
org.apache.spark
@@ -103,6 +120,19 @@
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+ ${volcano.exclude}
+
+
+
+
+
target/scala-${scala.binary.version}/classes
target/scala-${scala.binary.version}/test-classes
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
new file mode 100644
index 000000000000..1c936848db67
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStep.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.volcano.scheduling.v1beta1.PodGroupBuilder
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, KubernetesExecutorConf, SparkPod}
+
+private[spark] class VolcanoFeatureStep extends KubernetesDriverCustomFeatureConfigStep
+ with KubernetesExecutorCustomFeatureConfigStep {
+
+ private var kubernetesConf: KubernetesConf = _
+
+ private val POD_GROUP_ANNOTATION = "scheduling.k8s.io/group-name"
+
+ private lazy val podGroupName = s"${kubernetesConf.appId}-podgroup"
+ private lazy val namespace = kubernetesConf.namespace
+
+ override def init(config: KubernetesDriverConf): Unit = {
+ kubernetesConf = config
+ }
+
+ override def init(config: KubernetesExecutorConf): Unit = {
+ kubernetesConf = config
+ }
+
+ override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = {
+ val podGroup = new PodGroupBuilder()
+ .editOrNewMetadata()
+ .withName(podGroupName)
+ .withNamespace(namespace)
+ .endMetadata()
+ .build()
+ Seq(podGroup)
+ }
+
+ override def configurePod(pod: SparkPod): SparkPod = {
+ val k8sPodBuilder = new PodBuilder(pod.pod)
+ .editMetadata()
+ .addToAnnotations(POD_GROUP_ANNOTATION, podGroupName)
+ .endMetadata()
+ val k8sPod = k8sPodBuilder.build()
+ SparkPod(k8sPod, pod.container)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
new file mode 100644
index 000000000000..cf337f99cab9
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/VolcanoFeatureStepSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.volcano.scheduling.v1beta1.PodGroup
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+
+class VolcanoFeatureStepSuite extends SparkFunSuite {
+
+ test("SPARK-36061: Driver Pod with Volcano PodGroup") {
+ val sparkConf = new SparkConf()
+ val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf)
+ val step = new VolcanoFeatureStep()
+ step.init(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+
+ val annotations = configuredPod.pod.getMetadata.getAnnotations
+
+ assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
+ val podGroup = step.getAdditionalPreKubernetesResources().head.asInstanceOf[PodGroup]
+ assert(podGroup.getMetadata.getName === s"${kubernetesConf.appId}-podgroup")
+ }
+
+ test("SPARK-36061: Executor Pod with Volcano PodGroup") {
+ val sparkConf = new SparkConf()
+ val kubernetesConf = KubernetesTestConf.createExecutorConf(sparkConf)
+ val step = new VolcanoFeatureStep()
+ step.init(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+ val annotations = configuredPod.pod.getMetadata.getAnnotations
+ assert(annotations.get("scheduling.k8s.io/group-name") === s"${kubernetesConf.appId}-podgroup")
+ }
+}
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index a44cedb9e1e2..0bc8508cbf86 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -47,6 +47,7 @@
+ **/*Volcano*.scala
jar
Spark Project Kubernetes Integration Tests
@@ -74,9 +75,28 @@
spark-tags_${scala.binary.version}
test-jar
+
+ org.apache.spark
+ spark-kubernetes_${scala.binary.version}
+ ${project.version}
+ test
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+ ${volcano.exclude}
+
+
+
+
+
org.codehaus.mojo
@@ -209,5 +229,18 @@
+
+ volcano
+
+
+
+
+
+ io.fabric8
+ volcano-client
+ ${kubernetes-client.version}
+
+
+
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index c1237e3eb9df..69b736951301 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -181,7 +181,7 @@ class KubernetesSuite extends SparkFunSuite
}
}
- before {
+ protected def setUpTest(): Unit = {
appLocator = UUID.randomUUID().toString.replaceAll("-", "")
driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "")
sparkAppConf = kubernetesTestComponents.newSparkAppConf()
@@ -195,6 +195,10 @@ class KubernetesSuite extends SparkFunSuite
}
}
+ before {
+ setUpTest()
+ }
+
after {
if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
kubernetesTestComponents.deleteNamespace()
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala
new file mode 100644
index 000000000000..ed7371718f9a
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import org.scalatest.Tag
+
+class VolcanoSuite extends KubernetesSuite with VolcanoTestsSuite {
+
+ override protected def setUpTest(): Unit = {
+ super.setUpTest()
+ sparkAppConf
+ .set("spark.kubernetes.driver.scheduler.name", "volcano")
+ .set("spark.kubernetes.executor.scheduler.name", "volcano")
+ }
+}
+
+private[spark] object VolcanoSuite {
+ val volcanoTag = Tag("volcano")
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
new file mode 100644
index 000000000000..377a1b816798
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.volcano.client.VolcanoClient
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
+import org.apache.spark.deploy.k8s.integrationtest.VolcanoSuite.volcanoTag
+
+private[spark] trait VolcanoTestsSuite { k8sSuite: KubernetesSuite =>
+ import VolcanoTestsSuite._
+
+ protected def checkScheduler(pod: Pod): Unit = {
+ assert(pod.getSpec.getSchedulerName === "volcano")
+ }
+
+ protected def checkAnnotaion(pod: Pod): Unit = {
+ val appId = pod.getMetadata.getLabels.get("spark-app-selector")
+ val annotations = pod.getMetadata.getAnnotations
+ assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup")
+ }
+
+ protected def checkPodGroup(pod: Pod): Unit = {
+ val appId = pod.getMetadata.getLabels.get("spark-app-selector")
+ val podGroupName = s"$appId-podgroup"
+ val volcanoClient = kubernetesTestComponents.kubernetesClient.adapt(classOf[VolcanoClient])
+ val podGroup = volcanoClient.podGroups().withName(podGroupName).get()
+ assert(podGroup.getMetadata.getOwnerReferences.get(0).getName === pod.getMetadata.getName)
+ }
+
+ test("Run SparkPi with volcano scheduler", k8sTestTag, volcanoTag) {
+ sparkAppConf
+ .set("spark.kubernetes.driver.pod.featureSteps", VOLCANO_FEATURE_STEP)
+ .set("spark.kubernetes.executor.pod.featureSteps", VOLCANO_FEATURE_STEP)
+ runSparkPiAndVerifyCompletion(
+ driverPodChecker = (driverPod: Pod) => {
+ doBasicDriverPodCheck(driverPod)
+ checkScheduler(driverPod)
+ checkAnnotaion(driverPod)
+ checkPodGroup(driverPod)
+ },
+ executorPodChecker = (executorPod: Pod) => {
+ doBasicExecutorPodCheck(executorPod)
+ checkScheduler(executorPod)
+ checkAnnotaion(executorPod)
+ }
+ )
+ }
+}
+
+private[spark] object VolcanoTestsSuite extends SparkFunSuite {
+ val VOLCANO_FEATURE_STEP = classOf[VolcanoFeatureStep].getName
+}